12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
17 from scapy.data import IP_PROTOS
18 from scapy.layers.inet import IP, TCP, UDP, ICMP
19 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
21 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
22 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
23 from scapy.layers.l2 import Ether, ARP, GRE
24 from scapy.packet import Raw
25 from syslog_rfc5424_parser import SyslogMessage, ParseError
26 from syslog_rfc5424_parser.constants import SyslogSeverity
27 from util import ip4_range
28 from util import ppc, ppp
29 from vpp_acl import AclRule, VppAcl, VppAclInterface
30 from vpp_ip_route import VppIpRoute, VppRoutePath
31 from vpp_neighbor import VppNeighbor
32 from vpp_papi import VppEnum
# NAT HA protocol event data
class Event(Packet):
    """
    Single NAT HA event record; HANATStateSync dissects its ``events``
    list into instances of this class.
    """
    # NOTE(review): display name assumed -- the original assignment is not
    # visible in this extract; confirm against the full file.
    name = "Event"
    fields_desc = [ByteEnumField("event_type", None,
                                 {1: "add", 2: "del", 3: "refresh"}),
                   ByteEnumField("protocol", None,
                                 {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
                   ShortField("flags", 0),
                   IPField("in_addr", None),
                   IPField("out_addr", None),
                   ShortField("in_port", None),
                   ShortField("out_port", None),
                   IPField("eh_addr", None),
                   IPField("ehn_addr", None),
                   ShortField("eh_port", None),
                   ShortField("ehn_port", None),
                   IntField("fib_index", None),
                   IntField("total_pkts", 0),
                   LongField("total_bytes", 0)]

    def extract_padding(self, s):
        """
        Treat everything after the declared fields as padding

        :param s: Bytes remaining after the fields were dissected
        :returns: (payload, padding) tuple with an empty payload
        """
        # Handing the remainder back as padding lets the enclosing
        # PacketListField dissect the next Event from it.
        return "", s
# NAT HA protocol header
class HANATStateSync(Packet):
    """
    NAT HA state synchronisation header followed by ``count`` events.
    """
    name = "HA NAT state sync"
    fields_desc = [XByteField("version", 1),
                   FlagsField("flags", 0, 8, ['ACK']),
                   # "count" is filled in from the number of events when
                   # building and drives the list length when dissecting.
                   FieldLenField("count", None, count_of="events"),
                   IntField("sequence_number", 1),
                   IntField("thread_index", 0),
                   PacketListField("events", [], Event,
                                   count_from=lambda pkt: pkt.count)]
71 class MethodHolder(VppTestCase):
72 """ NAT create capture and verify method holder """
@property
def config_flags(self):
    """
    NAT configuration flags enum

    Exposed as a property because callers read it as an attribute
    (``self.config_flags.NAT_IS_ADDR_ONLY``).

    :returns: VppEnum.vl_api_nat_config_flags_t
    """
    return VppEnum.vl_api_nat_config_flags_t
@property
def nat44_config_flags(self):
    """
    NAT44 configuration flags enum

    :returns: VppEnum.vl_api_nat44_config_flags_t
    """
    # NOTE(review): decorator restored by analogy with config_flags, which
    # is read as an attribute elsewhere in this file -- confirm.
    return VppEnum.vl_api_nat44_config_flags_t
@property
def SYSLOG_SEVERITY(self):
    """
    Syslog severity enum

    :returns: VppEnum.vl_api_syslog_severity_t
    """
    # NOTE(review): decorator restored by analogy with config_flags, which
    # is read as an attribute elsewhere in this file -- confirm.
    return VppEnum.vl_api_syslog_severity_t
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                             local_port=0, external_port=0, vrf_id=0,
                             is_add=1, external_sw_if_index=0xFFFFFFFF,
                             proto=0, tag="", flags=0):
    """
    Add/delete NAT44 static mapping

    :param local_ip: Local IP address
    :param external_ip: External IP address
    :param local_port: Local port number (Optional)
    :param external_port: External port number (Optional)
    :param vrf_id: VRF ID (Default 0)
    :param is_add: 1 if add, 0 if delete (Default add)
    :param external_sw_if_index: External interface instead of IP address
    :param proto: IP protocol (Mandatory if port specified)
    :param tag: Opaque string tag
    :param flags: NAT configuration flags
    """

    # Unless both ports are given the mapping is flagged address-only.
    if not (local_port and external_port):
        flags |= self.config_flags.NAT_IS_ADDR_ONLY

    self.vapi.nat44_add_del_static_mapping(
        is_add=is_add,
        local_ip_address=local_ip,
        external_ip_address=external_ip,
        external_sw_if_index=external_sw_if_index,
        local_port=local_port,
        external_port=external_port,
        vrf_id=vrf_id, protocol=proto,
        flags=flags,
        tag=tag)
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
    # A single address is passed as a one-element range.
    # NOTE(review): keyword names after first_ip_address are reconstructed
    # from the method's parameters -- confirm against the API definition.
    self.vapi.nat44_add_del_address_range(first_ip_address=ip,
                                          last_ip_address=ip,
                                          vrf_id=vrf_id,
                                          is_add=is_add,
                                          flags=flags)
# Builds the inside-network test packets: one TCP, one UDP and one ICMP
# echo-request packet, each addressed from in_if's remote host to dst_ip.
# NOTE(review): original lines 143, 146-147, 151-153, 157-159 and 163-166
# are missing from this extract -- presumably the `dst_ip is None` guard,
# the packet-list bookkeeping and the return statement; confirm against the
# full file before editing.
134 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
136 Create packet stream for inside network
138 :param in_if: Inside interface
139 :param out_if: Outside interface
140 :param dst_ip: Destination address
141 :param ttl: TTL of generated packets
# Fallback destination: the remote host behind the outside interface.
144 dst_ip = out_if.remote_ip4
# TCP packet, source port taken from self.tcp_port_in
148 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
149 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
150 TCP(sport=self.tcp_port_in, dport=20))
# UDP packet, source port taken from self.udp_port_in
154 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
155 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
156 UDP(sport=self.udp_port_in, dport=20))
# ICMP echo request, identifier taken from self.icmp_id_in
160 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
161 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
162 ICMP(id=self.icmp_id_in, type='echo-request'))
167 def compose_ip6(self, ip4, pref, plen):
169 Compose IPv4-embedded IPv6 addresses
171 :param ip4: IPv4 address
172 :param pref: IPv6 prefix
173 :param plen: IPv6 prefix length
174 :returns: IPv4-embedded IPv6 addresses
176 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
177 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
192 pref_n[10] = ip4_n[3]
196 pref_n[10] = ip4_n[2]
197 pref_n[11] = ip4_n[3]
200 pref_n[10] = ip4_n[1]
201 pref_n[11] = ip4_n[2]
202 pref_n[12] = ip4_n[3]
204 pref_n[12] = ip4_n[0]
205 pref_n[13] = ip4_n[1]
206 pref_n[14] = ip4_n[2]
207 pref_n[15] = ip4_n[3]
208 packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
209 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not one of 32, 40, 48, 56, 64, 96
    """
    ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
    # Octet positions of the four IPv4 bytes for each prefix length
    # defined by RFC 6052 section 2.2; octet 8 (the "u" byte) is always
    # skipped.
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in octet_positions:
        raise ValueError("Unsupported prefix length: %r" % (plen,))
    # inet_ntop needs a bytes object; joining the octets with ''.join()
    # fails on Python 3 where they are ints.
    ip4_n = bytes(bytearray(ip6_n[pos] for pos in octet_positions[plen]))
    return socket.inet_ntop(socket.AF_INET, ip4_n)
# Builds the outside-network test packets: one TCP, one UDP and one ICMP
# echo-reply packet, each addressed from out_if's remote host to dst_ip.
# NOTE(review): original lines 264, 270, 274-275, 279-281, 285-287 and
# 291-294 are missing from this extract -- presumably the `dst_ip is None`
# guard, an `else:` between the two port assignments groups, the
# packet-list bookkeeping and the return statement; confirm against the
# full file before editing.
253 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
254 use_inside_ports=False):
256 Create packet stream for outside network
258 :param out_if: Outside interface
259 :param dst_ip: Destination IP address (Default use global NAT address)
260 :param ttl: TTL of generated packets
261 :param use_inside_ports: Use inside NAT ports as destination ports
262 instead of outside ports
# Fallback destination: the global NAT address.
265 dst_ip = self.nat_addr
# Destination ports / ICMP id come from the *_out attributes ...
266 if not use_inside_ports:
267 tcp_port = self.tcp_port_out
268 udp_port = self.udp_port_out
269 icmp_id = self.icmp_id_out
# ... or from the *_in attributes.
271 tcp_port = self.tcp_port_in
272 udp_port = self.udp_port_in
273 icmp_id = self.icmp_id_in
276 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
277 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
278 TCP(dport=tcp_port, sport=20))
282 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
283 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
284 UDP(dport=udp_port, sport=20))
288 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
289 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
290 ICMP(id=icmp_id, type='echo-reply'))
# Builds the IPv6 outside-network test packets: one TCP, one UDP and one
# ICMPv6 echo-reply packet from src_ip to dst_ip, using the *_out ports/id.
# NOTE(review): the docstring below lists dst_ip as having a default, but
# the signature makes both src_ip and dst_ip mandatory and src_ip is not
# documented.
# NOTE(review): original lines 302-304, 308-310, 314-316 and 320-323 are
# missing from this extract -- presumably the packet-list bookkeeping and
# the return statement; confirm against the full file before editing.
295 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
297 Create packet stream for outside network
299 :param out_if: Outside interface
300 :param dst_ip: Destination IP address (Default use global NAT address)
301 :param hl: HL of generated packets
305 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
306 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
307 TCP(dport=self.tcp_port_out, sport=20))
311 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
312 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
313 UDP(dport=self.udp_port_out, sport=20))
317 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
318 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
319 ICMPv6EchoReply(id=self.icmp_id_out))
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       dst_ip=None, is_ip6=False, ignore_port=False):
    """
    Verify captured packets on outside network

    Also records the observed source ports / ICMP id in
    self.tcp_port_out, self.udp_port_out and self.icmp_id_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    :param ignore_port: Do not compare source port / ICMP id with the
                        inside value (Default False)
    """
    if is_ip6:
        IP46 = IPv6
        ICMP46 = ICMPv6EchoRequest
    else:
        IP46 = IP
        ICMP46 = ICMP
    if nat_ip is None:
        nat_ip = self.nat_addr
    for packet in capture:
        try:
            # NOTE(review): the guard on this first checksum check is
            # reconstructed (one line is missing here in the extract) --
            # confirm against the full file.
            if not is_ip6:
                self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP46].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)
            if packet.haslayer(TCP):
                if not ignore_port:
                    if same_port:
                        self.assertEqual(
                            packet[TCP].sport, self.tcp_port_in)
                    else:
                        self.assertNotEqual(
                            packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                if not ignore_port:
                    if same_port:
                        self.assertEqual(
                            packet[UDP].sport, self.udp_port_in)
                    else:
                        self.assertNotEqual(
                            packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                if not ignore_port:
                    if same_port:
                        self.assertEqual(
                            packet[ICMP46].id, self.icmp_id_in)
                    else:
                        self.assertNotEqual(
                            packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                self.assert_packet_checksums_valid(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           dst_ip=None):
    """
    Verify captured packets on outside network

    Thin wrapper around verify_capture_out with IPv6 checks enabled.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
                                   True)
def verify_capture_in(self, capture, in_if):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    """
    for packet in capture:
        try:
            self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            self.assert_packet_checksums_valid(packet)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                # NOTE(review): expected id reconstructed by analogy with
                # the TCP/UDP branches above -- confirm.
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            # Addresses, ports and ICMP id must all be the inside values.
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertEqual(packet.haslayer(ICMP), 1)
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The embedded (offending) packet must carry the outside
            # port / id.
            # NOTE(review): TCP/UDP expected values reconstructed by
            # analogy with the ICMP branch -- confirm.
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertEqual(packet.haslayer(ICMP), 1)
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The embedded (offending) packet must carry the inside
            # port / id.
            # NOTE(review): TCP/UDP expected values reconstructed by
            # analogy with the ICMP branch -- confirm.
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data,
                       proto=IP_PROTOS.tcp, echo_reply=False):
    """
    Create fragmented packet stream

    The payload is split into three IPv4 fragments at fixed offsets
    0, 3 and 5 (in 8-byte units).

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source port
    :param dport: Destination port
    :param data: Payload data
    :param proto: protocol (TCP, UDP, ICMP)
    :param echo_reply: use echo_reply if protocol is ICMP
    :returns: Fragments
    """
    if proto == IP_PROTOS.tcp:
        # Build the unfragmented packet once so the TCP checksum over the
        # whole payload can be copied into the first fragment's header.
        p = (IP(src=src_if.remote_ip4, dst=dst) /
             TCP(sport=sport, dport=dport) /
             Raw(data))
        p = p.__class__(scapy.compat.raw(p))
        chksum = p[TCP].chksum
        proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
    elif proto == IP_PROTOS.udp:
        proto_header = UDP(sport=sport, dport=dport)
    elif proto == IP_PROTOS.icmp:
        if not echo_reply:
            proto_header = ICMP(id=sport, type='echo-request')
        else:
            proto_header = ICMP(id=sport, type='echo-reply')
    else:
        raise Exception("Unsupported protocol")
    ip_id = random.randint(0, 65535)
    pkts = []
    # The second fragment starts at byte 24 (frag=3) and the third at
    # byte 40 (frag=5).  A 20-byte TCP header therefore leaves room for
    # 4 payload bytes in the first fragment, an 8-byte UDP/ICMP header
    # for 16.
    if proto == IP_PROTOS.tcp:
        raw = Raw(data[0:4])
    else:
        raw = Raw(data[0:16])
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=ip_id) /
         proto_header /
         raw)
    pkts.append(p)
    if proto == IP_PROTOS.tcp:
        raw = Raw(data[4:20])
    else:
        raw = Raw(data[16:32])
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=ip_id,
            proto=proto) /
         raw)
    pkts.append(p)
    if proto == IP_PROTOS.tcp:
        raw = Raw(data[20:])
    else:
        raw = Raw(data[32:])
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
            id=ip_id) /
         raw)
    pkts.append(p)
    return pkts
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    # Local import keeps this block self-contained.
    from io import BytesIO

    buffer = BytesIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.assert_ip_checksum_valid(p)
        # Fragment offsets are expressed in 8-byte units.
        buffer.seek(p[IP].frag * 8)
        buffer.write(bytes(p[IP].payload))
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(buffer.getvalue()))
        self.logger.debug(ppp("Reassembled:", p))
        self.assert_tcp_checksum_valid(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(buffer.getvalue()[:8]) /
             Raw(buffer.getvalue()[8:]))
    elif ip.proto == IP_PROTOS.icmp:
        p = (ip / ICMP(buffer.getvalue()))
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    # Local import keeps this block self-contained.
    from io import BytesIO

    buffer = BytesIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # Fragment offsets are expressed in 8-byte units.
        buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
        buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(buffer.getvalue()))
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(buffer.getvalue()))
    # NOTE(review): nesting of the two lines below is not recoverable from
    # the flattened extract; they are placed after the branches so both
    # protocols are logged and checked -- confirm.
    self.logger.debug(ppp("Reassembled:", p))
    self.assert_packet_checksums_valid(p)
    return p
# Drives a TCP handshake through NAT: a packet in->out via in_if, a reply
# out->in via out_if addressed to self.nat_addr, and a final packet in->out.
# The source port seen on the outside after the first packet is stored in
# self.tcp_port_out and used as destination port of the reply.
# NOTE(review): roughly a third of the original lines (654-656, 660-661,
# 663, 665, 667, 672-673, 675-678, 682-683, 685, 687-688, 690-691) are
# missing from this extract -- presumably the TCP flags arguments, the
# add_stream / start calls, the binding of `p` to the captured packet and
# the try/except around the whole sequence; confirm against the full file.
648 def initiate_tcp_session(self, in_if, out_if):
650 Initiates TCP session
652 :param in_if: Inside interface
653 :param out_if: Outside interface
# first packet, in->out
657 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
658 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
659 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
662 self.pg_enable_capture(self.pg_interfaces)
664 capture = out_if.get_capture(1)
# remember the source port observed on the outside
666 self.tcp_port_out = p[TCP].sport
668 # SYN + ACK packet out->in
669 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
670 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
671 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
674 self.pg_enable_capture(self.pg_interfaces)
# final packet, in->out
679 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
680 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
681 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
684 self.pg_enable_capture(self.pg_interfaces)
686 out_if.get_capture(1)
689 self.logger.error("TCP 3 way handshake failed")
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects six records: three session-create and three session-delete
    events, one pair per protocol (ICMP, TCP, UDP).

    :param data: Decoded IPFIX data records
    """
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        nat_event = scapy.compat.orb(record[230])
        self.assertIn(nat_event, [4, 5])
        if nat_event == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4,
                         str(ipaddress.IPv4Address(record[8])))
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort
        # /postNAPTSourceTransportPort
        protocol = scapy.compat.orb(record[4])
        if IP_PROTOS.icmp == protocol:
            self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
            self.assertEqual(struct.pack("!H", self.icmp_id_out),
                             record[227])
        elif IP_PROTOS.tcp == protocol:
            self.assertEqual(struct.pack("!H", self.tcp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.tcp_port_out),
                             record[227])
        elif IP_PROTOS.udp == protocol:
            self.assertEqual(struct.pack("!H", self.udp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.udp_port_out),
                             record[227])
        else:
            self.fail("Invalid protocol")
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(scapy.compat.orb(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(scapy.compat.orb(record[230]), 13)
    # NOTE(review): the two values below are packed in native byte order
    # ("I") while other IPFIX checks in this file use network order ("!I");
    # kept as-is -- confirm it matches what the exporter sends.
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(scapy.compat.orb(record[230]), 13)
    # NOTE(review): the two values below are packed in native byte order
    # ("I") while other IPFIX checks in this file use network order ("!I");
    # kept as-is -- confirm it matches what the exporter sends.
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_no_nat44_user(self):
    """ Verify that there is no NAT44 user """
    # API view: the user dump must be empty.
    dumped_users = self.vapi.nat44_user_dump()
    self.assertEqual(len(dumped_users), 0)
    # Statistics view: both counters must read zero at index [0][0].
    user_counter = self.statistics.get_counter('/nat44/total-users')
    self.assertEqual(user_counter[0][0], 0)
    session_counter = self.statistics.get_counter('/nat44/total-sessions')
    self.assertEqual(session_counter[0][0], 0)
def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
    """
    Verify IPFIX maximum entries per user exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum entries per user
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(scapy.compat.orb(record[230]), 13)
    # NOTE(review): the two values below are packed in native byte order
    # ("I") while other IPFIX checks in this file use network order ("!I");
    # kept as-is -- confirm it matches what the exporter sends.
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 3), record[466])
    # maxEntriesPerUser
    self.assertEqual(struct.pack("I", limit), record[473])
    # sourceIPv4Address
    self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
def verify_syslog_apmap(self, data, is_add=True):
    """
    Verify syslog address and port mapping add/delete message

    :param data: Raw syslog message (UTF-8 encoded bytes)
    :param is_add: Expect APMADD if True, APMDEL otherwise (Default True)
    """
    message = data.decode('utf-8')
    try:
        message = SyslogMessage.parse(message)
    except ParseError as e:
        # Log the parser's complaint, then let the failure propagate.
        self.logger.error(e)
        raise
    else:
        self.assertEqual(message.severity, SyslogSeverity.info)
        self.assertEqual(message.appname, 'NAT')
        self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
        sd_params = message.sd.get('napmap')
        self.assertTrue(sd_params is not None)
        self.assertEqual(sd_params.get('IATYP'), 'IPv4')
        self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
        self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
        self.assertEqual(sd_params.get('XATYP'), 'IPv4')
        self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
        self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
        self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
        self.assertTrue(sd_params.get('SSUBIX') is not None)
        self.assertEqual(sd_params.get('SVLAN'), '0')
def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
    """
    Verify syslog session add/delete message

    :param data: Raw syslog message (UTF-8 encoded bytes)
    :param is_add: Expect SADD if True, SDEL otherwise (Default True)
    :param is_ip6: Inside address is IPv6 (Default False)
    """
    message = data.decode('utf-8')
    try:
        message = SyslogMessage.parse(message)
    except ParseError as e:
        # Log the parser's complaint, then let the failure propagate.
        self.logger.error(e)
        raise
    else:
        self.assertEqual(message.severity, SyslogSeverity.info)
        self.assertEqual(message.appname, 'NAT')
        self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
        sd_params = message.sd.get('nsess')
        self.assertTrue(sd_params is not None)
        if is_ip6:
            self.assertEqual(sd_params.get('IATYP'), 'IPv6')
            self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6)
        else:
            self.assertEqual(sd_params.get('IATYP'), 'IPv4')
            self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
            # NOTE(review): nesting of the SSUBIX check is not recoverable
            # from the flattened extract; kept in the IPv4 branch where it
            # directly follows -- confirm.
            self.assertTrue(sd_params.get('SSUBIX') is not None)
        self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
        self.assertEqual(sd_params.get('XATYP'), 'IPv4')
        self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
        self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
        self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
        self.assertEqual(sd_params.get('SVLAN'), '0')
        self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4)
        self.assertEqual(sd_params.get('XDPORT'),
                         "%d" % self.tcp_external_port)
def verify_mss_value(self, pkt, mss):
    """
    Verify TCP MSS value

    :param pkt: Packet to inspect
    :param mss: Expected MSS value
    :raises TypeError: if pkt has no IP or no TCP layer
    """
    if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
        raise TypeError("Not a TCP/IP packet")

    for option in pkt[TCP].options:
        if option[0] == 'MSS':
            self.assertEqual(option[1], mss)
            self.assert_tcp_checksum_valid(pkt)
@staticmethod
def proto2layer(proto):
    """
    Map an IP protocol number to the matching scapy layer class

    Declared static because it takes no ``self`` yet is invoked as
    ``self.proto2layer(proto)``.

    :param proto: IP protocol (TCP, UDP, ICMP)
    :returns: TCP, UDP or ICMP layer class
    """
    if proto == IP_PROTOS.tcp:
        return TCP
    elif proto == IP_PROTOS.udp:
        return UDP
    elif proto == IP_PROTOS.icmp:
        return ICMP
    else:
        raise Exception("Unsupported protocol")
# Fragmentation test flow: sends a fragmented stream from pg0 towards
# pg1.remote_ip4, reassembles what pg1 captured and checks ports / ICMP id
# and payload; then sends a fragmented reply stream from pg1 and checks the
# reassembled packet captured on pg0.
# NOTE(review): several original lines are missing from this extract (890,
# 892, 895, 898-899, 904, 908-910, 912-913, 917, 919, 921-922, 925,
# 928-929, 932, 935, 937-939, 944, 947-948, 952) -- presumably the rest of
# the signature, `else:` lines, the expected addresses passed to
# reass_frags_and_verify, the sport/dport setup and the call that starts
# packet generation; confirm against the full file before editing.
889 def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
891 layer = self.proto2layer(proto)
# TCP uses a 23-byte payload, other protocols 35 bytes.
893 if proto == IP_PROTOS.tcp:
894 data = b"A" * 4 + b"B" * 16 + b"C" * 3
896 data = b"A" * 16 + b"B" * 16 + b"C" * 3
897 self.port_in = random.randint(1025, 65535)
# pg0 -> pg1 direction
900 pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
901 self.port_in, 20, data, proto)
902 self.pg0.add_stream(pkts)
903 self.pg_enable_capture(self.pg_interfaces)
905 frags = self.pg1.get_capture(len(pkts))
906 if not dont_translate:
907 p = self.reass_frags_and_verify(frags,
911 p = self.reass_frags_and_verify(frags,
914 if proto != IP_PROTOS.icmp:
915 if not dont_translate:
916 self.assertEqual(p[layer].dport, 20)
918 self.assertNotEqual(p[layer].sport, self.port_in)
920 self.assertEqual(p[layer].sport, self.port_in)
923 if not dont_translate:
924 self.assertNotEqual(p[layer].id, self.port_in)
926 self.assertEqual(p[layer].id, self.port_in)
927 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0 direction; the reply targets the NAT address unless
# dont_translate is set.
930 if not dont_translate:
931 dst_addr = self.nat_addr
933 dst_addr = self.pg0.remote_ip4
934 if proto != IP_PROTOS.icmp:
936 dport = p[layer].sport
940 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data,
941 proto, echo_reply=True)
942 self.pg1.add_stream(pkts)
943 self.pg_enable_capture(self.pg_interfaces)
945 frags = self.pg0.get_capture(len(pkts))
946 p = self.reass_frags_and_verify(frags,
949 if proto != IP_PROTOS.icmp:
950 self.assertEqual(p[layer].sport, 20)
951 self.assertEqual(p[layer].dport, self.port_in)
953 self.assertEqual(p[layer].id, self.port_in)
954 self.assertEqual(data, p[Raw].load)
# Fragmentation test flow against a server reachable through
# self.server_out_addr / self.server_out_port: sends a fragmented stream
# from pg0, checks the reassembled packet captured on pg1 against
# self.server_in_port, then sends a fragmented stream from pg1 to
# pg0.remote_ip4 and checks the reassembled packet captured on pg0.
# NOTE(review): several original lines are missing from this extract (958,
# 961, 964-966, 969, 972, 975-976, 980, 983-984, 987, 989, 992, 995, 999,
# 1003) -- three consecutive lines are absent before the first
# create_stream_frag call, so an enclosing statement may be hidden there;
# do not restructure without the full file.
956 def frag_in_order_in_plus_out(self, proto=IP_PROTOS.tcp):
957 layer = self.proto2layer(proto)
# TCP uses a 23-byte payload, other protocols 35 bytes.
959 if proto == IP_PROTOS.tcp:
960 data = b"A" * 4 + b"B" * 16 + b"C" * 3
962 data = b"A" * 16 + b"B" * 16 + b"C" * 3
963 self.port_in = random.randint(1025, 65535)
# pg0 -> pg1 direction
967 pkts = self.create_stream_frag(self.pg0, self.server_out_addr,
968 self.port_in, self.server_out_port,
970 self.pg0.add_stream(pkts)
971 self.pg_enable_capture(self.pg_interfaces)
973 frags = self.pg1.get_capture(len(pkts))
974 p = self.reass_frags_and_verify(frags,
977 if proto != IP_PROTOS.icmp:
978 self.assertEqual(p[layer].sport, self.port_in)
979 self.assertEqual(p[layer].dport, self.server_in_port)
981 self.assertEqual(p[layer].id, self.port_in)
982 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0 direction
985 if proto != IP_PROTOS.icmp:
986 pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4,
988 p[layer].sport, data, proto)
990 pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4,
991 p[layer].id, 0, data, proto,
993 self.pg1.add_stream(pkts)
994 self.pg_enable_capture(self.pg_interfaces)
996 frags = self.pg0.get_capture(len(pkts))
997 p = self.reass_frags_and_verify(frags,
998 self.server_out_addr,
1000 if proto != IP_PROTOS.icmp:
1001 self.assertEqual(p[layer].sport, self.server_out_port)
1002 self.assertEqual(p[layer].dport, self.port_in)
1004 self.assertEqual(p[layer].id, self.port_in)
1005 self.assertEqual(data, p[Raw].load)
# Hairpinning fragmentation test: a fragmented stream is both sent on and
# captured from pg0, and the reassembled packet is checked against
# self.host_in_port / self.server_in_port.
# NOTE(review): several original lines are missing from this extract (1009,
# 1012, 1014, 1017-1018, 1020-1021, 1024, 1027-1028, 1030, 1033-1034) --
# including most create_stream_frag arguments and both expected addresses
# passed to reass_frags_and_verify; these cannot be inferred from the
# visible code.
1007 def reass_hairpinning(self, proto=IP_PROTOS.tcp, ignore_port=False):
1008 layer = self.proto2layer(proto)
# TCP uses a 23-byte payload, other protocols 35 bytes.
1010 if proto == IP_PROTOS.tcp:
1011 data = b"A" * 4 + b"B" * 16 + b"C" * 3
1013 data = b"A" * 16 + b"B" * 16 + b"C" * 3
1015 # send packet from host to server
1016 pkts = self.create_stream_frag(self.pg0,
1019 self.server_out_port,
1022 self.pg0.add_stream(pkts)
1023 self.pg_enable_capture(self.pg_interfaces)
# capture on the same interface the stream was sent from
1025 frags = self.pg0.get_capture(len(pkts))
1026 p = self.reass_frags_and_verify(frags,
1029 if proto != IP_PROTOS.icmp:
1031 self.assertNotEqual(p[layer].sport, self.host_in_port)
1032 self.assertEqual(p[layer].dport, self.server_in_port)
1035 self.assertNotEqual(p[layer].id, self.host_in_port)
1036 self.assertEqual(data, p[Raw].load)
# Out-of-order variant of the fragmentation test flow: a fragmented stream
# goes from pg0 towards pg1.remote_ip4 and the reassembled capture from pg1
# is checked; then a fragmented reply stream goes from pg1 and the
# reassembled capture from pg0 is checked.
# NOTE(review): several original lines are missing from this extract (1039,
# 1041, 1044, 1047-1049, 1052, 1055, 1059, 1061, 1068, 1070, 1072-1073,
# 1076, 1079-1080, 1083, 1086, 1088-1090, 1093, 1096, 1104).  One line is
# absent between each create_stream_frag call and the following add_stream
# -- presumably where the fragments get reordered -- and three lines are
# absent before the first create_stream_frag call, so an enclosing
# statement may be hidden there; do not restructure without the full file.
1038 def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
1040 layer = self.proto2layer(proto)
# TCP uses a 23-byte payload, other protocols 35 bytes.
1042 if proto == IP_PROTOS.tcp:
1043 data = b"A" * 4 + b"B" * 16 + b"C" * 3
1045 data = b"A" * 16 + b"B" * 16 + b"C" * 3
1046 self.port_in = random.randint(1025, 65535)
# pg0 -> pg1 direction
1050 pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
1051 self.port_in, 20, data, proto)
1053 self.pg0.add_stream(pkts)
1054 self.pg_enable_capture(self.pg_interfaces)
1056 frags = self.pg1.get_capture(len(pkts))
1057 if not dont_translate:
1058 p = self.reass_frags_and_verify(frags,
1060 self.pg1.remote_ip4)
1062 p = self.reass_frags_and_verify(frags,
1063 self.pg0.remote_ip4,
1064 self.pg1.remote_ip4)
1065 if proto != IP_PROTOS.icmp:
1066 if not dont_translate:
1067 self.assertEqual(p[layer].dport, 20)
1069 self.assertNotEqual(p[layer].sport, self.port_in)
1071 self.assertEqual(p[layer].sport, self.port_in)
1074 if not dont_translate:
1075 self.assertNotEqual(p[layer].id, self.port_in)
1077 self.assertEqual(p[layer].id, self.port_in)
1078 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0 direction; the reply targets the NAT address unless
# dont_translate is set.
1081 if not dont_translate:
1082 dst_addr = self.nat_addr
1084 dst_addr = self.pg0.remote_ip4
1085 if proto != IP_PROTOS.icmp:
1087 dport = p[layer].sport
1091 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport,
1092 data, proto, echo_reply=True)
1094 self.pg1.add_stream(pkts)
1095 self.pg_enable_capture(self.pg_interfaces)
1097 frags = self.pg0.get_capture(len(pkts))
1098 p = self.reass_frags_and_verify(frags,
1099 self.pg1.remote_ip4,
1100 self.pg0.remote_ip4)
1101 if proto != IP_PROTOS.icmp:
1102 self.assertEqual(p[layer].sport, 20)
1103 self.assertEqual(p[layer].dport, self.port_in)
1105 self.assertEqual(p[layer].id, self.port_in)
1106 self.assertEqual(data, p[Raw].load)
# Out-of-order fragmentation test flow against a server reachable through
# self.server_out_addr / self.server_out_port: a fragmented stream is sent
# from pg0 and the reassembled capture from pg1 is checked against
# self.server_in_addr / self.server_in_port; then a fragmented stream is
# sent from pg1 to pg0.remote_ip4 and the reassembled capture from pg0 is
# checked.
# NOTE(review): several original lines are missing from this extract (1110,
# 1113, 1116-1118, 1121-1122, 1125, 1134, 1137-1138, 1143, 1146-1147, 1150,
# 1158).  Two lines are absent between each create_stream_frag call and the
# following add_stream, and three before the first create_stream_frag call,
# so reordering of the fragments and possibly an enclosing statement are
# hidden there; do not restructure without the full file.
1108 def frag_out_of_order_in_plus_out(self, proto=IP_PROTOS.tcp):
1109 layer = self.proto2layer(proto)
# TCP uses a 23-byte payload, other protocols 35 bytes.
1111 if proto == IP_PROTOS.tcp:
1112 data = b"A" * 4 + b"B" * 16 + b"C" * 3
1114 data = b"A" * 16 + b"B" * 16 + b"C" * 3
1115 self.port_in = random.randint(1025, 65535)
# pg0 -> pg1 direction
1119 pkts = self.create_stream_frag(self.pg0, self.server_out_addr,
1120 self.port_in, self.server_out_port,
1123 self.pg0.add_stream(pkts)
1124 self.pg_enable_capture(self.pg_interfaces)
1126 frags = self.pg1.get_capture(len(pkts))
1127 p = self.reass_frags_and_verify(frags,
1128 self.pg0.remote_ip4,
1129 self.server_in_addr)
# NOTE(review): the dport assertion appears twice in this branch (original
# lines 1131 and 1133); the second one is redundant.
1130 if proto != IP_PROTOS.icmp:
1131 self.assertEqual(p[layer].dport, self.server_in_port)
1132 self.assertEqual(p[layer].sport, self.port_in)
1133 self.assertEqual(p[layer].dport, self.server_in_port)
1135 self.assertEqual(p[layer].id, self.port_in)
1136 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0 direction
1139 if proto != IP_PROTOS.icmp:
1140 pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4,
1141 self.server_in_port,
1142 p[layer].sport, data, proto)
1144 pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4,
1145 p[layer].id, 0, data, proto,
1148 self.pg1.add_stream(pkts)
1149 self.pg_enable_capture(self.pg_interfaces)
1151 frags = self.pg0.get_capture(len(pkts))
1152 p = self.reass_frags_and_verify(frags,
1153 self.server_out_addr,
1154 self.pg0.remote_ip4)
1155 if proto != IP_PROTOS.icmp:
1156 self.assertEqual(p[layer].sport, self.server_out_port)
1157 self.assertEqual(p[layer].dport, self.port_in)
1159 self.assertEqual(p[layer].id, self.port_in)
1160 self.assertEqual(data, p[Raw].load)
class TestNATMisc(MethodHolder):
    """ NAT misc Test Cases """

    max_translations = 10240
    # NOTE(review): setUp reads self.max_users but its definition is not
    # visible in this extract; value assumed equal to max_translations --
    # confirm against the full file.
    max_users = 10240

    def setUp(self):
        """ Enable the NAT44 plugin with the class-level limits """
        super(TestNATMisc, self).setUp()
        self.vapi.nat44_plugin_enable_disable(
            sessions=self.max_translations,
            users=self.max_users, enable=1)

    def tearDown(self):
        """ Disable the NAT44 plugin and clear logging unless VPP died """
        super(TestNATMisc, self).tearDown()
        if not self.vpp_dead:
            self.vapi.nat44_plugin_enable_disable(enable=0)
            self.vapi.cli("clear logging")

    def test_show_max_translations(self):
        """ API test - max translations per thread """
        nat_config = self.vapi.nat_show_config_2()
        self.assertEqual(self.max_translations,
                         nat_config.max_translations_per_thread)
1188 class TestNAT44(MethodHolder):
1189 """ NAT44 Test Cases """
1191 max_translations = 10240
# Class-level fixture: raises NAT logging to debug, seeds the default
# port / id / address values read by the helper methods, creates ten pg
# interfaces and prepares them for the tests.
# NOTE(review): several original lines are missing from this extract (1194,
# 1198, 1210, 1213, 1215-1218, 1221, 1224, 1228, 1239-1245, 1251-1252) --
# presumably the @classmethod decorator, the bodies of both `for` loops and
# the remaining arguments of sw_interface_add_del_address; confirm against
# the full file before editing.
1195 def setUpClass(cls):
1196 super(TestNAT44, cls).setUpClass()
1197 cls.vapi.cli("set log class nat level debug")
# Defaults: inside and outside values start out identical.
1199 cls.tcp_port_in = 6303
1200 cls.tcp_port_out = 6303
1201 cls.udp_port_in = 6304
1202 cls.udp_port_out = 6304
1203 cls.icmp_id_in = 6305
1204 cls.icmp_id_out = 6305
1205 cls.nat_addr = '10.0.0.3'
1206 cls.ipfix_src_port = 4739
1207 cls.ipfix_domain_id = 1
1208 cls.tcp_external_port = 80
1209 cls.udp_external_port = 69
1211 cls.create_pg_interfaces(range(10))
# pg0..pg3
1212 cls.interfaces = list(cls.pg_interfaces[0:4])
1214 for i in cls.interfaces:
1219 cls.pg0.generate_remote_hosts(3)
1220 cls.pg0.configure_ipv4_neighbors()
1222 cls.pg1.generate_remote_hosts(1)
1223 cls.pg1.configure_ipv4_neighbors()
# pg4..pg6; the nested list() call is redundant, one list() already copies
# the slice.
1225 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1226 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
1227 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
# pg4 and pg6 get identical addresses but sit in different IPv4 tables
# (10 and 20); pg5 shares table 10 with pg4.
1229 cls.pg4._local_ip4 = "172.16.255.1"
1230 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1231 cls.pg4.set_table_ip4(10)
1232 cls.pg5._local_ip4 = "172.17.255.3"
1233 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1234 cls.pg5.set_table_ip4(10)
1235 cls.pg6._local_ip4 = "172.16.255.1"
1236 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1237 cls.pg6.set_table_ip4(20)
1238 for i in cls.overlapping_interfaces:
1246 cls.pg9.generate_remote_hosts(2)
1247 cls.pg9.config_ip4()
1248 cls.vapi.sw_interface_add_del_address(
1249 sw_if_index=cls.pg9.sw_if_index,
1250 prefix="10.0.0.1/24")
# ARP is resolved twice: before and after the remote host addresses of pg9
# are overwritten (host 0 ends up as 10.0.0.2, which is also assigned to
# pg4._remote_ip4).
1253 cls.pg9.resolve_arp()
1254 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1255 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1256 cls.pg9.resolve_arp()
def setUp(self):
    # Enable the NAT44 plugin with the class limits before every test.
    super(TestNAT44, self).setUp()
    self.vapi.nat44_plugin_enable_disable(
        sessions=self.max_translations,
        users=self.max_users, enable=1)
@classmethod
def tearDownClass(cls):
    # Nothing class-specific to undo; defer to the base class.
    super(TestNAT44, cls).tearDownClass()
def tearDown(self):
    # Undo per-test NAT state; skipped entirely if VPP died during the test.
    super(TestNAT44, self).tearDown()
    if not self.vpp_dead:
        self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                           src_port=self.ipfix_src_port,
                                           enable=0)
        # Restore the defaults in case a test overrode them.
        self.ipfix_src_port = 4739
        self.ipfix_domain_id = 1

        self.vapi.nat44_plugin_enable_disable(enable=0)
        self.vapi.cli("clear logging")
def test_clear_sessions(self):
    """ NAT44 session clearing test """

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    # No flags: pg1 is the outside interface.
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    nat_config = self.vapi.nat_show_config()
    self.assertEqual(0, nat_config.endpoint_dependent)

    # Create sessions by pushing traffic inside -> outside.
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    sessions = self.statistics.get_counter('/nat44/total-sessions')
    self.assertGreater(sessions[0][0], 0)
    self.logger.info("sessions before clearing: %s" % sessions[0][0])

    self.vapi.cli("clear nat44 sessions")

    sessions = self.statistics.get_counter('/nat44/total-sessions')
    self.assertEqual(sessions[0][0], 0)
    self.logger.info("sessions after clearing: %s" % sessions[0][0])
def test_dynamic(self):
    """ NAT44 dynamic translation test """
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # Slow-path counter suffix -> increment expected from one test stream.
    expected = (('tcp', 2), ('udp', 1), ('icmp', 1), ('drops', 0))

    def read_counters(direction):
        # Per-interface slow-path counters for 'in2out' or 'out2in'.
        return [self.statistics.get_counter(
                    '/nat44/%s/slowpath/%s' % (direction, name))[0]
                for name, _ in expected]

    def check_increments(direction, before, if_idx):
        # Each counter must have grown by exactly its expected amount.
        after = read_counters(direction)
        for (_, delta), old, new in zip(expected, before, after):
            self.assertEqual(new[if_idx] - old[if_idx], delta)

    # in2out
    before = read_counters('in2out')
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    check_increments('in2out', before, self.pg0.sw_if_index)

    # out2in
    before = read_counters('out2in')
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
    check_increments('out2in', before, self.pg1.sw_if_index)

    users = self.statistics.get_counter('/nat44/total-users')
    self.assertEqual(users[0][0], 1)
    sessions = self.statistics.get_counter('/nat44/total-sessions')
    self.assertEqual(sessions[0][0], 3)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """ NAT44 handling of client packets with TTL=1 """

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
    """ NAT44 handling of server packets with TTL=1 """

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture,
                                             src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ NAT44 handling of error responses to client packets with TTL=2 """

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    # (each translated packet is wrapped as the payload of the error)
    capture = self.pg1.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ NAT44 handling of error responses to server packets with TTL=2 """

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    # (each delivered packet is wrapped as the payload of the error)
    capture = self.pg0.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ Ping NAT44 out interface from outside network """

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # Echo request addressed to the outside interface's own address.
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        # Log the offending packet, then let the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network """

    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # out2in: echo request to the mapped external address
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out: echo reply from the internal host
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
    """ NAT44 forwarding test """

    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    self.vapi.nat44_forwarding_enable_disable(enable=1)

    real_ip = self.pg0.remote_ip4
    alias_ip = self.nat_addr
    flags = self.config_flags.NAT_IS_ADDR_ONLY
    self.vapi.nat44_add_del_static_mapping(is_add=1,
                                           local_ip_address=real_ip,
                                           external_ip_address=alias_ip,
                                           external_sw_if_index=0xFFFFFFFF,
                                           flags=flags)

    try:
        # static mapping match

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # no static mapping match

        # Temporarily make the second remote host the default peer of pg0.
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            self.pg0.remote_hosts[0] = host0

    finally:
        # Always undo forwarding and the static mapping, even on failure.
        self.vapi.nat44_forwarding_enable_disable(enable=0)
        flags = self.config_flags.NAT_IS_ADDR_ONLY
        self.vapi.nat44_add_del_static_mapping(
            is_add=0,
            local_ip_address=real_ip,
            external_ip_address=alias_ip,
            external_sw_if_index=0xFFFFFFFF,
            flags=flags)
def test_static_in(self):
    """ 1:1 NAT initialized from inside network """

    nat_ip = "10.0.0.10"
    # Address-only mapping: external ports equal the inside ports.
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    self.assertEqual(sm[0].tag, '')
    self.assertEqual(sm[0].protocol, 0)
    self.assertEqual(sm[0].local_port, 0)
    self.assertEqual(sm[0].external_port, 0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ 1:1 NAT initialized from outside network """

    nat_ip = "10.0.0.20"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305
    # NOTE(review): the assignment of `tag` was missing from the reviewed
    # text; any non-empty tag satisfies the round-trip check -- confirm value.
    tag = "testTAG"

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    self.assertEqual(sm[0].tag, tag)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ 1:1 NAPT initialized from inside network """

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.nat44_add_address(self.nat_addr)
    # One port mapping per protocol: (inside port/id, outside port/id).
    for port_in, port_out, proto in (
            (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
            (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
            (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp)):
        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                      port_in, port_out,
                                      proto=proto)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ 1:1 NAPT initialized from outside network """

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.nat44_add_address(self.nat_addr)
    # One port mapping per protocol: (inside port/id, outside port/id).
    for port_in, port_out, proto in (
            (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
            (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
            (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp)):
        self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                      port_in, port_out,
                                      proto=proto)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_vrf_aware(self):
    """ 1:1 NAT VRF awareness """

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # Both mappings live in VRF 10: that matches pg4 (placed in table 10 by
    # setUpClass) but not pg0.
    # NOTE(review): the vrf_id arguments were missing from the reviewed text;
    # 10 is inferred from the comments below and pg4's table -- confirm.
    self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                  vrf_id=10)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                  vrf_id=10)
    flags = self.config_flags.NAT_IS_INSIDE
    # pg3 is the outside interface; pg0 and pg4 are inside.
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg3.sw_if_index,
        is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg4.sw_if_index,
        flags=flags, is_add=1)

    # inside interface VRF match NAT44 static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match NAT44 static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
    """ Switch from dynamic translation to 1:1NAT """
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # dynamic
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # 1:1NAT: once the static mapping exists the host must have no
    # sessions left, and new traffic must use nat_ip with unchanged ports.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
    self.assertEqual(len(sessions), 0)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
    """ Identity NAT """
    flags = self.config_flags.NAT_IS_ADDR_ONLY
    self.vapi.nat44_add_del_identity_mapping(
        ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
        flags=flags, is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # Outside -> inside packet must arrive with addresses and ports intact.
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12345, dport=56789))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, 56789)
        self.assertEqual(tcp.sport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        # Log the offending packet, then let the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
    self.assertEqual(len(sessions), 0)
    # Same address in another VRF is a distinct identity mapping.
    flags = self.config_flags.NAT_IS_ADDR_ONLY
    self.vapi.nat44_add_del_identity_mapping(
        ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
        flags=flags, vrf_id=1, is_add=1)
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(len(identity_mappings), 2)
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces """

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    # pg0 and pg1 are inside, pg3 is outside, pg2 has no NAT44 feature.
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg3.sw_if_index,
        is_add=1)

    def send_and_capture(tx_if, rx_if, pkts):
        # Inject pkts on tx_if and return what arrives on rx_if.
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    # between two NAT44 inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    capture = send_and_capture(self.pg0, self.pg1, pkts)
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from NAT44 inside to interface without NAT44 feature (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg2)
    capture = send_and_capture(self.pg0, self.pg2, pkts)
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    capture = send_and_capture(self.pg0, self.pg3, pkts)
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    capture = send_and_capture(self.pg3, self.pg0, pkts)
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    capture = send_and_capture(self.pg1, self.pg3, pkts)
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    capture = send_and_capture(self.pg3, self.pg1, pkts)
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ NAT44 multiple inside interfaces with overlapping address space """

    static_nat_ip = "10.0.0.10"
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    # pg3 is outside; pg4/pg5 (table 10) and pg6 (table 20) are inside.
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg3.sw_if_index,
        is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg4.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg5.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg6.sw_if_index,
        flags=flags, is_add=1)
    # NOTE(review): the vrf_id argument was missing from the reviewed text;
    # 20 is inferred from pg6's table and the VRF-20 session dump below.
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    def send_and_capture(tx_if, rx_if, pkts):
        # Inject pkts on tx_if and return what arrives on rx_if.
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    # between NAT44 inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    capture = send_and_capture(self.pg4, self.pg5, pkts)
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between NAT44 inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        # Log the offending packet, then let the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    capture = send_and_capture(self.pg4, self.pg3, pkts)
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    capture = send_and_capture(self.pg3, self.pg4, pkts)
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    capture = send_and_capture(self.pg5, self.pg3, pkts)
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    capture = send_and_capture(self.pg3, self.pg5, pkts)
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
        self.assertEqual(str(session.inside_ip_address),
                         self.pg5.remote_ip4)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    capture = send_and_capture(self.pg6, self.pg3, pkts)
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    capture = send_and_capture(self.pg3, self.pg6, pkts)
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        # NOTE(review): second dump argument was missing from the reviewed
        # text; assumed to be the user's own VRF -- confirm.
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])
            self.assertFalse(session.flags &
                             self.config_flags.NAT_IS_EXT_HOST_VALID)

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
        self.assertEqual(str(session.inside_ip_address),
                         self.pg4.remote_ip4)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4, 20)
    self.assertGreaterEqual(len(sessions), 3)
    for session in sessions:
        self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
        self.assertEqual(str(session.inside_ip_address),
                         self.pg6.remote_ip4)
        self.assertEqual(str(session.outside_ip_address),
                         static_nat_ip)
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44 hairpinning - 1:1 NAPT """

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the assignment of host_in_port was missing from the
    # reviewed text; any fixed client port works here -- confirm value.
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.assert_packet_checksums_valid(p)
        # Remember the translated client port for the reply below.
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    after = self.statistics.get_counter('/nat44/hairpinning')[0]
    if_idx = self.pg0.sw_if_index
    self.assertEqual(after[if_idx] - cnt[if_idx], 1)

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    after = self.statistics.get_counter('/nat44/hairpinning')[0]
    if_idx = self.pg0.sw_if_index
    self.assertEqual(after[if_idx] - cnt[if_idx], 2)
2207 def test_hairpinning2(self):
2208 """ NAT44 hairpinning - 1:1 NAT"""
# Four exchanges, all sent and captured on pg0:
#   1. host    -> server1 (addressed to server1_nat_ip)
#   2. server1 -> host    (replies addressed to nat_addr)
#   3. server2 -> server1 (addressed to server1_nat_ip)
#   4. server1 -> server2 (replies addressed to server2_nat_ip)
# Each exchange sends one TCP, one UDP and one ICMP packet.
# NOTE(review): pkts is passed to add_stream()/len() below, but neither its
# initialisation nor the appends are visible in this part of the file.
2210 server1_nat_ip = "10.0.0.10"
2211 server2_nat_ip = "10.0.0.11"
2212 host = self.pg0.remote_hosts[0]
2213 server1 = self.pg0.remote_hosts[1]
2214 server2 = self.pg0.remote_hosts[2]
2215 server_tcp_port = 22
2216 server_udp_port = 20
# NAT pool address, then the NAT feature on pg0 (inside flag) and on pg1.
2218 self.nat44_add_address(self.nat_addr)
2219 flags = self.config_flags.NAT_IS_INSIDE
2220 self.vapi.nat44_interface_add_del_feature(
2221 sw_if_index=self.pg0.sw_if_index,
2222 flags=flags, is_add=1)
2223 self.vapi.nat44_interface_add_del_feature(
2224 sw_if_index=self.pg1.sw_if_index,
2227 # add static mapping for servers
2228 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2229 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Exchange 1: packets from the host to server1's mapped address.
2233 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2234 IP(src=host.ip4, dst=server1_nat_ip) /
2235 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2237 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2238 IP(src=host.ip4, dst=server1_nat_ip) /
2239 UDP(sport=self.udp_port_in, dport=server_udp_port))
2241 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2242 IP(src=host.ip4, dst=server1_nat_ip) /
2243 ICMP(id=self.icmp_id_in, type='echo-request'))
2245 self.pg0.add_stream(pkts)
2246 self.pg_enable_capture(self.pg_interfaces)
2248 capture = self.pg0.get_capture(len(pkts))
# Expected: source becomes nat_addr with a changed port/ICMP id,
# destination becomes server1's real address. The translated port/id
# values are stored on self so the replies can target them.
2249 for packet in capture:
2251 self.assertEqual(packet[IP].src, self.nat_addr)
2252 self.assertEqual(packet[IP].dst, server1.ip4)
2253 if packet.haslayer(TCP):
2254 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2255 self.assertEqual(packet[TCP].dport, server_tcp_port)
2256 self.tcp_port_out = packet[TCP].sport
2257 self.assert_packet_checksums_valid(packet)
2258 elif packet.haslayer(UDP):
2259 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2260 self.assertEqual(packet[UDP].dport, server_udp_port)
2261 self.udp_port_out = packet[UDP].sport
2263 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2264 self.icmp_id_out = packet[ICMP].id
2266 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exchange 2: server1 replies to nat_addr using the stored port/id values.
2271 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2272 IP(src=server1.ip4, dst=self.nat_addr) /
2273 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2275 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2276 IP(src=server1.ip4, dst=self.nat_addr) /
2277 UDP(sport=server_udp_port, dport=self.udp_port_out))
2279 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2280 IP(src=server1.ip4, dst=self.nat_addr) /
2281 ICMP(id=self.icmp_id_out, type='echo-reply'))
2283 self.pg0.add_stream(pkts)
2284 self.pg_enable_capture(self.pg_interfaces)
2286 capture = self.pg0.get_capture(len(pkts))
# Expected: source is server1's mapped address, destination is the host,
# and the host's original port/id values are restored.
2287 for packet in capture:
2289 self.assertEqual(packet[IP].src, server1_nat_ip)
2290 self.assertEqual(packet[IP].dst, host.ip4)
2291 if packet.haslayer(TCP):
2292 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2293 self.assertEqual(packet[TCP].sport, server_tcp_port)
2294 self.assert_packet_checksums_valid(packet)
2295 elif packet.haslayer(UDP):
2296 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2297 self.assertEqual(packet[UDP].sport, server_udp_port)
2299 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2301 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2304 # server2 to server1
2306 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2307 IP(src=server2.ip4, dst=server1_nat_ip) /
2308 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2310 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2311 IP(src=server2.ip4, dst=server1_nat_ip) /
2312 UDP(sport=self.udp_port_in, dport=server_udp_port))
2314 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2315 IP(src=server2.ip4, dst=server1_nat_ip) /
2316 ICMP(id=self.icmp_id_in, type='echo-request'))
2318 self.pg0.add_stream(pkts)
2319 self.pg_enable_capture(self.pg_interfaces)
2321 capture = self.pg0.get_capture(len(pkts))
# Expected: source becomes server2's mapped address, and unlike exchange 1
# the source port/ICMP id values are expected to stay unchanged.
2322 for packet in capture:
2324 self.assertEqual(packet[IP].src, server2_nat_ip)
2325 self.assertEqual(packet[IP].dst, server1.ip4)
2326 if packet.haslayer(TCP):
2327 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2328 self.assertEqual(packet[TCP].dport, server_tcp_port)
2329 self.tcp_port_out = packet[TCP].sport
2330 self.assert_packet_checksums_valid(packet)
2331 elif packet.haslayer(UDP):
2332 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2333 self.assertEqual(packet[UDP].dport, server_udp_port)
2334 self.udp_port_out = packet[UDP].sport
2336 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2337 self.icmp_id_out = packet[ICMP].id
2339 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2342 # server1 to server2
2344 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2345 IP(src=server1.ip4, dst=server2_nat_ip) /
2346 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2348 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2349 IP(src=server1.ip4, dst=server2_nat_ip) /
2350 UDP(sport=server_udp_port, dport=self.udp_port_out))
2352 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2353 IP(src=server1.ip4, dst=server2_nat_ip) /
2354 ICMP(id=self.icmp_id_out, type='echo-reply'))
2356 self.pg0.add_stream(pkts)
2357 self.pg_enable_capture(self.pg_interfaces)
2359 capture = self.pg0.get_capture(len(pkts))
# Expected: source is server1's mapped address, destination is server2's
# real address, with the port/id values of exchange 3 mirrored.
2360 for packet in capture:
2362 self.assertEqual(packet[IP].src, server1_nat_ip)
2363 self.assertEqual(packet[IP].dst, server2.ip4)
2364 if packet.haslayer(TCP):
2365 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2366 self.assertEqual(packet[TCP].sport, server_tcp_port)
2367 self.assert_packet_checksums_valid(packet)
2368 elif packet.haslayer(UDP):
2369 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2370 self.assertEqual(packet[UDP].sport, server_udp_port)
2372 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2374 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2377 def test_interface_addr(self):
2378 """ Acquire NAT44 addresses from interface """
# Registers pg7 with nat44_add_del_interface_addr and then checks that the
# NAT address dump follows pg7's IPv4 configuration: empty while pg7 has
# no address, exactly pg7.local_ip4 once configured, empty again after the
# address is removed.
2379 self.vapi.nat44_add_del_interface_addr(
2381 sw_if_index=self.pg7.sw_if_index)
2383 # no address in NAT pool
2384 addresses = self.vapi.nat44_address_dump()
2385 self.assertEqual(0, len(addresses))
2387 # configure interface address and check NAT address pool
2388 self.pg7.config_ip4()
2389 addresses = self.vapi.nat44_address_dump()
2390 self.assertEqual(1, len(addresses))
2391 self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2393 # remove interface address and check NAT address pool
2394 self.pg7.unconfig_ip4()
2395 addresses = self.vapi.nat44_address_dump()
2396 self.assertEqual(0, len(addresses))
2398 def test_interface_addr_static_mapping(self):
2399 """ Static mapping with addresses from interface """
# A static mapping is created against pg7 (external_sw_if_index) before
# pg7 has an address. The dump is expected to hold one entry while pg7 is
# unconfigured and two entries while it is configured; the extra entry has
# external_sw_if_index == 0xFFFFFFFF and carries the same tag.
# NOTE(review): tag and resolved are read below, but no assignment to them
# is visible in this part of the file -- confirm they are bound.
2402 self.vapi.nat44_add_del_interface_addr(
2404 sw_if_index=self.pg7.sw_if_index)
2405 self.nat44_add_static_mapping(
2407 external_sw_if_index=self.pg7.sw_if_index,
2410 # static mappings with external interface
2411 static_mappings = self.vapi.nat44_static_mapping_dump()
2412 self.assertEqual(1, len(static_mappings))
2413 self.assertEqual(self.pg7.sw_if_index,
2414 static_mappings[0].external_sw_if_index)
2415 self.assertEqual(static_mappings[0].tag, tag)
2417 # configure interface address and check static mappings
2418 self.pg7.config_ip4()
2419 static_mappings = self.vapi.nat44_static_mapping_dump()
2420 self.assertEqual(2, len(static_mappings))
# Look for the entry that no longer references an interface index.
2422 for sm in static_mappings:
2423 if sm.external_sw_if_index == 0xFFFFFFFF:
2424 self.assertEqual(str(sm.external_ip_address),
2426 self.assertEqual(sm.tag, tag)
2428 self.assertTrue(resolved)
2430 # remove interface address and check static mappings
2431 self.pg7.unconfig_ip4()
2432 static_mappings = self.vapi.nat44_static_mapping_dump()
2433 self.assertEqual(1, len(static_mappings))
2434 self.assertEqual(self.pg7.sw_if_index,
2435 static_mappings[0].external_sw_if_index)
2436 self.assertEqual(static_mappings[0].tag, tag)
2438 # configure interface address again and check static mappings
2439 self.pg7.config_ip4()
2440 static_mappings = self.vapi.nat44_static_mapping_dump()
2441 self.assertEqual(2, len(static_mappings))
2443 for sm in static_mappings:
2444 if sm.external_sw_if_index == 0xFFFFFFFF:
2445 self.assertEqual(str(sm.external_ip_address),
2447 self.assertEqual(sm.tag, tag)
2449 self.assertTrue(resolved)
2451 # remove static mapping
2452 self.nat44_add_static_mapping(
2454 external_sw_if_index=self.pg7.sw_if_index,
# After removal the dump must be empty.
2457 static_mappings = self.vapi.nat44_static_mapping_dump()
2458 self.assertEqual(0, len(static_mappings))
2460 def test_interface_addr_identity_nat(self):
2461 """ Identity NAT with addresses from interface """
# A TCP identity mapping is created against pg7 before pg7 has an address.
# The dump is expected to hold one entry while pg7 is unconfigured and two
# entries once it is configured; the extra entry has
# sw_if_index == 0xFFFFFFFF.
# NOTE(review): port and resolved are read below, but no assignment to
# them is visible in this part of the file -- confirm they are bound.
2464 self.vapi.nat44_add_del_interface_addr(
2466 sw_if_index=self.pg7.sw_if_index)
2467 self.vapi.nat44_add_del_identity_mapping(
2469 sw_if_index=self.pg7.sw_if_index,
2471 protocol=IP_PROTOS.tcp,
2474 # identity mappings with external interface
2475 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2476 self.assertEqual(1, len(identity_mappings))
2477 self.assertEqual(self.pg7.sw_if_index,
2478 identity_mappings[0].sw_if_index)
2480 # configure interface address and check identity mappings
2481 self.pg7.config_ip4()
2482 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2484 self.assertEqual(2, len(identity_mappings))
# NOTE(review): the loop selects an entry through sm, but the assertions
# below inspect identity_mappings[0] rather than sm. They only check the
# selected entry if it happens to be first in the dump -- sm.ip_address,
# sm.port and sm.protocol look like what was intended; confirm.
2485 for sm in identity_mappings:
2486 if sm.sw_if_index == 0xFFFFFFFF:
2487 self.assertEqual(str(identity_mappings[0].ip_address),
2489 self.assertEqual(port, identity_mappings[0].port)
2490 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2492 self.assertTrue(resolved)
2494 # remove interface address and check identity mappings
2495 self.pg7.unconfig_ip4()
2496 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2497 self.assertEqual(1, len(identity_mappings))
2498 self.assertEqual(self.pg7.sw_if_index,
2499 identity_mappings[0].sw_if_index)
2501 def test_ipfix_nat44_sess(self):
2502 """ IPFIX logging NAT44 session created/deleted """
# Sends a stream from pg0 to pg1 with IPFIX export towards pg3 enabled,
# removes the NAT pool address, flushes IPFIX, and then decodes the
# packets captured on pg3 and passes the data sets to
# verify_ipfix_nat44_ses.
2503 self.ipfix_domain_id = 10
2504 self.ipfix_src_port = 20202
2505 collector_port = 30303
# Lets scapy dissect UDP packets with this destination port as IPFIX.
# NOTE(review): the literal 30303 repeats collector_port; keep in sync.
2506 bind_layers(UDP, IPFIX, dport=30303)
2507 self.nat44_add_address(self.nat_addr)
2508 flags = self.config_flags.NAT_IS_INSIDE
2509 self.vapi.nat44_interface_add_del_feature(
2510 sw_if_index=self.pg0.sw_if_index,
2511 flags=flags, is_add=1)
2512 self.vapi.nat44_interface_add_del_feature(
2513 sw_if_index=self.pg1.sw_if_index,
# Exporter sends from pg3.local_ip4 to pg3.remote_ip4:collector_port.
2515 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2516 src_address=self.pg3.local_ip4,
2518 template_interval=10,
2519 collector_port=collector_port)
2520 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2521 src_port=self.ipfix_src_port,
# Traffic from pg0 towards pg1; the translated packets are checked on pg1.
2524 pkts = self.create_stream_in(self.pg0, self.pg1)
2525 self.pg0.add_stream(pkts)
2526 self.pg_enable_capture(self.pg_interfaces)
2528 capture = self.pg1.get_capture(len(pkts))
2529 self.verify_capture_out(capture)
# Remove the pool address before flushing the IPFIX export.
2530 self.nat44_add_address(self.nat_addr, is_add=0)
2531 self.vapi.ipfix_flush()
# Exactly 7 packets are expected on the collector interface.
2532 capture = self.pg3.get_capture(7)
2533 ipfix = IPFIXDecoder()
2534 # first load template
2536 self.assertTrue(p.haslayer(IPFIX))
2537 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2538 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2539 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2540 self.assertEqual(p[UDP].dport, collector_port)
2541 self.assertEqual(p[IPFIX].observationDomainID,
2542 self.ipfix_domain_id)
2543 if p.haslayer(Template):
2544 ipfix.add_template(p.getlayer(Template))
2545 # verify events in data set
2547 if p.haslayer(Data):
2548 data = ipfix.decode_data_set(p.getlayer(Set))
2549 self.verify_ipfix_nat44_ses(data)
2551 def test_ipfix_addr_exhausted(self):
2552 """ IPFIX logging NAT addresses exhausted """
# No NAT pool address is added in this method, so the packet sent from pg0
# is expected not to reach pg1. The IPFIX packets captured on pg3 are then
# decoded and passed to verify_ipfix_addr_exhausted.
# NOTE(review): self.ipfix_domain_id and self.ipfix_src_port are read but
# not set in this method -- presumably initialised elsewhere; confirm.
2553 flags = self.config_flags.NAT_IS_INSIDE
2554 self.vapi.nat44_interface_add_del_feature(
2555 sw_if_index=self.pg0.sw_if_index,
2556 flags=flags, is_add=1)
2557 self.vapi.nat44_interface_add_del_feature(
2558 sw_if_index=self.pg1.sw_if_index,
# No collector_port is passed here; the checks below expect port 4739.
2560 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2561 src_address=self.pg3.local_ip4,
2563 template_interval=10)
2564 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2565 src_port=self.ipfix_src_port,
2568 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2569 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2571 self.pg0.add_stream(p)
2572 self.pg_enable_capture(self.pg_interfaces)
# The packet must not be forwarded to pg1.
2574 self.pg1.assert_nothing_captured()
2576 self.vapi.ipfix_flush()
# Exactly 7 packets are expected on the collector interface.
2577 capture = self.pg3.get_capture(7)
2578 ipfix = IPFIXDecoder()
2579 # first load template
2581 self.assertTrue(p.haslayer(IPFIX))
2582 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2583 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2584 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2585 self.assertEqual(p[UDP].dport, 4739)
2586 self.assertEqual(p[IPFIX].observationDomainID,
2587 self.ipfix_domain_id)
2588 if p.haslayer(Template):
2589 ipfix.add_template(p.getlayer(Template))
2590 # verify events in data set
2592 if p.haslayer(Data):
2593 data = ipfix.decode_data_set(p.getlayer(Set))
2594 self.verify_ipfix_addr_exhausted(data)
2596 def test_ipfix_max_sessions(self):
2597 """ IPFIX logging maximum session entries exceeded """
# Sends max_sessions packets with distinct source addresses, enables IPFIX
# export, then sends one more packet that is expected to be dropped. The
# IPFIX packets captured on pg3 are decoded and passed to
# verify_ipfix_max_sessions.
# NOTE(review): pkts is passed to add_stream() below, but its
# initialisation is not visible in this part of the file.
2598 self.nat44_add_address(self.nat_addr)
2599 flags = self.config_flags.NAT_IS_INSIDE
2600 self.vapi.nat44_interface_add_del_feature(
2601 sw_if_index=self.pg0.sw_if_index,
2602 flags=flags, is_add=1)
2603 self.vapi.nat44_interface_add_del_feature(
2604 sw_if_index=self.pg1.sw_if_index,
2607 max_sessions = self.max_translations
# One packet per index; the source is 10.10.<high byte of i>.<low byte of i>.
2610 for i in range(0, max_sessions):
2611 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2612 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2613 IP(src=src, dst=self.pg1.remote_ip4) /
2616 self.pg0.add_stream(pkts)
2617 self.pg_enable_capture(self.pg_interfaces)
# All max_sessions packets are expected to arrive on pg1.
2620 self.pg1.get_capture(max_sessions)
# IPFIX export is only enabled after the first batch has been sent.
2621 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2622 src_address=self.pg3.local_ip4,
2624 template_interval=10)
2625 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2626 src_port=self.ipfix_src_port,
# One further packet from pg0's own remote address.
2629 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2630 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2632 self.pg0.add_stream(p)
2633 self.pg_enable_capture(self.pg_interfaces)
# The extra packet must not be forwarded to pg1.
2635 self.pg1.assert_nothing_captured()
2637 self.vapi.ipfix_flush()
# Exactly 7 packets are expected on the collector interface.
2638 capture = self.pg3.get_capture(7)
2639 ipfix = IPFIXDecoder()
2640 # first load template
2642 self.assertTrue(p.haslayer(IPFIX))
2643 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2644 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2645 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2646 self.assertEqual(p[UDP].dport, 4739)
2647 self.assertEqual(p[IPFIX].observationDomainID,
2648 self.ipfix_domain_id)
2649 if p.haslayer(Template):
2650 ipfix.add_template(p.getlayer(Template))
2651 # verify events in data set
2653 if p.haslayer(Data):
2654 data = ipfix.decode_data_set(p.getlayer(Set))
2655 self.verify_ipfix_max_sessions(data, max_sessions)
2657 def test_syslog_apmap(self):
2658 """ Test syslog address and port mapping creation and deletion """
# One TCP packet from pg0 to pg1 is expected to produce a syslog message
# on pg3, and removing the NAT pool address is expected to produce a
# second one. Both payloads are handed to verify_syslog_apmap, the second
# time with False as the extra argument.
2659 self.vapi.syslog_set_filter(
2660 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2661 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2662 self.nat44_add_address(self.nat_addr)
2663 flags = self.config_flags.NAT_IS_INSIDE
2664 self.vapi.nat44_interface_add_del_feature(
2665 sw_if_index=self.pg0.sw_if_index,
2666 flags=flags, is_add=1)
2667 self.vapi.nat44_interface_add_del_feature(
2668 sw_if_index=self.pg1.sw_if_index,
2671 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2672 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2673 TCP(sport=self.tcp_port_in, dport=20))
2674 self.pg0.add_stream(p)
2675 self.pg_enable_capture(self.pg_interfaces)
2677 capture = self.pg1.get_capture(1)
# Record the translated source port on self.
# NOTE(review): presumably read by verify_syslog_apmap -- confirm.
2678 self.tcp_port_out = capture[0][TCP].sport
2679 capture = self.pg3.get_capture(1)
2680 self.verify_syslog_apmap(capture[0][Raw].load)
# Re-arm capture, then delete the pool address to trigger the second
# syslog message.
2682 self.pg_enable_capture(self.pg_interfaces)
2684 self.nat44_add_address(self.nat_addr, is_add=0)
2685 capture = self.pg3.get_capture(1)
2686 self.verify_syslog_apmap(capture[0][Raw].load, False)
2688 def test_pool_addr_fib(self):
2689 """ NAT44 add pool addresses to FIB """
# Sends ARP who-has requests for the pool address (nat_addr) and for a
# static-mapping address. While both are configured, a reply is expected
# for requests arriving on pg1 and none for a request arriving on pg2.
# After both are removed, no reply is expected on pg1 either.
2690 static_addr = '10.0.0.10'
2691 self.nat44_add_address(self.nat_addr)
2692 flags = self.config_flags.NAT_IS_INSIDE
2693 self.vapi.nat44_interface_add_del_feature(
2694 sw_if_index=self.pg0.sw_if_index,
2695 flags=flags, is_add=1)
2696 self.vapi.nat44_interface_add_del_feature(
2697 sw_if_index=self.pg1.sw_if_index,
2699 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# ARP request for the pool address, sent on pg1.
2702 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2703 ARP(op=ARP.who_has, pdst=self.nat_addr,
2704 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2705 self.pg1.add_stream(p)
2706 self.pg_enable_capture(self.pg_interfaces)
2708 capture = self.pg1.get_capture(1)
2709 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): assertTrue() takes (expr, msg), so ARP.is_at is only used
# as the failure message here and the opcode is never compared against it.
# assertEqual(capture[0][ARP].op, ARP.is_at) looks intended -- confirm.
2710 self.assertTrue(capture[0][ARP].op, ARP.is_at)
# ARP request for the static-mapping address, sent on pg1.
2713 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2714 ARP(op=ARP.who_has, pdst=static_addr,
2715 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2716 self.pg1.add_stream(p)
2717 self.pg_enable_capture(self.pg_interfaces)
2719 capture = self.pg1.get_capture(1)
2720 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): same two-argument assertTrue() pattern as above.
2721 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2723 # send ARP to non-NAT44 interface
2724 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2725 ARP(op=ARP.who_has, pdst=self.nat_addr,
2726 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2727 self.pg2.add_stream(p)
2728 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the request is sent on pg2 but the check is made on pg1;
# confirm pg1 is the interface that should be inspected here.
2730 self.pg1.assert_nothing_captured()
2732 # remove addresses and verify
2733 self.nat44_add_address(self.nat_addr, is_add=0)
2734 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
# With the pool address removed, no ARP reply is expected for it.
2737 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2738 ARP(op=ARP.who_has, pdst=self.nat_addr,
2739 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2740 self.pg1.add_stream(p)
2741 self.pg_enable_capture(self.pg_interfaces)
2743 self.pg1.assert_nothing_captured()
# Likewise for the static-mapping address.
2745 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2746 ARP(op=ARP.who_has, pdst=static_addr,
2747 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2748 self.pg1.add_stream(p)
2749 self.pg_enable_capture(self.pg_interfaces)
2751 self.pg1.assert_nothing_captured()
2753 def test_vrf_mode(self):
2754 """ NAT44 tenant VRF aware address pool mode """
# pg0 and pg1 are moved into two separate IP tables, and each table gets
# its own pool address. Traffic from pg0 to pg2 is expected to be
# translated to nat_ip1 and traffic from pg1 to pg2 to nat_ip2.
# NOTE(review): vrf_id1 and vrf_id2 are read below, but no assignment to
# them is visible in this part of the file -- confirm they are bound.
2758 nat_ip1 = "10.0.0.10"
2759 nat_ip2 = "10.0.0.11"
# Re-home pg0/pg1: drop their addresses, create the tables, bind the
# interfaces, then configure addresses and resolve ARP again.
2761 self.pg0.unconfig_ip4()
2762 self.pg1.unconfig_ip4()
2763 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
2764 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
2765 self.pg0.set_table_ip4(vrf_id1)
2766 self.pg1.set_table_ip4(vrf_id2)
2767 self.pg0.config_ip4()
2768 self.pg1.config_ip4()
2769 self.pg0.resolve_arp()
2770 self.pg1.resolve_arp()
# One pool address per table; pg0 and pg1 both get the inside flag.
2772 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2773 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2774 flags = self.config_flags.NAT_IS_INSIDE
2775 self.vapi.nat44_interface_add_del_feature(
2776 sw_if_index=self.pg0.sw_if_index,
2777 flags=flags, is_add=1)
2778 self.vapi.nat44_interface_add_del_feature(
2779 sw_if_index=self.pg1.sw_if_index,
2780 flags=flags, is_add=1)
2781 self.vapi.nat44_interface_add_del_feature(
2782 sw_if_index=self.pg2.sw_if_index,
# Stream from pg0 to pg2: translated source must be nat_ip1.
2787 pkts = self.create_stream_in(self.pg0, self.pg2)
2788 self.pg0.add_stream(pkts)
2789 self.pg_enable_capture(self.pg_interfaces)
2791 capture = self.pg2.get_capture(len(pkts))
2792 self.verify_capture_out(capture, nat_ip1)
# Stream from pg1 to pg2: translated source must be nat_ip2.
2795 pkts = self.create_stream_in(self.pg1, self.pg2)
2796 self.pg1.add_stream(pkts)
2797 self.pg_enable_capture(self.pg_interfaces)
2799 capture = self.pg2.get_capture(len(pkts))
2800 self.verify_capture_out(capture, nat_ip2)
# Restore pg0/pg1 to table 0 and delete the two tables created above.
2803 self.pg0.unconfig_ip4()
2804 self.pg1.unconfig_ip4()
2805 self.pg0.set_table_ip4(0)
2806 self.pg1.set_table_ip4(0)
2807 self.pg0.config_ip4()
2808 self.pg1.config_ip4()
2809 self.pg0.resolve_arp()
2810 self.pg1.resolve_arp()
2811 self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
2812 self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
2814 def test_vrf_feature_independent(self):
2815 """ NAT44 tenant VRF independent address pool mode """
# nat_ip1 is added without a vrf_id and nat_ip2 with vrf_id=99. Traffic
# from both pg0 and pg1 towards pg2 is expected to be translated to
# nat_ip1.
2817 nat_ip1 = "10.0.0.10"
2818 nat_ip2 = "10.0.0.11"
2820 self.nat44_add_address(nat_ip1)
2821 self.nat44_add_address(nat_ip2, vrf_id=99)
2822 flags = self.config_flags.NAT_IS_INSIDE
2823 self.vapi.nat44_interface_add_del_feature(
2824 sw_if_index=self.pg0.sw_if_index,
2825 flags=flags, is_add=1)
2826 self.vapi.nat44_interface_add_del_feature(
2827 sw_if_index=self.pg1.sw_if_index,
2828 flags=flags, is_add=1)
2829 self.vapi.nat44_interface_add_del_feature(
2830 sw_if_index=self.pg2.sw_if_index,
# Stream from pg0 to pg2.
2834 pkts = self.create_stream_in(self.pg0, self.pg2)
2835 self.pg0.add_stream(pkts)
2836 self.pg_enable_capture(self.pg_interfaces)
2838 capture = self.pg2.get_capture(len(pkts))
2839 self.verify_capture_out(capture, nat_ip1)
# Stream from pg1 to pg2; nat_ip1 is expected here as well.
2842 pkts = self.create_stream_in(self.pg1, self.pg2)
2843 self.pg1.add_stream(pkts)
2844 self.pg_enable_capture(self.pg_interfaces)
2846 capture = self.pg2.get_capture(len(pkts))
2847 self.verify_capture_out(capture, nat_ip1)
2849 def create_routes_and_neigbors(self):
# Builds a /32 route and a neighbor entry for the remote address of pg7
# and of pg8, each pointing at its own interface.
# NOTE(review): only the construction of r1, r2, n1 and n2 is visible in
# this part of the file; confirm they are also applied to VPP afterwards.
2850 r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
2851 [VppRoutePath(self.pg7.remote_ip4,
2852 self.pg7.sw_if_index)])
2853 r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
2854 [VppRoutePath(self.pg8.remote_ip4,
2855 self.pg8.sw_if_index)])
# Neighbor entries binding each remote IPv4 address to its remote MAC.
2859 n1 = VppNeighbor(self,
2860 self.pg7.sw_if_index,
2861 self.pg7.remote_mac,
2862 self.pg7.remote_ip4,
2864 n2 = VppNeighbor(self,
2865 self.pg8.sw_if_index,
2866 self.pg8.remote_mac,
2867 self.pg8.remote_ip4,
2872 def test_dynamic_ipless_interfaces(self):
2873 """ NAT44 interfaces without configured IP address """
# Uses pg7 (inside flag) and pg8 with the routes and neighbors from
# create_routes_and_neigbors and a single pool address, then checks
# translation in both directions.
2874 self.create_routes_and_neigbors()
2875 self.nat44_add_address(self.nat_addr)
2876 flags = self.config_flags.NAT_IS_INSIDE
2877 self.vapi.nat44_interface_add_del_feature(
2878 sw_if_index=self.pg7.sw_if_index,
2879 flags=flags, is_add=1)
2880 self.vapi.nat44_interface_add_del_feature(
2881 sw_if_index=self.pg8.sw_if_index,
# Stream from pg7 to pg8, verified with verify_capture_out.
2885 pkts = self.create_stream_in(self.pg7, self.pg8)
2886 self.pg7.add_stream(pkts)
2887 self.pg_enable_capture(self.pg_interfaces)
2889 capture = self.pg8.get_capture(len(pkts))
2890 self.verify_capture_out(capture)
# Return stream from pg8 addressed to nat_addr, verified on pg7.
2893 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2894 self.pg8.add_stream(pkts)
2895 self.pg_enable_capture(self.pg_interfaces)
2897 capture = self.pg7.get_capture(len(pkts))
2898 self.verify_capture_in(capture, self.pg7)
2900 def test_static_ipless_interfaces(self):
2901 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same topology as the dynamic variant, but with a static mapping of
# pg7.remote_ip4 to nat_addr instead of a pool address. The pg8 -> pg7
# direction is exercised first here.
2903 self.create_routes_and_neigbors()
2904 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2905 flags = self.config_flags.NAT_IS_INSIDE
2906 self.vapi.nat44_interface_add_del_feature(
2907 sw_if_index=self.pg7.sw_if_index,
2908 flags=flags, is_add=1)
2909 self.vapi.nat44_interface_add_del_feature(
2910 sw_if_index=self.pg8.sw_if_index,
# Stream from pg8, verified on pg7 with verify_capture_in.
2914 pkts = self.create_stream_out(self.pg8)
2915 self.pg8.add_stream(pkts)
2916 self.pg_enable_capture(self.pg_interfaces)
2918 capture = self.pg7.get_capture(len(pkts))
2919 self.verify_capture_in(capture, self.pg7)
# Stream from pg7 to pg8; verify_capture_out gets nat_addr and True.
2922 pkts = self.create_stream_in(self.pg7, self.pg8)
2923 self.pg7.add_stream(pkts)
2924 self.pg_enable_capture(self.pg_interfaces)
2926 capture = self.pg8.get_capture(len(pkts))
2927 self.verify_capture_out(capture, self.nat_addr, True)
2929 def test_static_with_port_ipless_interfaces(self):
2930 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same topology as the other ipless tests, with one static mapping per
# protocol (TCP, UDP, ICMP) from pg7.remote_ip4 to nat_addr, using fixed
# outside port/id values.
# The outside values are fixed up front and used by the mappings below.
2932 self.tcp_port_out = 30606
2933 self.udp_port_out = 30607
2934 self.icmp_id_out = 30608
2936 self.create_routes_and_neigbors()
2937 self.nat44_add_address(self.nat_addr)
2938 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2939 self.tcp_port_in, self.tcp_port_out,
2940 proto=IP_PROTOS.tcp)
2941 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2942 self.udp_port_in, self.udp_port_out,
2943 proto=IP_PROTOS.udp)
2944 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2945 self.icmp_id_in, self.icmp_id_out,
2946 proto=IP_PROTOS.icmp)
2947 flags = self.config_flags.NAT_IS_INSIDE
2948 self.vapi.nat44_interface_add_del_feature(
2949 sw_if_index=self.pg7.sw_if_index,
2950 flags=flags, is_add=1)
2951 self.vapi.nat44_interface_add_del_feature(
2952 sw_if_index=self.pg8.sw_if_index,
# Stream from pg8, verified on pg7 with verify_capture_in.
2956 pkts = self.create_stream_out(self.pg8)
2957 self.pg8.add_stream(pkts)
2958 self.pg_enable_capture(self.pg_interfaces)
2960 capture = self.pg7.get_capture(len(pkts))
2961 self.verify_capture_in(capture, self.pg7)
# Stream from pg7 to pg8, verified with verify_capture_out.
2964 pkts = self.create_stream_in(self.pg7, self.pg8)
2965 self.pg7.add_stream(pkts)
2966 self.pg_enable_capture(self.pg_interfaces)
2968 capture = self.pg8.get_capture(len(pkts))
2969 self.verify_capture_out(capture)
2971 def test_static_unknown_proto(self):
2972 """ 1:1 NAT translate packet with unknown protocol """
# pg0.remote_ip4 is statically mapped to nat_ip. A packet carrying an
# inner IP/TCP payload is sent in each direction, and the captured packet
# is expected to contain a GRE layer with only the outer address
# translated.
# NOTE(review): packet is read in the assertions below, but no assignment
# to it is visible in this part of the file -- confirm it is bound.
2973 nat_ip = "10.0.0.10"
2974 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2975 flags = self.config_flags.NAT_IS_INSIDE
2976 self.vapi.nat44_interface_add_del_feature(
2977 sw_if_index=self.pg0.sw_if_index,
2978 flags=flags, is_add=1)
2979 self.vapi.nat44_interface_add_del_feature(
2980 sw_if_index=self.pg1.sw_if_index,
# Packet sent on pg0 towards pg1.remote_ip4.
2984 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2985 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2987 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2988 TCP(sport=1234, dport=1234))
2989 self.pg0.add_stream(p)
2990 self.pg_enable_capture(self.pg_interfaces)
2992 p = self.pg1.get_capture(1)
# Expected on pg1: outer source rewritten to nat_ip, GRE layer present.
2995 self.assertEqual(packet[IP].src, nat_ip)
2996 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2997 self.assertEqual(packet.haslayer(GRE), 1)
2998 self.assert_packet_checksums_valid(packet)
3000 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Packet sent on pg1 towards nat_ip.
3004 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3005 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3007 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3008 TCP(sport=1234, dport=1234))
3009 self.pg1.add_stream(p)
3010 self.pg_enable_capture(self.pg_interfaces)
3012 p = self.pg0.get_capture(1)
# Expected on pg0: outer destination rewritten back to pg0.remote_ip4.
3015 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3016 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3017 self.assertEqual(packet.haslayer(GRE), 1)
3018 self.assert_packet_checksums_valid(packet)
3020 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3023 def test_hairpinning_static_unknown_proto(self):
3024 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts of pg0, each with its own static
# mapping. A packet with an inner IP/TCP payload is sent in each direction
# through the peer's mapped address and captured on pg0 again.
# NOTE(review): packet is read in the assertions below, but no assignment
# to it is visible in this part of the file -- confirm it is bound.
3026 host = self.pg0.remote_hosts[0]
3027 server = self.pg0.remote_hosts[1]
3029 host_nat_ip = "10.0.0.10"
3030 server_nat_ip = "10.0.0.11"
3032 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3033 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3034 flags = self.config_flags.NAT_IS_INSIDE
3035 self.vapi.nat44_interface_add_del_feature(
3036 sw_if_index=self.pg0.sw_if_index,
3037 flags=flags, is_add=1)
3038 self.vapi.nat44_interface_add_del_feature(
3039 sw_if_index=self.pg1.sw_if_index,
# Packet from the host to the server's mapped address.
3043 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3044 IP(src=host.ip4, dst=server_nat_ip) /
3046 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3047 TCP(sport=1234, dport=1234))
3048 self.pg0.add_stream(p)
3049 self.pg_enable_capture(self.pg_interfaces)
3051 p = self.pg0.get_capture(1)
# Expected: source is the host's mapped address, destination is the
# server's real address, GRE layer present.
3054 self.assertEqual(packet[IP].src, host_nat_ip)
3055 self.assertEqual(packet[IP].dst, server.ip4)
3056 self.assertEqual(packet.haslayer(GRE), 1)
3057 self.assert_packet_checksums_valid(packet)
3059 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Packet from the server to the host's mapped address.
3063 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3064 IP(src=server.ip4, dst=host_nat_ip) /
3066 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3067 TCP(sport=1234, dport=1234))
3068 self.pg0.add_stream(p)
3069 self.pg_enable_capture(self.pg_interfaces)
3071 p = self.pg0.get_capture(1)
# Expected: source is the server's mapped address, destination is the
# host's real address, GRE layer present.
3074 self.assertEqual(packet[IP].src, server_nat_ip)
3075 self.assertEqual(packet[IP].dst, host.ip4)
3076 self.assertEqual(packet.haslayer(GRE), 1)
3077 self.assert_packet_checksums_valid(packet)
3079 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3082 def test_output_feature(self):
3083 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT output feature on pg0 and pg1 (inside flag) and on pg3,
# then checks three flows: pg0 -> pg3, pg3 -> pg0, and pg2 -> pg0. The
# last one must arrive untranslated.
3084 self.nat44_add_address(self.nat_addr)
3085 flags = self.config_flags.NAT_IS_INSIDE
3086 self.vapi.nat44_interface_add_del_output_feature(
3087 is_add=1, flags=flags,
3088 sw_if_index=self.pg0.sw_if_index)
3089 self.vapi.nat44_interface_add_del_output_feature(
3090 is_add=1, flags=flags,
3091 sw_if_index=self.pg1.sw_if_index)
3092 self.vapi.nat44_interface_add_del_output_feature(
3094 sw_if_index=self.pg3.sw_if_index)
# Stream from pg0 to pg3, verified with verify_capture_out.
3097 pkts = self.create_stream_in(self.pg0, self.pg3)
3098 self.pg0.add_stream(pkts)
3099 self.pg_enable_capture(self.pg_interfaces)
3101 capture = self.pg3.get_capture(len(pkts))
3102 self.verify_capture_out(capture)
# Return stream from pg3, verified on pg0 with verify_capture_in.
3105 pkts = self.create_stream_out(self.pg3)
3106 self.pg3.add_stream(pkts)
3107 self.pg_enable_capture(self.pg_interfaces)
3109 capture = self.pg0.get_capture(len(pkts))
3110 self.verify_capture_in(capture, self.pg0)
3112 # from non-NAT interface to NAT inside interface
3113 pkts = self.create_stream_in(self.pg2, self.pg0)
3114 self.pg2.add_stream(pkts)
3115 self.pg_enable_capture(self.pg_interfaces)
3117 capture = self.pg0.get_capture(len(pkts))
3118 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3120 def test_output_feature_vrf_aware(self):
3121 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Two pool addresses are bound to vrf_id 10 and 20. Traffic from pg4 to
# pg3 is expected to use nat_ip_vrf10 and traffic from pg6 to pg3 to use
# nat_ip_vrf20; the return traffic is checked on pg4 and pg6.
# NOTE(review): presumably pg4 and pg6 sit in tables 10 and 20 -- that
# binding is not visible in this method; confirm against the class setup.
3122 nat_ip_vrf10 = "10.0.0.10"
3123 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3.remote_ip4; the argument that distinguishes them
# is not visible in this part of the file.
3125 r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
3126 [VppRoutePath(self.pg3.remote_ip4,
3127 self.pg3.sw_if_index)],
3129 r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
3130 [VppRoutePath(self.pg3.remote_ip4,
3131 self.pg3.sw_if_index)],
3136 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3137 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3138 flags = self.config_flags.NAT_IS_INSIDE
3139 self.vapi.nat44_interface_add_del_output_feature(
3140 is_add=1, flags=flags,
3141 sw_if_index=self.pg4.sw_if_index)
3142 self.vapi.nat44_interface_add_del_output_feature(
3143 is_add=1, flags=flags,
3144 sw_if_index=self.pg6.sw_if_index)
3145 self.vapi.nat44_interface_add_del_output_feature(
3147 sw_if_index=self.pg3.sw_if_index)
# Stream from pg4 to pg3: translated source must be nat_ip_vrf10.
3150 pkts = self.create_stream_in(self.pg4, self.pg3)
3151 self.pg4.add_stream(pkts)
3152 self.pg_enable_capture(self.pg_interfaces)
3154 capture = self.pg3.get_capture(len(pkts))
3155 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# Return stream addressed to nat_ip_vrf10, verified on pg4.
3158 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3159 self.pg3.add_stream(pkts)
3160 self.pg_enable_capture(self.pg_interfaces)
3162 capture = self.pg4.get_capture(len(pkts))
3163 self.verify_capture_in(capture, self.pg4)
# Stream from pg6 to pg3: translated source must be nat_ip_vrf20.
3166 pkts = self.create_stream_in(self.pg6, self.pg3)
3167 self.pg6.add_stream(pkts)
3168 self.pg_enable_capture(self.pg_interfaces)
3170 capture = self.pg3.get_capture(len(pkts))
3171 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# Return stream addressed to nat_ip_vrf20, verified on pg6.
3174 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3175 self.pg3.add_stream(pkts)
3176 self.pg_enable_capture(self.pg_interfaces)
3178 capture = self.pg6.get_capture(len(pkts))
3179 self.verify_capture_in(capture, self.pg6)
3181 def test_output_feature_hairpinning(self):
3182 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both remote hosts on pg0. The host reaches the
# server through the server's external (NAT) address and the test
# checks the translation in both directions on pg0.
# NOTE(review): host_in_port, ip and tcp are used below but their
# bindings are not visible here - confirm against the full file.
3183 host = self.pg0.remote_hosts[0]
3184 server = self.pg0.remote_hosts[1]
3187 server_in_port = 5678
3188 server_out_port = 8765
3190 self.nat44_add_address(self.nat_addr)
3191 flags = self.config_flags.NAT_IS_INSIDE
3192 self.vapi.nat44_interface_add_del_output_feature(
3193 is_add=1, flags=flags,
3194 sw_if_index=self.pg0.sw_if_index)
3195 self.vapi.nat44_interface_add_del_output_feature(
3197 sw_if_index=self.pg1.sw_if_index)
3199 # add static mapping for server
3200 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3201 server_in_port, server_out_port,
3202 proto=IP_PROTOS.tcp)
3204 # send packet from host to server
3205 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3206 IP(src=host.ip4, dst=self.nat_addr) /
3207 TCP(sport=host_in_port, dport=server_out_port))
3208 self.pg0.add_stream(p)
3209 self.pg_enable_capture(self.pg_interfaces)
# The packet is expected back on pg0 with source rewritten to the NAT
# address, destination rewritten to the server's inside address/port
# and a source port different from the host's original one.
3211 capture = self.pg0.get_capture(1)
3216 self.assertEqual(ip.src, self.nat_addr)
3217 self.assertEqual(ip.dst, server.ip4)
3218 self.assertNotEqual(tcp.sport, host_in_port)
3219 self.assertEqual(tcp.dport, server_in_port)
3220 self.assert_packet_checksums_valid(p)
# Remember the allocated port so the server's reply can target it.
3221 host_out_port = tcp.sport
3223 self.logger.error(ppp("Unexpected or invalid packet:", p))
3226 # send reply from server to host
3227 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3228 IP(src=server.ip4, dst=self.nat_addr) /
3229 TCP(sport=server_in_port, dport=host_out_port))
3230 self.pg0.add_stream(p)
3231 self.pg_enable_capture(self.pg_interfaces)
# The reply is expected on pg0 with the server's external port as
# source and the host's original address/port as destination.
3233 capture = self.pg0.get_capture(1)
3238 self.assertEqual(ip.src, self.nat_addr)
3239 self.assertEqual(ip.dst, host.ip4)
3240 self.assertEqual(tcp.sport, server_out_port)
3241 self.assertEqual(tcp.dport, host_in_port)
3242 self.assert_packet_checksums_valid(p)
3244 self.logger.error(ppp("Unexpected or invalid packet:", p))
3247 def test_one_armed_nat44(self):
3248 """ One armed NAT44 """
# Both the local and the remote host sit behind the same interface
# (pg9); the NAT feature is configured twice on pg9 and all traffic is
# sent to and captured from pg9.
3249 remote_host = self.pg9.remote_hosts[0]
3250 local_host = self.pg9.remote_hosts[1]
3253 self.nat44_add_address(self.nat_addr)
3254 flags = self.config_flags.NAT_IS_INSIDE
# NOTE(review): the remaining arguments of the first call are not
# visible here; the second call passes the NAT_IS_INSIDE flag.
3255 self.vapi.nat44_interface_add_del_feature(
3256 sw_if_index=self.pg9.sw_if_index,
3258 self.vapi.nat44_interface_add_del_feature(
3259 sw_if_index=self.pg9.sw_if_index,
3260 flags=flags, is_add=1)
# Local host -> remote host, TCP 12345 -> 80.
3263 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3264 IP(src=local_host.ip4, dst=remote_host.ip4) /
3265 TCP(sport=12345, dport=80))
3266 self.pg9.add_stream(p)
3267 self.pg_enable_capture(self.pg_interfaces)
# Expect the source rewritten to the NAT address with a new source
# port; the destination must be untouched.
3269 capture = self.pg9.get_capture(1)
3274 self.assertEqual(ip.src, self.nat_addr)
3275 self.assertEqual(ip.dst, remote_host.ip4)
3276 self.assertNotEqual(tcp.sport, 12345)
3277 external_port = tcp.sport
3278 self.assertEqual(tcp.dport, 80)
3279 self.assert_packet_checksums_valid(p)
3281 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Remote host -> NAT address on the port allocated above.
3285 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3286 IP(src=remote_host.ip4, dst=self.nat_addr) /
3287 TCP(sport=80, dport=external_port))
3288 self.pg9.add_stream(p)
3289 self.pg_enable_capture(self.pg_interfaces)
# Expect the destination restored to the local host and port 12345.
3291 capture = self.pg9.get_capture(1)
3296 self.assertEqual(ip.src, remote_host.ip4)
3297 self.assertEqual(ip.dst, local_host.ip4)
3298 self.assertEqual(tcp.sport, 80)
3299 self.assertEqual(tcp.dport, 12345)
3300 self.assert_packet_checksums_valid(p)
3302 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Each nat44-classify counter is expected to read exactly 1.
3305 err = self.statistics.get_err_counter(
3306 '/err/nat44-classify/next in2out')
3307 self.assertEqual(err, 1)
3308 err = self.statistics.get_err_counter(
3309 '/err/nat44-classify/next out2in')
3310 self.assertEqual(err, 1)
3312 def test_del_session(self):
3313 """ Delete NAT44 session """
# Creates sessions by sending an inside stream pg0 -> pg1, then deletes
# sessions through the API, addressing them once by inside and once by
# outside address/port, and checks the user's session count.
3314 self.nat44_add_address(self.nat_addr)
3315 flags = self.config_flags.NAT_IS_INSIDE
3316 self.vapi.nat44_interface_add_del_feature(
3317 sw_if_index=self.pg0.sw_if_index,
3318 flags=flags, is_add=1)
3319 self.vapi.nat44_interface_add_del_feature(
3320 sw_if_index=self.pg1.sw_if_index,
3323 pkts = self.create_stream_in(self.pg0, self.pg1)
3324 self.pg0.add_stream(pkts)
3325 self.pg_enable_capture(self.pg_interfaces)
3327 self.pg1.get_capture(len(pkts))
3329 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3330 nsessions = len(sessions)
# First deletion is keyed by the inside address/port (NAT_IS_INSIDE
# flag); the second is keyed by the outside address/port (no flag).
3332 self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3333 port=sessions[0].inside_port,
3334 protocol=sessions[0].protocol,
3335 flags=self.config_flags.NAT_IS_INSIDE)
3336 self.vapi.nat44_del_session(address=sessions[1].outside_ip_address,
3337 port=sessions[1].outside_port,
3338 protocol=sessions[1].protocol)
# Exactly two sessions must have disappeared.
3340 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3341 self.assertEqual(nsessions - len(sessions), 2)
# Delete one more session, then run the no-user check helper.
3343 self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3344 port=sessions[0].inside_port,
3345 protocol=sessions[0].protocol,
3346 flags=self.config_flags.NAT_IS_INSIDE)
3348 self.verify_no_nat44_user()
3350 def test_frag_in_order(self):
3351 """ NAT44 translate fragments arriving in order """
# Configures a NAT address plus NAT features on pg0 (inside flag) and
# pg1, then runs the shared frag_in_order() helper once per protocol.
3353 self.nat44_add_address(self.nat_addr)
3354 flags = self.config_flags.NAT_IS_INSIDE
3355 self.vapi.nat44_interface_add_del_feature(
3356 sw_if_index=self.pg0.sw_if_index,
3357 flags=flags, is_add=1)
3358 self.vapi.nat44_interface_add_del_feature(
3359 sw_if_index=self.pg1.sw_if_index,
3362 self.frag_in_order(proto=IP_PROTOS.tcp)
3363 self.frag_in_order(proto=IP_PROTOS.udp)
3364 self.frag_in_order(proto=IP_PROTOS.icmp)
3366 def test_frag_forwarding(self):
3367 """ NAT44 forwarding fragment test """
# With NAT forwarding enabled, a fragmented UDP packet sent from pg1
# towards pg0's remote host must arrive on pg0 with addresses, ports
# and payload unchanged.
3368 self.vapi.nat44_add_del_interface_addr(
3370 sw_if_index=self.pg1.sw_if_index)
3371 flags = self.config_flags.NAT_IS_INSIDE
3372 self.vapi.nat44_interface_add_del_feature(
3373 sw_if_index=self.pg0.sw_if_index,
3374 flags=flags, is_add=1)
3375 self.vapi.nat44_interface_add_del_feature(
3376 sw_if_index=self.pg1.sw_if_index,
3378 self.vapi.nat44_forwarding_enable_disable(enable=1)
# 35-byte payload made of three distinguishable runs.
3380 data = b"A" * 16 + b"B" * 16 + b"C" * 3
3381 pkts = self.create_stream_frag(self.pg1,
3382 self.pg0.remote_ip4,
3386 proto=IP_PROTOS.udp)
3387 self.pg1.add_stream(pkts)
3388 self.pg_enable_capture(self.pg_interfaces)
# Reassemble what was captured on pg0 and compare with what was sent.
3390 frags = self.pg0.get_capture(len(pkts))
3391 p = self.reass_frags_and_verify(frags,
3392 self.pg1.remote_ip4,
3393 self.pg0.remote_ip4)
3394 self.assertEqual(p[UDP].sport, 4789)
3395 self.assertEqual(p[UDP].dport, 4789)
3396 self.assertEqual(data, p[Raw].load)
3398 def test_reass_hairpinning(self):
3399 """ NAT44 fragments hairpinning """
# Stores the server and randomly chosen ports (1025..65535) on self,
# configures static mappings for the server and then runs the shared
# reass_hairpinning() helper once per protocol.
3401 self.server = self.pg0.remote_hosts[1]
3402 self.host_in_port = random.randint(1025, 65535)
3403 self.server_in_port = random.randint(1025, 65535)
3404 self.server_out_port = random.randint(1025, 65535)
3406 self.nat44_add_address(self.nat_addr)
3407 flags = self.config_flags.NAT_IS_INSIDE
3408 self.vapi.nat44_interface_add_del_feature(
3409 sw_if_index=self.pg0.sw_if_index,
3410 flags=flags, is_add=1)
3411 self.vapi.nat44_interface_add_del_feature(
3412 sw_if_index=self.pg1.sw_if_index,
3414 # add static mapping for server
# Port mappings for TCP and UDP, plus one mapping without ports.
3415 self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
3416 self.server_in_port,
3417 self.server_out_port,
3418 proto=IP_PROTOS.tcp)
3419 self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
3420 self.server_in_port,
3421 self.server_out_port,
3422 proto=IP_PROTOS.udp)
3423 self.nat44_add_static_mapping(self.server.ip4, self.nat_addr)
3425 self.reass_hairpinning(proto=IP_PROTOS.tcp)
3426 self.reass_hairpinning(proto=IP_PROTOS.udp)
3427 self.reass_hairpinning(proto=IP_PROTOS.icmp)
3429 def test_frag_out_of_order(self):
3430 """ NAT44 translate fragments arriving out of order """
# Configures a NAT address plus NAT features on pg0 (inside flag) and
# pg1, then runs the shared frag_out_of_order() helper per protocol.
3432 self.nat44_add_address(self.nat_addr)
3433 flags = self.config_flags.NAT_IS_INSIDE
3434 self.vapi.nat44_interface_add_del_feature(
3435 sw_if_index=self.pg0.sw_if_index,
3436 flags=flags, is_add=1)
3437 self.vapi.nat44_interface_add_del_feature(
3438 sw_if_index=self.pg1.sw_if_index,
3441 self.frag_out_of_order(proto=IP_PROTOS.tcp)
3442 self.frag_out_of_order(proto=IP_PROTOS.udp)
3443 self.frag_out_of_order(proto=IP_PROTOS.icmp)
3445 def test_port_restricted(self):
3446 """ Port restricted NAT44 (MAP-E CE) """
# Selects port allocation algorithm 1 and checks that the translated
# source port of an inside TCP packet carries the expected bit pattern.
3447 self.nat44_add_address(self.nat_addr)
3448 flags = self.config_flags.NAT_IS_INSIDE
3449 self.vapi.nat44_interface_add_del_feature(
3450 sw_if_index=self.pg0.sw_if_index,
3451 flags=flags, is_add=1)
3452 self.vapi.nat44_interface_add_del_feature(
3453 sw_if_index=self.pg1.sw_if_index,
# NOTE(review): the remaining arguments of this call are not visible
# here; presumably they define the port-set parameters - confirm.
3455 self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3460 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3461 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3462 TCP(sport=4567, dport=22))
3463 self.pg0.add_stream(p)
3464 self.pg_enable_capture(self.pg_interfaces)
3466 capture = self.pg1.get_capture(1)
3471 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3472 self.assertEqual(ip.src, self.nat_addr)
3473 self.assertEqual(tcp.dport, 22)
3474 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the allocated source port must equal 10.
3475 self.assertEqual((tcp.sport >> 6) & 63, 10)
3476 self.assert_packet_checksums_valid(p)
3478 self.logger.error(ppp("Unexpected or invalid packet:", p))
3481 def test_port_range(self):
3482 """ External address port range """
# Selects port allocation algorithm 2, sends five TCP packets with
# distinct source ports and expects exactly three translated packets,
# each with a source port within 1025..1027.
3483 self.nat44_add_address(self.nat_addr)
3484 flags = self.config_flags.NAT_IS_INSIDE
3485 self.vapi.nat44_interface_add_del_feature(
3486 sw_if_index=self.pg0.sw_if_index,
3487 flags=flags, is_add=1)
3488 self.vapi.nat44_interface_add_del_feature(
3489 sw_if_index=self.pg1.sw_if_index,
# NOTE(review): the remaining arguments of this call are not visible
# here; presumably they set the port range checked below - confirm.
3491 self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
# Source ports 1125..1129, one packet each.
3496 for port in range(0, 5):
3497 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3498 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3499 TCP(sport=1125 + port))
3501 self.pg0.add_stream(pkts)
3502 self.pg_enable_capture(self.pg_interfaces)
# Only three packets are expected on the outside interface.
3504 capture = self.pg1.get_capture(3)
3507 self.assertGreaterEqual(tcp.sport, 1025)
3508 self.assertLessEqual(tcp.sport, 1027)
3510 def test_multiple_outside_vrf(self):
3511 """ Multiple outside VRF """
# pg1 and pg2 are moved into two separate IP tables and both get a NAT
# feature; traffic from the inside interface pg0 is then verified in
# both directions towards each of them. The interfaces are moved back
# to table 0 at the end.
# NOTE(review): the bindings of vrf_id1/vrf_id2 are not visible here.
3515 self.pg1.unconfig_ip4()
3516 self.pg2.unconfig_ip4()
3517 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
3518 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
3519 self.pg1.set_table_ip4(vrf_id1)
3520 self.pg2.set_table_ip4(vrf_id2)
3521 self.pg1.config_ip4()
3522 self.pg2.config_ip4()
3523 self.pg1.resolve_arp()
3524 self.pg2.resolve_arp()
3526 self.nat44_add_address(self.nat_addr)
3527 flags = self.config_flags.NAT_IS_INSIDE
3528 self.vapi.nat44_interface_add_del_feature(
3529 sw_if_index=self.pg0.sw_if_index,
3530 flags=flags, is_add=1)
3531 self.vapi.nat44_interface_add_del_feature(
3532 sw_if_index=self.pg1.sw_if_index,
3534 self.vapi.nat44_interface_add_del_feature(
3535 sw_if_index=self.pg2.sw_if_index,
# pg0 -> pg1 and back.
3540 pkts = self.create_stream_in(self.pg0, self.pg1)
3541 self.pg0.add_stream(pkts)
3542 self.pg_enable_capture(self.pg_interfaces)
3544 capture = self.pg1.get_capture(len(pkts))
3545 self.verify_capture_out(capture, self.nat_addr)
3547 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3548 self.pg1.add_stream(pkts)
3549 self.pg_enable_capture(self.pg_interfaces)
3551 capture = self.pg0.get_capture(len(pkts))
3552 self.verify_capture_in(capture, self.pg0)
# Different inside ports/ICMP id are used for the second direction.
3554 self.tcp_port_in = 60303
3555 self.udp_port_in = 60304
3556 self.icmp_id_in = 60305
# pg0 -> pg2 and back.
3559 pkts = self.create_stream_in(self.pg0, self.pg2)
3560 self.pg0.add_stream(pkts)
3561 self.pg_enable_capture(self.pg_interfaces)
3563 capture = self.pg2.get_capture(len(pkts))
3564 self.verify_capture_out(capture, self.nat_addr)
3566 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3567 self.pg2.add_stream(pkts)
3568 self.pg_enable_capture(self.pg_interfaces)
3570 capture = self.pg0.get_capture(len(pkts))
3571 self.verify_capture_in(capture, self.pg0)
# Cleanup: remove the NAT address and return pg1/pg2 to table 0.
3574 self.nat44_add_address(self.nat_addr, is_add=0)
3575 self.pg1.unconfig_ip4()
3576 self.pg2.unconfig_ip4()
3577 self.pg1.set_table_ip4(0)
3578 self.pg2.set_table_ip4(0)
3579 self.pg1.config_ip4()
3580 self.pg2.config_ip4()
3581 self.pg1.resolve_arp()
3582 self.pg2.resolve_arp()
3584 def test_mss_clamping(self):
3585 """ TCP MSS clamping """
# Sends the same TCP SYN (MSS option 1400) three times from pg0 to pg1
# under different clamping settings and checks the MSS seen on pg1.
3586 self.nat44_add_address(self.nat_addr)
3587 flags = self.config_flags.NAT_IS_INSIDE
3588 self.vapi.nat44_interface_add_del_feature(
3589 sw_if_index=self.pg0.sw_if_index,
3590 flags=flags, is_add=1)
3591 self.vapi.nat44_interface_add_del_feature(
3592 sw_if_index=self.pg1.sw_if_index,
3595 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3596 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3597 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3598 flags="S", options=[('MSS', 1400)]))
# Case 1: clamping enabled at 1000, below the packet's MSS.
3600 self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
3601 self.pg0.add_stream(p)
3602 self.pg_enable_capture(self.pg_interfaces)
3604 capture = self.pg1.get_capture(1)
3605 # Negotiated MSS value greater than configured - changed
3606 self.verify_mss_value(capture[0], 1000)
# Case 2: clamping disabled.
3608 self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
3609 self.pg0.add_stream(p)
3610 self.pg_enable_capture(self.pg_interfaces)
3612 capture = self.pg1.get_capture(1)
3613 # MSS clamping disabled - negotiated MSS unchanged
3614 self.verify_mss_value(capture[0], 1400)
# Case 3: clamping enabled at 1500, above the packet's MSS.
3616 self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
3617 self.pg0.add_stream(p)
3618 self.pg_enable_capture(self.pg_interfaces)
3620 capture = self.pg1.get_capture(1)
3621 # Negotiated MSS value smaller than configured - unchanged
3622 self.verify_mss_value(capture[0], 1400)
3624 def test_ha_send(self):
3625 """ Send HA session synchronization events (active) """
# Exercises the sending side of NAT HA: session add, delete and refresh
# events are flushed towards the failover peer on pg3, and the test
# checks the emitted HANATStateSync packets, the ACK handling and the
# retry behaviour through the /nat44/ha/* counters.
# NOTE(review): p, ip and udp are used below but their bindings are not
# visible here - confirm against the full file.
3626 flags = self.config_flags.NAT_IS_INSIDE
3627 self.vapi.nat44_interface_add_del_feature(
3628 sw_if_index=self.pg0.sw_if_index,
3629 flags=flags, is_add=1)
3630 self.vapi.nat44_interface_add_del_feature(
3631 sw_if_index=self.pg1.sw_if_index,
3633 self.nat44_add_address(self.nat_addr)
# Listener on pg3's local address; failover peer is pg3's remote
# address on UDP port 12346 with a refresh interval of 10.
3635 self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
3638 self.vapi.nat_ha_set_failover(ip_address=self.pg3.remote_ip4,
3639 port=12346, session_refresh_interval=10)
# Let scapy dissect UDP payloads with source port 12345 as HA messages.
3640 bind_layers(UDP, HANATStateSync, sport=12345)
# Create sessions by sending an inside stream pg0 -> pg1.
3643 pkts = self.create_stream_in(self.pg0, self.pg1)
3644 self.pg0.add_stream(pkts)
3645 self.pg_enable_capture(self.pg_interfaces)
3647 capture = self.pg1.get_capture(len(pkts))
3648 self.verify_capture_out(capture)
3649 # active send HA events
3650 self.vapi.nat_ha_flush()
3651 stats = self.statistics.get_counter('/nat44/ha/add-event-send')
3652 self.assertEqual(stats[0][0], 3)
3653 capture = self.pg3.get_capture(1)
3655 self.assert_packet_checksums_valid(p)
3659 hanat = p[HANATStateSync]
3661 self.logger.error(ppp("Invalid packet:", p))
# One HA packet carrying three events is expected.
3664 self.assertEqual(ip.src, self.pg3.local_ip4)
3665 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3666 self.assertEqual(udp.sport, 12345)
3667 self.assertEqual(udp.dport, 12346)
3668 self.assertEqual(hanat.version, 1)
3669 self.assertEqual(hanat.thread_index, 0)
3670 self.assertEqual(hanat.count, 3)
# Keep the sequence number so it can be acknowledged below.
3671 seq = hanat.sequence_number
3672 for event in hanat.events:
3673 self.assertEqual(event.event_type, 1)
3674 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3675 self.assertEqual(event.out_addr, self.nat_addr)
3676 self.assertEqual(event.fib_index, 0)
3678 # ACK received events
3679 ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3680 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3681 UDP(sport=12346, dport=12345) /
3682 HANATStateSync(sequence_number=seq, flags='ACK'))
3683 self.pg3.add_stream(ack)
3685 stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3686 self.assertEqual(stats[0][0], 1)
3688 # delete one session
3689 self.pg_enable_capture(self.pg_interfaces)
3690 self.vapi.nat44_del_session(address=self.pg0.remote_ip4,
3691 port=self.tcp_port_in,
3692 protocol=IP_PROTOS.tcp,
3693 flags=self.config_flags.NAT_IS_INSIDE)
3694 self.vapi.nat_ha_flush()
3695 stats = self.statistics.get_counter('/nat44/ha/del-event-send')
3696 self.assertEqual(stats[0][0], 1)
3697 capture = self.pg3.get_capture(1)
3700 hanat = p[HANATStateSync]
3702 self.logger.error(ppp("Invalid packet:", p))
# The delete message must carry a newer sequence number.
3705 self.assertGreater(hanat.sequence_number, seq)
3707 # do not send ACK, active retry send HA event again
3708 self.pg_enable_capture(self.pg_interfaces)
# Expect three retries and one missed message; every retransmission
# must be identical to the unacknowledged packet p.
3710 stats = self.statistics.get_counter('/nat44/ha/retry-count')
3711 self.assertEqual(stats[0][0], 3)
3712 stats = self.statistics.get_counter('/nat44/ha/missed-count')
3713 self.assertEqual(stats[0][0], 1)
3714 capture = self.pg3.get_capture(3)
3715 for packet in capture:
3716 self.assertEqual(packet, p)
3718 # session counters refresh
3719 pkts = self.create_stream_out(self.pg1)
3720 self.pg1.add_stream(pkts)
3721 self.pg_enable_capture(self.pg_interfaces)
3723 self.pg0.get_capture(2)
3724 self.vapi.nat_ha_flush()
3725 stats = self.statistics.get_counter('/nat44/ha/refresh-event-send')
3726 self.assertEqual(stats[0][0], 2)
3727 capture = self.pg3.get_capture(1)
3729 self.assert_packet_checksums_valid(p)
3733 hanat = p[HANATStateSync]
3735 self.logger.error(ppp("Invalid packet:", p))
# One HA packet carrying two refresh events (event_type 3) is expected.
3738 self.assertEqual(ip.src, self.pg3.local_ip4)
3739 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3740 self.assertEqual(udp.sport, 12345)
3741 self.assertEqual(udp.dport, 12346)
3742 self.assertEqual(hanat.version, 1)
3743 self.assertEqual(hanat.count, 2)
3744 seq = hanat.sequence_number
3745 for event in hanat.events:
3746 self.assertEqual(event.event_type, 3)
3747 self.assertEqual(event.out_addr, self.nat_addr)
3748 self.assertEqual(event.fib_index, 0)
3749 self.assertEqual(event.total_pkts, 2)
3750 self.assertGreater(event.total_bytes, 0)
# Acknowledge the refresh message; the ACK counter must reach 2.
3752 ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3753 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3754 UDP(sport=12346, dport=12345) /
3755 HANATStateSync(sequence_number=seq, flags='ACK'))
3756 self.pg3.add_stream(ack)
3758 stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3759 self.assertEqual(stats[0][0], 2)
3761 def test_ha_recv(self):
3762 """ Receive HA session synchronization events (passive) """
# Exercises the receiving side of NAT HA: add, delete and refresh
# events are injected on pg3 and the test checks the returned ACKs,
# the /nat44/* counters and the resulting user/session dumps. Finally
# a data packet is sent to confirm a session created via HA is usable.
# NOTE(review): ip and tcp are used near the end but their bindings are
# not visible here - confirm against the full file.
3763 self.nat44_add_address(self.nat_addr)
3764 flags = self.config_flags.NAT_IS_INSIDE
3765 self.vapi.nat44_interface_add_del_feature(
3766 sw_if_index=self.pg0.sw_if_index,
3767 flags=flags, is_add=1)
3768 self.vapi.nat44_interface_add_del_feature(
3769 sw_if_index=self.pg1.sw_if_index,
3771 self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
# Let scapy dissect UDP payloads with source port 12345 as HA messages.
3774 bind_layers(UDP, HANATStateSync, sport=12345)
# Outside ports for the injected sessions are picked at random.
3776 self.tcp_port_out = random.randint(1025, 65535)
3777 self.udp_port_out = random.randint(1025, 65535)
3779 # send HA session add events to failover/passive
3780 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3781 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3782 UDP(sport=12346, dport=12345) /
3783 HANATStateSync(sequence_number=1, events=[
3784 Event(event_type='add', protocol='tcp',
3785 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3786 in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3787 eh_addr=self.pg1.remote_ip4,
3788 ehn_addr=self.pg1.remote_ip4,
3789 eh_port=self.tcp_external_port,
3790 ehn_port=self.tcp_external_port, fib_index=0),
3791 Event(event_type='add', protocol='udp',
3792 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3793 in_port=self.udp_port_in, out_port=self.udp_port_out,
3794 eh_addr=self.pg1.remote_ip4,
3795 ehn_addr=self.pg1.remote_ip4,
3796 eh_port=self.udp_external_port,
3797 ehn_port=self.udp_external_port, fib_index=0)]))
3799 self.pg3.add_stream(p)
3800 self.pg_enable_capture(self.pg_interfaces)
# An ACK for sequence number 1 is expected back on pg3.
3803 capture = self.pg3.get_capture(1)
3806 hanat = p[HANATStateSync]
3808 self.logger.error(ppp("Invalid packet:", p))
3811 self.assertEqual(hanat.sequence_number, 1)
3812 self.assertEqual(hanat.flags, 'ACK')
3813 self.assertEqual(hanat.version, 1)
3814 self.assertEqual(hanat.thread_index, 0)
# Counters must show one ACK sent, two add events received and the
# resulting single user with two sessions.
3815 stats = self.statistics.get_counter('/nat44/ha/ack-send')
3816 self.assertEqual(stats[0][0], 1)
3817 stats = self.statistics.get_counter('/nat44/ha/add-event-recv')
3818 self.assertEqual(stats[0][0], 2)
3819 users = self.statistics.get_counter('/nat44/total-users')
3820 self.assertEqual(users[0][0], 1)
3821 sessions = self.statistics.get_counter('/nat44/total-sessions')
3822 self.assertEqual(sessions[0][0], 2)
3823 users = self.vapi.nat44_user_dump()
3824 self.assertEqual(len(users), 1)
3825 self.assertEqual(str(users[0].ip_address),
3826 self.pg0.remote_ip4)
3827 # there should be 2 sessions created by HA
3828 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3830 self.assertEqual(len(sessions), 2)
3831 for session in sessions:
3832 self.assertEqual(str(session.inside_ip_address),
3833 self.pg0.remote_ip4)
3834 self.assertEqual(str(session.outside_ip_address),
3836 self.assertIn(session.inside_port,
3837 [self.tcp_port_in, self.udp_port_in])
3838 self.assertIn(session.outside_port,
3839 [self.tcp_port_out, self.udp_port_out])
3840 self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3842 # send HA session delete event to failover/passive
3843 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3844 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3845 UDP(sport=12346, dport=12345) /
3846 HANATStateSync(sequence_number=2, events=[
3847 Event(event_type='del', protocol='udp',
3848 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3849 in_port=self.udp_port_in, out_port=self.udp_port_out,
3850 eh_addr=self.pg1.remote_ip4,
3851 ehn_addr=self.pg1.remote_ip4,
3852 eh_port=self.udp_external_port,
3853 ehn_port=self.udp_external_port, fib_index=0)]))
3855 self.pg3.add_stream(p)
3856 self.pg_enable_capture(self.pg_interfaces)
# An ACK for sequence number 2 is expected back on pg3.
3859 capture = self.pg3.get_capture(1)
3862 hanat = p[HANATStateSync]
3864 self.logger.error(ppp("Invalid packet:", p))
3867 self.assertEqual(hanat.sequence_number, 2)
3868 self.assertEqual(hanat.flags, 'ACK')
3869 self.assertEqual(hanat.version, 1)
3870 users = self.vapi.nat44_user_dump()
3871 self.assertEqual(len(users), 1)
3872 self.assertEqual(str(users[0].ip_address),
3873 self.pg0.remote_ip4)
3874 # now we should have only 1 session, 1 deleted by HA
3875 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3877 self.assertEqual(len(sessions), 1)
3878 stats = self.statistics.get_counter('/nat44/ha/del-event-recv')
3879 self.assertEqual(stats[0][0], 1)
# Two HA packets have been processed so far (add and delete).
3881 stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3882 self.assertEqual(stats, 2)
3884 # send HA session refresh event to failover/passive
3885 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3886 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3887 UDP(sport=12346, dport=12345) /
3888 HANATStateSync(sequence_number=3, events=[
3889 Event(event_type='refresh', protocol='tcp',
3890 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3891 in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3892 eh_addr=self.pg1.remote_ip4,
3893 ehn_addr=self.pg1.remote_ip4,
3894 eh_port=self.tcp_external_port,
3895 ehn_port=self.tcp_external_port, fib_index=0,
3896 total_bytes=1024, total_pkts=2)]))
3897 self.pg3.add_stream(p)
3898 self.pg_enable_capture(self.pg_interfaces)
# An ACK for sequence number 3 is expected back on pg3.
3901 capture = self.pg3.get_capture(1)
3904 hanat = p[HANATStateSync]
3906 self.logger.error(ppp("Invalid packet:", p))
3909 self.assertEqual(hanat.sequence_number, 3)
3910 self.assertEqual(hanat.flags, 'ACK')
3911 self.assertEqual(hanat.version, 1)
3912 users = self.vapi.nat44_user_dump()
3913 self.assertEqual(len(users), 1)
3914 self.assertEqual(str(users[0].ip_address),
3915 self.pg0.remote_ip4)
3916 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3918 self.assertEqual(len(sessions), 1)
# The remaining session must carry the totals from the refresh event.
3919 session = sessions[0]
3920 self.assertEqual(session.total_bytes, 1024)
3921 self.assertEqual(session.total_pkts, 2)
3922 stats = self.statistics.get_counter('/nat44/ha/refresh-event-recv')
3923 self.assertEqual(stats[0][0], 1)
3925 stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3926 self.assertEqual(stats, 3)
3928 # send packet to test session created by HA
3929 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3930 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3931 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
3932 self.pg1.add_stream(p)
3933 self.pg_enable_capture(self.pg_interfaces)
# The packet must reach pg0 translated to the inside address and port.
3935 capture = self.pg0.get_capture(1)
3941 self.logger.error(ppp("Invalid packet:", p))
3944 self.assertEqual(ip.src, self.pg1.remote_ip4)
3945 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3946 self.assertEqual(tcp.sport, self.tcp_external_port)
3947 self.assertEqual(tcp.dport, self.tcp_port_in)
3949 def show_commands_at_teardown(self):
# Logs the output of a fixed set of NAT "show" CLI commands at info
# level (addresses, interfaces, static mappings, sessions, hash tables,
# timeouts, port assignment algorithm and HA state).
3950 self.logger.info(self.vapi.cli("show nat44 addresses"))
3951 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3952 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3953 self.logger.info(self.vapi.cli("show nat44 interface address"))
3954 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3955 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3956 self.logger.info(self.vapi.cli("show nat timeouts"))
3958 self.vapi.cli("show nat addr-port-assignment-alg"))
3959 self.logger.info(self.vapi.cli("show nat ha"))
3962 class TestNAT44EndpointDependent2(MethodHolder):
3963 """ Endpoint-Dependent mapping and filtering test cases """
# Test class that enables the NAT44 plugin with the endpoint-dependent
# flag before each test, disables it afterwards, and provides small
# helpers for configuring NAT interfaces/addresses and building a TCP
# stream. It currently holds the per-VRF session limit test.
# NOTE(review): several def/decorator lines of this class are not visible
# here; the comments below describe only the visible statements.
3966 def tearDownClass(cls):
3967 super(TestNAT44EndpointDependent2, cls).tearDownClass()
3970 super(TestNAT44EndpointDependent2, self).tearDown()
# Helper: create IP table `table_id` and bind interface `i` to it.
3973 def create_and_add_ip4_table(cls, i, table_id):
3974 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': table_id})
3975 i.set_table_ip4(table_id)
# Class setup: three pg interfaces are created and pg1 is placed into
# table 10; each interface gets one remote host and IPv4 neighbors.
3978 def setUpClass(cls):
3979 super(TestNAT44EndpointDependent2, cls).setUpClass()
3981 cls.create_pg_interfaces(range(3))
3982 cls.interfaces = list(cls.pg_interfaces)
3984 cls.create_and_add_ip4_table(cls.pg1, 10)
3986 for i in cls.interfaces:
3991 i.generate_remote_hosts(1)
3992 i.configure_ipv4_neighbors()
# Per-test setup: enable the NAT44 plugin in endpoint-dependent mode.
3995 super(TestNAT44EndpointDependent2, self).setUp()
3996 flags = self.nat44_config_flags.NAT44_IS_ENDPOINT_DEPENDENT
3997 self.vapi.nat44_plugin_enable_disable(enable=1, flags=flags)
# Per-test teardown: disable the plugin and clear the log, but only
# while VPP is still alive.
4000 super(TestNAT44EndpointDependent2, self).tearDown()
4001 if not self.vpp_dead:
4002 self.vapi.nat44_plugin_enable_disable(enable=0)
4003 self.vapi.cli("clear logging")
# Helper: enable the NAT44 feature on `i` with the NAT_IS_INSIDE flag.
4005 def nat_add_inside_interface(self, i):
4006 self.vapi.nat44_interface_add_del_feature(
4007 flags=self.config_flags.NAT_IS_INSIDE,
4008 sw_if_index=i.sw_if_index, is_add=1)
# Helper: enable the NAT44 feature on `i` with the NAT_IS_OUTSIDE flag.
4010 def nat_add_outside_interface(self, i):
4011 self.vapi.nat44_interface_add_del_feature(
4012 flags=self.config_flags.NAT_IS_OUTSIDE,
4013 sw_if_index=i.sw_if_index, is_add=1)
# Helper: use the local address of `i` as the NAT address and remember
# it in self.nat_addr.
4015 def nat_add_interface_address(self, i):
4016 self.nat_addr = i.local_ip4
4017 self.vapi.nat44_add_del_interface_addr(
4018 sw_if_index=i.sw_if_index, is_add=1)
# Helper: add `address` to the NAT pool (optionally for a given vrf_id)
# and remember it in self.nat_addr.
4020 def nat_add_address(self, address, vrf_id=0xFFFFFFFF):
4021 self.nat_addr = address
4022 self.nat44_add_address(address, vrf_id=vrf_id)
# Helper: run a CLI command and log its output at info level.
4024 def cli(self, command):
4025 result = self.vapi.cli(command)
4026 self.logger.info(result)
# Helper: log interface and NAT44 configuration via the cli() helper.
4029 def show_configuration(self):
4030 self.cli("show interface")
4031 self.cli("show interface address")
4032 self.cli("show nat44 addresses")
4033 self.cli("show nat44 interfaces")
4035 def create_tcp_stream(self, in_if, out_if, count):
4037 Create tcp packet stream
4039 :param in_if: Inside interface
4040 :param out_if: Outside interface
4041 :param count: count of packets to generate
# One packet per iteration; the source port grows by one each time and
# the destination port is fixed at 20.
# NOTE(review): the binding of `port` is not visible here.
4046 for i in range(count):
4047 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4048 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) /
4049 TCP(sport=port + i, dport=20))
# Test: a session limit is set for vrf_id=10 only. Twice as many TCP
# packets as the limit are sent from each inside interface; `limit`
# packets are expected from the VRF 10 interface and the whole stream
# from the other inside interface.
# NOTE(review): the bindings of inside, outside and limit are not
# visible here.
4054 def test_session_limit_per_vrf(self):
4057 inside_vrf10 = self.pg1
4062 # 2 interfaces pg0, pg1 (vrf10, limit 1 tcp session)
4063 # non existing vrf_id makes process core dump
4064 self.vapi.nat44_set_session_limit(session_limit=limit, vrf_id=10)
4066 self.nat_add_inside_interface(inside)
4067 self.nat_add_inside_interface(inside_vrf10)
4068 self.nat_add_outside_interface(outside)
4071 self.nat_add_interface_address(outside)
4073 # BUG: causing core dump - when bad vrf_id is specified
4074 # self.nat44_add_address(outside.local_ip4, vrf_id=20)
4076 self.show_configuration()
4078 stream = self.create_tcp_stream(inside_vrf10, outside, limit * 2)
4079 inside_vrf10.add_stream(stream)
4081 self.pg_enable_capture(self.pg_interfaces)
4084 capture = outside.get_capture(limit)
4086 stream = self.create_tcp_stream(inside, outside, limit * 2)
4087 inside.add_stream(stream)
4089 self.pg_enable_capture(self.pg_interfaces)
4092 capture = outside.get_capture(len(stream))
4095 class TestNAT44EndpointDependent(MethodHolder):
4096 """ Endpoint-Dependent mapping and filtering test cases """
4099 def setUpClass(cls):
# Class-level fixture: sets NAT logging to debug, defines the default
# ports/ids/addresses used by the tests, creates nine pg interfaces and
# configures addressing and routes for them.
# NOTE(review): several statements (loop bodies, route arguments and
# add_vpp_config calls) are not visible here; comments cover only what
# is shown.
4100 super(TestNAT44EndpointDependent, cls).setUpClass()
4101 cls.vapi.cli("set log class nat level debug")
# Default inside/outside ports and ICMP ids shared by the test cases.
4103 cls.tcp_port_in = 6303
4104 cls.tcp_port_out = 6303
4105 cls.udp_port_in = 6304
4106 cls.udp_port_out = 6304
4107 cls.icmp_id_in = 6305
4108 cls.icmp_id_out = 6305
4109 cls.nat_addr = '10.0.0.3'
4110 cls.ipfix_src_port = 4739
4111 cls.ipfix_domain_id = 1
4112 cls.tcp_external_port = 80
# Nine pg interfaces; only the first three go into cls.interfaces.
4114 cls.create_pg_interfaces(range(9))
4115 cls.interfaces = list(cls.pg_interfaces[0:3])
4117 for i in cls.interfaces:
4122 cls.pg0.generate_remote_hosts(3)
4123 cls.pg0.configure_ipv4_neighbors()
# pg4 gets two remote hosts and an additional 10.0.0.1/24 address.
4127 cls.pg4.generate_remote_hosts(2)
4128 cls.pg4.config_ip4()
4129 cls.vapi.sw_interface_add_del_address(
4130 sw_if_index=cls.pg4.sw_if_index,
4131 prefix="10.0.0.1/24")
# The second remote host of pg4 is given the same IPv4 address as the
# first one before ARP is resolved again.
4134 cls.pg4.resolve_arp()
4135 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
4136 cls.pg4.resolve_arp()
4138 zero_ip4 = socket.inet_pton(socket.AF_INET, "0.0.0.0")
4139 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 1})
# pg5 and pg6 are re-addressed (10.1.1.x and 10.1.2.x) and placed into
# IP table 1.
4141 cls.pg5._local_ip4 = "10.1.1.1"
4142 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
4143 cls.pg5.set_table_ip4(1)
4144 cls.pg5.config_ip4()
4146 r1 = VppIpRoute(cls, cls.pg5.remote_ip4, 32,
4147 [VppRoutePath("0.0.0.0",
4148 cls.pg5.sw_if_index)],
4153 cls.pg6._local_ip4 = "10.1.2.1"
4154 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
4155 cls.pg6.set_table_ip4(1)
4156 cls.pg6.config_ip4()
4159 r2 = VppIpRoute(cls, cls.pg6.remote_ip4, 32,
4160 [VppRoutePath("0.0.0.0",
4161 cls.pg6.sw_if_index)],
4164 r3 = VppIpRoute(cls, cls.pg6.remote_ip4, 16,
4165 [VppRoutePath("0.0.0.0",
# r4 and r5 are default routes (0.0.0.0/0); r5 goes via pg1.
4170 r4 = VppIpRoute(cls, "0.0.0.0", 0,
4171 [VppRoutePath("0.0.0.0", 0xffffffff,
4175 r5 = VppIpRoute(cls, "0.0.0.0", 0,
4176 [VppRoutePath(cls.pg1.local_ip4,
4177 cls.pg1.sw_if_index)],
4184 cls.pg5.resolve_arp()
4185 cls.pg6.resolve_arp()
4188 cls.pg7.config_ip4()
4189 cls.pg7.resolve_arp()
4190 cls.pg7.generate_remote_hosts(3)
4191 cls.pg7.configure_ipv4_neighbors()
4194 cls.pg8.config_ip4()
4195 cls.pg8.resolve_arp()
4198 def tearDownClass(cls):
# Delegates class-level cleanup to the parent class.
4199 super(TestNAT44EndpointDependent, cls).tearDownClass()
# Per-test setup: run the parent setUp, enable the NAT44 plugin in
# endpoint-dependent mode and set the NAT session timeouts.
# NOTE(review): the enclosing def line is not visible here; presumably
# this is the body of setUp(self) - confirm.
4202 super(TestNAT44EndpointDependent, self).setUp()
4203 flags = self.nat44_config_flags.NAT44_IS_ENDPOINT_DEPENDENT
4204 self.vapi.nat44_plugin_enable_disable(enable=1, flags=flags)
4205 self.vapi.nat_set_timeouts(
4206 udp=300, tcp_established=7440,
4207 tcp_transitory=240, icmp=60)
# Per-test teardown: run the parent tearDown, then disable the NAT44
# plugin and clear the log, but only while VPP is still alive.
# NOTE(review): the enclosing def line is not visible here; presumably
# this is the body of tearDown(self) - confirm.
4210 super(TestNAT44EndpointDependent, self).tearDown()
4211 if not self.vpp_dead:
4212 self.vapi.nat44_plugin_enable_disable(enable=0)
4213 self.vapi.cli("clear logging")
4215 def test_frag_in_order(self):
4216 """ NAT44 translate fragments arriving in order """
# Configures a NAT address plus NAT features on pg0 (inside flag) and
# pg1, then runs the shared frag_in_order() helper once per protocol
# with ignore_port=True.
4217 self.nat44_add_address(self.nat_addr)
4218 flags = self.config_flags.NAT_IS_INSIDE
4219 self.vapi.nat44_interface_add_del_feature(
4220 sw_if_index=self.pg0.sw_if_index,
4221 flags=flags, is_add=1)
4222 self.vapi.nat44_interface_add_del_feature(
4223 sw_if_index=self.pg1.sw_if_index,
4225 self.frag_in_order(proto=IP_PROTOS.tcp, ignore_port=True)
4226 self.frag_in_order(proto=IP_PROTOS.udp, ignore_port=True)
4227 self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True)
# Enables the NAT44 interface feature on pg0 (NAT_IS_INSIDE) and on pg1 but
# adds no NAT address; turns NAT44 forwarding on and runs the shared
# frag_in_order scenario for TCP only with dont_translate=True.
4229 def test_frag_in_order_dont_translate(self):
4230 """ NAT44 don't translate fragments arriving in order """
4231 flags = self.config_flags.NAT_IS_INSIDE
4232 self.vapi.nat44_interface_add_del_feature(
4233 sw_if_index=self.pg0.sw_if_index,
4234 flags=flags, is_add=1)
# NOTE(review): the embedded numbering skips 4237, so the end of the pg1
# call is not visible in this excerpt.
4235 self.vapi.nat44_interface_add_del_feature(
4236 sw_if_index=self.pg1.sw_if_index,
4238 self.vapi.nat44_forwarding_enable_disable(enable=True)
4239 self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
# Same configuration as the in-order variant (NAT address self.nat_addr,
# feature on pg0 with NAT_IS_INSIDE and on pg1), then runs the shared
# frag_out_of_order scenario for TCP, UDP and ICMP with ignore_port=True.
4241 def test_frag_out_of_order(self):
4242 """ NAT44 translate fragments arriving out of order """
4243 self.nat44_add_address(self.nat_addr)
4244 flags = self.config_flags.NAT_IS_INSIDE
4245 self.vapi.nat44_interface_add_del_feature(
4246 sw_if_index=self.pg0.sw_if_index,
4247 flags=flags, is_add=1)
# NOTE(review): the embedded numbering skips 4250, so the end of the pg1
# call is not visible in this excerpt.
4248 self.vapi.nat44_interface_add_del_feature(
4249 sw_if_index=self.pg1.sw_if_index,
4251 self.frag_out_of_order(proto=IP_PROTOS.tcp, ignore_port=True)
4252 self.frag_out_of_order(proto=IP_PROTOS.udp, ignore_port=True)
4253 self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True)
# Enables the NAT44 interface feature on pg0 (NAT_IS_INSIDE) and on pg1
# without adding a NAT address, turns NAT44 forwarding on and runs the shared
# frag_out_of_order scenario for TCP only with dont_translate=True.
4255 def test_frag_out_of_order_dont_translate(self):
4256 """ NAT44 don't translate fragments arriving out of order """
4257 flags = self.config_flags.NAT_IS_INSIDE
4258 self.vapi.nat44_interface_add_del_feature(
4259 sw_if_index=self.pg0.sw_if_index,
4260 flags=flags, is_add=1)
# NOTE(review): the embedded numbering skips 4263, so the end of the pg1
# call is not visible in this excerpt.
4261 self.vapi.nat44_interface_add_del_feature(
4262 sw_if_index=self.pg1.sw_if_index,
4264 self.vapi.nat44_forwarding_enable_disable(enable=True)
4265 self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
# Configures pg0 and pg1 with two NAT44 feature calls each, publishes the
# first remote host of pg1 as a server behind 11.11.11.11 via static
# mappings, and runs the shared frag_in_order_in_plus_out scenario for TCP,
# UDP and ICMP.
4267 def test_frag_in_order_in_plus_out(self):
4268 """ in+out interface fragments in order """
4269 flags = self.config_flags.NAT_IS_INSIDE
# Each interface gets the feature call twice; one of the two passes
# NAT_IS_INSIDE. NOTE(review): the embedded numbering skips 4272 and 4278, so
# the trailing arguments of the other call on each interface are not visible.
4270 self.vapi.nat44_interface_add_del_feature(
4271 sw_if_index=self.pg0.sw_if_index,
4273 self.vapi.nat44_interface_add_del_feature(
4274 sw_if_index=self.pg0.sw_if_index,
4275 flags=flags, is_add=1)
4276 self.vapi.nat44_interface_add_del_feature(
4277 sw_if_index=self.pg1.sw_if_index,
4279 self.vapi.nat44_interface_add_del_feature(
4280 sw_if_index=self.pg1.sw_if_index,
4281 flags=flags, is_add=1)
# Server parameters are stored on self; presumably the shared scenario
# helper reads them - confirm against the helper.
4283 self.server = self.pg1.remote_hosts[0]
4285 self.server_in_addr = self.server.ip4
4286 self.server_out_addr = '11.11.11.11'
# Inside and outside ports are picked at random for every run.
4287 self.server_in_port = random.randint(1025, 65535)
4288 self.server_out_port = random.randint(1025, 65535)
4290 self.nat44_add_address(self.server_out_addr)
4292 # add static mappings for server
# TCP and UDP mappings carry the port pair; the ICMP mapping passes none.
4293 self.nat44_add_static_mapping(self.server_in_addr,
4294 self.server_out_addr,
4295 self.server_in_port,
4296 self.server_out_port,
4297 proto=IP_PROTOS.tcp)
4298 self.nat44_add_static_mapping(self.server_in_addr,
4299 self.server_out_addr,
4300 self.server_in_port,
4301 self.server_out_port,
4302 proto=IP_PROTOS.udp)
4303 self.nat44_add_static_mapping(self.server_in_addr,
4304 self.server_out_addr,
4305 proto=IP_PROTOS.icmp)
4307 self.frag_in_order_in_plus_out(proto=IP_PROTOS.tcp)
4308 self.frag_in_order_in_plus_out(proto=IP_PROTOS.udp)
4309 self.frag_in_order_in_plus_out(proto=IP_PROTOS.icmp)
# Same configuration as the in-order in+out test (two feature calls per
# interface, server behind 11.11.11.11 with TCP/UDP/ICMP static mappings),
# but runs the shared frag_out_of_order_in_plus_out scenario instead.
4311 def test_frag_out_of_order_in_plus_out(self):
4312 """ in+out interface fragments out of order """
4313 flags = self.config_flags.NAT_IS_INSIDE
# NOTE(review): the embedded numbering skips 4316 and 4322, so the trailing
# arguments of the first call on each interface are not visible here.
4314 self.vapi.nat44_interface_add_del_feature(
4315 sw_if_index=self.pg0.sw_if_index,
4317 self.vapi.nat44_interface_add_del_feature(
4318 sw_if_index=self.pg0.sw_if_index,
4319 flags=flags, is_add=1)
4320 self.vapi.nat44_interface_add_del_feature(
4321 sw_if_index=self.pg1.sw_if_index,
4323 self.vapi.nat44_interface_add_del_feature(
4324 sw_if_index=self.pg1.sw_if_index,
4325 flags=flags, is_add=1)
# Server parameters are stored on self; presumably the shared scenario
# helper reads them - confirm against the helper.
4327 self.server = self.pg1.remote_hosts[0]
4329 self.server_in_addr = self.server.ip4
4330 self.server_out_addr = '11.11.11.11'
# Inside and outside ports are picked at random for every run.
4331 self.server_in_port = random.randint(1025, 65535)
4332 self.server_out_port = random.randint(1025, 65535)
4334 self.nat44_add_address(self.server_out_addr)
4336 # add static mappings for server
# TCP and UDP mappings carry the port pair; the ICMP mapping passes none.
4337 self.nat44_add_static_mapping(self.server_in_addr,
4338 self.server_out_addr,
4339 self.server_in_port,
4340 self.server_out_port,
4341 proto=IP_PROTOS.tcp)
4342 self.nat44_add_static_mapping(self.server_in_addr,
4343 self.server_out_addr,
4344 self.server_in_port,
4345 self.server_out_port,
4346 proto=IP_PROTOS.udp)
4347 self.nat44_add_static_mapping(self.server_in_addr,
4348 self.server_out_addr,
4349 proto=IP_PROTOS.icmp)
4351 self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.tcp)
4352 self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.udp)
4353 self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.icmp)
# Uses the second remote host of pg0 as the server, maps it to self.nat_addr
# with static mappings, and runs the shared reass_hairpinning scenario for
# TCP, UDP and ICMP with ignore_port=True.
4355 def test_reass_hairpinning(self):
4356 """ NAT44 fragments hairpinning """
4357 self.server = self.pg0.remote_hosts[1]
# Host and server ports are picked at random for every run.
4358 self.host_in_port = random.randint(1025, 65535)
4359 self.server_in_port = random.randint(1025, 65535)
4360 self.server_out_port = random.randint(1025, 65535)
4362 self.nat44_add_address(self.nat_addr)
4363 flags = self.config_flags.NAT_IS_INSIDE
4364 self.vapi.nat44_interface_add_del_feature(
4365 sw_if_index=self.pg0.sw_if_index,
4366 flags=flags, is_add=1)
# NOTE(review): the embedded numbering skips 4369, so the end of the pg1
# call is not visible in this excerpt.
4367 self.vapi.nat44_interface_add_del_feature(
4368 sw_if_index=self.pg1.sw_if_index,
4370 # add static mapping for server
# TCP and UDP mappings carry the port pair; the last mapping passes only the
# two addresses (no ports, no proto).
4371 self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
4372 self.server_in_port,
4373 self.server_out_port,
4374 proto=IP_PROTOS.tcp)
4375 self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
4376 self.server_in_port,
4377 self.server_out_port,
4378 proto=IP_PROTOS.udp)
4379 self.nat44_add_static_mapping(self.server.ip4, self.nat_addr)
4381 self.reass_hairpinning(proto=IP_PROTOS.tcp, ignore_port=True)
4382 self.reass_hairpinning(proto=IP_PROTOS.udp, ignore_port=True)
4383 self.reass_hairpinning(proto=IP_PROTOS.icmp, ignore_port=True)
# Creates NAT sessions by sending a stream from pg0 to pg1, checks that the
# '/nat44/total-sessions' counter is positive, issues the
# "clear nat44 sessions" CLI command and checks the counter is back to 0.
4385 def test_clear_sessions(self):
4386 """ NAT44 ED session clearing test """
4388 self.nat44_add_address(self.nat_addr)
4389 flags = self.config_flags.NAT_IS_INSIDE
4390 self.vapi.nat44_interface_add_del_feature(
4391 sw_if_index=self.pg0.sw_if_index,
4392 flags=flags, is_add=1)
# NOTE(review): the embedded numbering skips 4395-4396, so the end of the
# pg1 call is not visible in this excerpt.
4393 self.vapi.nat44_interface_add_del_feature(
4394 sw_if_index=self.pg1.sw_if_index,
# Sanity check that the plugin reports endpoint-dependent mode.
4397 nat_config = self.vapi.nat_show_config()
4398 self.assertEqual(1, nat_config.endpoint_dependent)
4400 pkts = self.create_stream_in(self.pg0, self.pg1)
4401 self.pg0.add_stream(pkts)
4402 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the embedded numbering skips 4403; the step that actually
# starts the packet generator is presumably there - confirm.
4404 capture = self.pg1.get_capture(len(pkts))
4405 self.verify_capture_out(capture, ignore_port=True)
4407 sessions = self.statistics.get_counter('/nat44/total-sessions')
4408 self.assertTrue(sessions[0][0] > 0)
4409 self.logger.info("sessions before clearing: %s" % sessions[0][0])
4411 # just for testing purposes
4412 self.logger.info(self.vapi.cli("show nat44 summary"))
4414 self.vapi.cli("clear nat44 sessions")
4416 self.logger.info(self.vapi.cli("show nat44 summary"))
4418 sessions = self.statistics.get_counter('/nat44/total-sessions')
4419 self.assertEqual(sessions[0][0], 0)
4420 self.logger.info("sessions after clearing: %s" % sessions[0][0])
# Sends a stream pg0 -> pg1 and then pg1 -> pg0 and checks, per direction,
# that the NAT44-ED statistics counters advance by the expected amount and
# that '/nat44/total-sessions' ends at 3.
4422 def test_dynamic(self):
4423 """ NAT44 dynamic translation test """
4425 self.nat44_add_address(self.nat_addr)
4426 flags = self.config_flags.NAT_IS_INSIDE
4427 self.vapi.nat44_interface_add_del_feature(
4428 sw_if_index=self.pg0.sw_if_index,
4429 flags=flags, is_add=1)
# NOTE(review): the embedded numbering skips 4432-4433, so the end of the
# pg1 call is not visible in this excerpt.
4430 self.vapi.nat44_interface_add_del_feature(
4431 sw_if_index=self.pg1.sw_if_index,
4434 nat_config = self.vapi.nat_show_config()
4435 self.assertEqual(1, nat_config.endpoint_dependent)
# Snapshot the in2out slow-path counters before sending traffic so the
# assertions below can compare deltas rather than absolute values.
4438 tcpn = self.statistics.get_counter('/nat44/ed/in2out/slowpath/tcp')[0]
4439 udpn = self.statistics.get_counter('/nat44/ed/in2out/slowpath/udp')[0]
4440 icmpn = self.statistics.get_counter(
4441 '/nat44/ed/in2out/slowpath/icmp')[0]
4442 drops = self.statistics.get_counter(
4443 '/nat44/ed/in2out/slowpath/drops')[0]
4445 pkts = self.create_stream_in(self.pg0, self.pg1)
4446 self.pg0.add_stream(pkts)
4447 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the embedded numbering skips 4448; the step that starts the
# packet generator is presumably there - confirm.
4449 capture = self.pg1.get_capture(len(pkts))
4450 self.verify_capture_out(capture, ignore_port=True)
# Counters are indexed by sw_if_index; expected deltas on pg0 are 2 TCP,
# 1 UDP, 1 ICMP and 0 drops (the packet mix comes from create_stream_in,
# which is defined elsewhere).
4452 if_idx = self.pg0.sw_if_index
4453 cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/tcp')[0]
4454 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
4455 cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/udp')[0]
4456 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
4457 cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/icmp')[0]
4458 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
4459 cnt = self.statistics.get_counter('/nat44/ed/in2out/slowpath/drops')[0]
4460 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
# Reverse direction: TCP, UDP and drops are read from the out2in fast-path
# counters, ICMP from the out2in slow-path counter.
4463 tcpn = self.statistics.get_counter('/nat44/ed/out2in/fastpath/tcp')[0]
4464 udpn = self.statistics.get_counter('/nat44/ed/out2in/fastpath/udp')[0]
4465 icmpn = self.statistics.get_counter(
4466 '/nat44/ed/out2in/slowpath/icmp')[0]
4467 drops = self.statistics.get_counter(
4468 '/nat44/ed/out2in/fastpath/drops')[0]
4470 pkts = self.create_stream_out(self.pg1)
4471 self.pg1.add_stream(pkts)
4472 self.pg_enable_capture(self.pg_interfaces)
4474 capture = self.pg0.get_capture(len(pkts))
4475 self.verify_capture_in(capture, self.pg0)
4477 if_idx = self.pg1.sw_if_index
4478 cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/tcp')[0]
4479 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
4480 cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/udp')[0]
4481 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
4482 cnt = self.statistics.get_counter('/nat44/ed/out2in/slowpath/icmp')[0]
4483 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
4484 cnt = self.statistics.get_counter('/nat44/ed/out2in/fastpath/drops')[0]
4485 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
4487 sessions = self.statistics.get_counter('/nat44/total-sessions')
4488 self.assertEqual(sessions[0][0], 3)
# Checks the 'out of ports' error counter of the in2out slow path: with no
# NAT address configured nothing is captured on pg1 and the counter grows by
# the number of packets sent; once self.nat_addr is added the stream is
# captured on pg1 and the counter stays unchanged.
4490 def test_dynamic_out_of_ports(self):
4491 """ NAT44 dynamic translation test: out of ports """
4493 flags = self.config_flags.NAT_IS_INSIDE
4494 self.vapi.nat44_interface_add_del_feature(
4495 sw_if_index=self.pg0.sw_if_index,
4496 flags=flags, is_add=1)
# NOTE(review): the embedded numbering skips 4499-4500, so the end of the
# pg1 call is not visible in this excerpt.
4497 self.vapi.nat44_interface_add_del_feature(
4498 sw_if_index=self.pg1.sw_if_index,
4501 nat_config = self.vapi.nat_show_config()
4502 self.assertEqual(1, nat_config.endpoint_dependent)
4504 # in2out and no NAT addresses added
4505 err_old = self.statistics.get_err_counter(
4506 '/err/nat44-ed-in2out-slowpath/out of ports')
4508 pkts = self.create_stream_in(self.pg0, self.pg1)
4509 self.pg0.add_stream(pkts)
4510 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the embedded numbering skips 4511; the step that starts the
# packet generator is presumably there - confirm.
# Expect zero packets on pg1 within the 1-unit timeout.
4512 self.pg1.get_capture(0, timeout=1)
4514 err_new = self.statistics.get_err_counter(
4515 '/err/nat44-ed-in2out-slowpath/out of ports')
# Every packet sent must have been counted as 'out of ports'.
4517 self.assertEqual(err_new - err_old, len(pkts))
4519 # in2out after NAT addresses added
4520 self.nat44_add_address(self.nat_addr)
4522 err_old = self.statistics.get_err_counter(
4523 '/err/nat44-ed-in2out-slowpath/out of ports')
4525 pkts = self.create_stream_in(self.pg0, self.pg1)
4526 self.pg0.add_stream(pkts)
4527 self.pg_enable_capture(self.pg_interfaces)
4529 capture = self.pg1.get_capture(len(pkts))
4530 self.verify_capture_out(capture, ignore_port=True)
4532 err_new = self.statistics.get_err_counter(
4533 '/err/nat44-ed-in2out-slowpath/out of ports')
4535 self.assertEqual(err_new, err_old)
# Repeats the dynamic-translation counter checks with the NAT44 output
# feature on pg7/pg8 after moving both interfaces into the IP table
# new_vrf_id; afterwards moves them into table 1 and deletes the added table.
# NOTE(review): new_vrf_id is assigned on a line that is not visible in this
# excerpt (the embedded numbering skips 4541-4542).
4537 def test_dynamic_output_feature_vrf(self):
4538 """ NAT44 dynamic translation test: output-feature, VRF"""
4540 # other than default (0)
4543 self.nat44_add_address(self.nat_addr)
4544 flags = self.config_flags.NAT_IS_INSIDE
4545 self.vapi.nat44_interface_add_del_output_feature(
4546 sw_if_index=self.pg7.sw_if_index,
4547 flags=flags, is_add=1)
# NOTE(review): the embedded numbering skips 4550-4552, so the end of the
# pg8 call and whatever follows it are not visible here.
4548 self.vapi.nat44_interface_add_del_output_feature(
4549 sw_if_index=self.pg8.sw_if_index,
4553 self.vapi.ip_table_add_del(is_add=1,
4554 table={'table_id': new_vrf_id})
# Re-home pg7 and pg8 into the new table: unconfigure IPv4, bind the table,
# configure IPv4 again and resolve ARP.
4556 self.pg7.unconfig_ip4()
4557 self.pg7.set_table_ip4(new_vrf_id)
4558 self.pg7.config_ip4()
4559 self.pg7.resolve_arp()
4561 self.pg8.unconfig_ip4()
4562 self.pg8.set_table_ip4(new_vrf_id)
4563 self.pg8.config_ip4()
4564 self.pg8.resolve_arp()
4566 nat_config = self.vapi.nat_show_config()
4567 self.assertEqual(1, nat_config.endpoint_dependent)
# Snapshot the in2out slow-path counters before sending pg7 -> pg8 traffic.
4570 tcpn = self.statistics.get_counter(
4571 '/nat44/ed/in2out/slowpath/tcp')[0]
4572 udpn = self.statistics.get_counter(
4573 '/nat44/ed/in2out/slowpath/udp')[0]
4574 icmpn = self.statistics.get_counter(
4575 '/nat44/ed/in2out/slowpath/icmp')[0]
4576 drops = self.statistics.get_counter(
4577 '/nat44/ed/in2out/slowpath/drops')[0]
4579 pkts = self.create_stream_in(self.pg7, self.pg8)
4580 self.pg7.add_stream(pkts)
4581 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the embedded numbering skips 4582; the step that starts the
# packet generator is presumably there - confirm.
4583 capture = self.pg8.get_capture(len(pkts))
4584 self.verify_capture_out(capture, ignore_port=True)
# Expected deltas on pg7: 2 TCP, 1 UDP, 1 ICMP, 0 drops.
4586 if_idx = self.pg7.sw_if_index
4587 cnt = self.statistics.get_counter(
4588 '/nat44/ed/in2out/slowpath/tcp')[0]
4589 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
4590 cnt = self.statistics.get_counter(
4591 '/nat44/ed/in2out/slowpath/udp')[0]
4592 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
4593 cnt = self.statistics.get_counter(
4594 '/nat44/ed/in2out/slowpath/icmp')[0]
4595 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
4596 cnt = self.statistics.get_counter(
4597 '/nat44/ed/in2out/slowpath/drops')[0]
4598 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
# Reverse direction (pg8 -> pg7): TCP, UDP and drops use the out2in
# fast-path counters, ICMP the out2in slow-path counter.
4601 tcpn = self.statistics.get_counter(
4602 '/nat44/ed/out2in/fastpath/tcp')[0]
4603 udpn = self.statistics.get_counter(
4604 '/nat44/ed/out2in/fastpath/udp')[0]
4605 icmpn = self.statistics.get_counter(
4606 '/nat44/ed/out2in/slowpath/icmp')[0]
4607 drops = self.statistics.get_counter(
4608 '/nat44/ed/out2in/fastpath/drops')[0]
4610 pkts = self.create_stream_out(self.pg8)
4611 self.pg8.add_stream(pkts)
4612 self.pg_enable_capture(self.pg_interfaces)
4614 capture = self.pg7.get_capture(len(pkts))
4615 self.verify_capture_in(capture, self.pg7)
4617 if_idx = self.pg8.sw_if_index
4618 cnt = self.statistics.get_counter(
4619 '/nat44/ed/out2in/fastpath/tcp')[0]
4620 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
4621 cnt = self.statistics.get_counter(
4622 '/nat44/ed/out2in/fastpath/udp')[0]
4623 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
4624 cnt = self.statistics.get_counter(
4625 '/nat44/ed/out2in/slowpath/icmp')[0]
4626 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
4627 cnt = self.statistics.get_counter(
4628 '/nat44/ed/out2in/fastpath/drops')[0]
4629 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
4631 sessions = self.statistics.get_counter('/nat44/total-sessions')
4632 self.assertEqual(sessions[0][0], 3)
# Cleanup: bind pg7 and pg8 to table 1 and remove the table added above.
# NOTE(review): indentation is lost and the embedded numbering skips
# 4633-4634, so whether this runs in a `finally` cannot be told from here.
4635 self.pg7.unconfig_ip4()
4636 self.pg7.set_table_ip4(1)
4637 self.pg7.config_ip4()
4638 self.pg7.resolve_arp()
4640 self.pg8.unconfig_ip4()
4641 self.pg8.set_table_ip4(1)
4642 self.pg8.config_ip4()
4643 self.pg8.resolve_arp()
4645 self.vapi.ip_table_add_del(is_add=0,
4646 table={'table_id': new_vrf_id})
# Exercises NAT44 with forwarding enabled: first with an address-only static
# mapping pg0.remote_ip4 <-> self.nat_addr, then for a host that has no
# static mapping, and finally checks session dump / session delete for that
# second host before disabling forwarding again.
# NOTE(review): the embedded numbering has several gaps in this method
# (e.g. 4657, 4667-4669, 4671, 4675, 4706-4707, 4725-4726, 4730, 4734), so
# some call arguments and control-flow lines are not visible in this excerpt.
4648 def test_forwarding(self):
4649 """ NAT44 forwarding test """
4651 flags = self.config_flags.NAT_IS_INSIDE
4652 self.vapi.nat44_interface_add_del_feature(
4653 sw_if_index=self.pg0.sw_if_index,
4654 flags=flags, is_add=1)
4655 self.vapi.nat44_interface_add_del_feature(
4656 sw_if_index=self.pg1.sw_if_index,
4658 self.vapi.nat44_forwarding_enable_disable(enable=1)
# Address-only static mapping between the first pg0 host and self.nat_addr.
4660 real_ip = self.pg0.remote_ip4
4661 alias_ip = self.nat_addr
4662 flags = self.config_flags.NAT_IS_ADDR_ONLY
4663 self.vapi.nat44_add_del_static_mapping(is_add=1,
4664 local_ip_address=real_ip,
4665 external_ip_address=alias_ip,
4666 external_sw_if_index=0xFFFFFFFF,
# NOTE(review): the stream right below is injected on pg1 and captured on
# pg0, while this label says in2out - confirm the label against the full file.
4670 # in2out - static mapping match
4672 pkts = self.create_stream_out(self.pg1)
4673 self.pg1.add_stream(pkts)
4674 self.pg_enable_capture(self.pg_interfaces)
4676 capture = self.pg0.get_capture(len(pkts))
4677 self.verify_capture_in(capture, self.pg0)
4679 pkts = self.create_stream_in(self.pg0, self.pg1)
4680 self.pg0.add_stream(pkts)
4681 self.pg_enable_capture(self.pg_interfaces)
4683 capture = self.pg1.get_capture(len(pkts))
4684 self.verify_capture_out(capture, same_port=True)
4686 # in2out - no static mapping match
# Temporarily put host 1 into slot 0 of pg0.remote_hosts (the original entry
# is saved in host0 and written back further down); presumably this makes
# pg0.remote_ip4 resolve to the unmapped host - confirm.
4688 host0 = self.pg0.remote_hosts[0]
4689 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
4691 pkts = self.create_stream_out(self.pg1,
4692 dst_ip=self.pg0.remote_ip4,
4693 use_inside_ports=True)
4694 self.pg1.add_stream(pkts)
4695 self.pg_enable_capture(self.pg_interfaces)
4697 capture = self.pg0.get_capture(len(pkts))
4698 self.verify_capture_in(capture, self.pg0)
4700 pkts = self.create_stream_in(self.pg0, self.pg1)
4701 self.pg0.add_stream(pkts)
4702 self.pg_enable_capture(self.pg_interfaces)
4704 capture = self.pg1.get_capture(len(pkts))
4705 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Restore the original first host of pg0.
4708 self.pg0.remote_hosts[0] = host0
# The unmapped host must own 3 sessions; delete the first one (identified by
# inside address/port, protocol and external host) and expect 2 to remain.
4710 user = self.pg0.remote_hosts[1]
4711 sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
4712 self.assertEqual(len(sessions), 3)
4713 self.assertTrue(sessions[0].flags &
4714 self.config_flags.NAT_IS_EXT_HOST_VALID)
4715 self.vapi.nat44_del_session(
4716 address=sessions[0].inside_ip_address,
4717 port=sessions[0].inside_port,
4718 protocol=sessions[0].protocol,
4719 flags=(self.config_flags.NAT_IS_INSIDE |
4720 self.config_flags.NAT_IS_EXT_HOST_VALID),
4721 ext_host_address=sessions[0].ext_host_address,
4722 ext_host_port=sessions[0].ext_host_port)
4723 sessions = self.vapi.nat44_user_session_dump(user.ip4, 0)
4724 self.assertEqual(len(sessions), 2)
# Disable forwarding and issue another static-mapping call for the same
# address pair; its is_add value is on a line not visible in this excerpt.
4727 self.vapi.nat44_forwarding_enable_disable(enable=0)
4728 flags = self.config_flags.NAT_IS_ADDR_ONLY
4729 self.vapi.nat44_add_del_static_mapping(
4731 local_ip_address=real_ip,
4732 external_ip_address=alias_ip,
4733 external_sw_if_index=0xFFFFFFFF,
# Load-balancing static mapping test: a TCP service on self.nat_addr is
# backed by two pg0 hosts. A client packet from pg1 must reach one of the two
# servers on local_port, the reply must leave pg1 with the external
# address/port, and the resulting single session is then deleted.
# NOTE(review): external_port, local_port, server, ip and tcp are assigned on
# lines that are not visible in this excerpt (the embedded numbering skips
# e.g. 4739-4740, 4745-4747, 4777-4781, 4801-4804), as are the try/except
# lines around the assertions.
4736 def test_static_lb(self):
4737 """ NAT44 local service load balancing """
4738 external_addr_n = self.nat_addr
4741 server1 = self.pg0.remote_hosts[0]
4742 server2 = self.pg0.remote_hosts[1]
# Back-end list; only the 'addr' key of each entry is visible here.
4744 locals = [{'addr': server1.ip4,
4748 {'addr': server2.ip4,
4753 self.nat44_add_address(self.nat_addr)
4754 self.vapi.nat44_add_del_lb_static_mapping(
4756 external_addr=external_addr_n,
4757 external_port=external_port,
4758 protocol=IP_PROTOS.tcp,
4759 local_num=len(locals),
4761 flags = self.config_flags.NAT_IS_INSIDE
4762 self.vapi.nat44_interface_add_del_feature(
4763 sw_if_index=self.pg0.sw_if_index,
4764 flags=flags, is_add=1)
4765 self.vapi.nat44_interface_add_del_feature(
4766 sw_if_index=self.pg1.sw_if_index,
4769 # from client to service
4770 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4771 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4772 TCP(sport=12345, dport=external_port))
4773 self.pg1.add_stream(p)
4774 self.pg_enable_capture(self.pg_interfaces)
4776 capture = self.pg0.get_capture(1)
# The destination must be one of the two back-ends, on the local port.
4782 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
4783 if ip.dst == server1.ip4:
4787 self.assertEqual(tcp.dport, local_port)
4788 self.assert_packet_checksums_valid(p)
4790 self.logger.error(ppp("Unexpected or invalid packet:", p))
4793 # from service back to client
4794 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
4795 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
4796 TCP(sport=local_port, dport=12345))
4797 self.pg0.add_stream(p)
4798 self.pg_enable_capture(self.pg_interfaces)
4800 capture = self.pg1.get_capture(1)
# The reply must carry the external address and port as its source.
4805 self.assertEqual(ip.src, self.nat_addr)
4806 self.assertEqual(tcp.sport, external_port)
4807 self.assert_packet_checksums_valid(p)
4809 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Exactly one session is expected for the chosen server; delete it and
# verify that none is left.
4812 sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
4813 self.assertEqual(len(sessions), 1)
4814 self.assertTrue(sessions[0].flags &
4815 self.config_flags.NAT_IS_EXT_HOST_VALID)
4816 self.vapi.nat44_del_session(
4817 address=sessions[0].inside_ip_address,
4818 port=sessions[0].inside_port,
4819 protocol=sessions[0].protocol,
4820 flags=(self.config_flags.NAT_IS_INSIDE |
4821 self.config_flags.NAT_IS_EXT_HOST_VALID),
4822 ext_host_address=sessions[0].ext_host_address,
4823 ext_host_port=sessions[0].ext_host_port)
4824 sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
4825 self.assertEqual(len(sessions), 0)
# Load-balancing test with many clients, in three rounds: two back-ends
# (server1 must receive more packets than server2), then after a
# nat44_lb_static_mapping_add_del_local call involving server3 (all three
# must receive packets), then after a second such call involving server2
# (server2 must receive none, the other two some).
# NOTE(review): this method has many gaps in the embedded numbering. The
# back-end dict contents, the is_add/local arguments of the add_del_local
# calls, `pkts` initialisation/append, the capture loops and the server*_n
# counters are on lines not visible in this excerpt.
4827 def test_static_lb_multi_clients(self):
4828 """ NAT44 local service load balancing - multiple clients"""
4830 external_addr = self.nat_addr
4833 server1 = self.pg0.remote_hosts[0]
4834 server2 = self.pg0.remote_hosts[1]
4835 server3 = self.pg0.remote_hosts[2]
4837 locals = [{'addr': server1.ip4,
4841 {'addr': server2.ip4,
4846 flags = self.config_flags.NAT_IS_INSIDE
4847 self.vapi.nat44_interface_add_del_feature(
4848 sw_if_index=self.pg0.sw_if_index,
4849 flags=flags, is_add=1)
4850 self.vapi.nat44_interface_add_del_feature(
4851 sw_if_index=self.pg1.sw_if_index,
4854 self.nat44_add_address(self.nat_addr)
4855 self.vapi.nat44_add_del_lb_static_mapping(is_add=1,
4856 external_addr=external_addr,
4857 external_port=external_port,
4858 protocol=IP_PROTOS.tcp,
4859 local_num=len(locals),
# Round 1: client addresses generated by ip4_range(..., 10, 50), all using
# source port 12345.
4864 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
4866 for client in clients:
4867 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4868 IP(src=client, dst=self.nat_addr) /
4869 TCP(sport=12345, dport=external_port))
4871 self.pg1.add_stream(pkts)
4872 self.pg_enable_capture(self.pg_interfaces)
4874 capture = self.pg0.get_capture(len(pkts))
4876 if p[IP].dst == server1.ip4:
4880 self.assertGreater(server1_n, server2_n)
4883 'addr': server3.ip4,
4890 self.vapi.nat44_lb_static_mapping_add_del_local(
4892 external_addr=external_addr,
4893 external_port=external_port,
4895 protocol=IP_PROTOS.tcp)
# Round 2: a fresh client range (60, 110) and source port 12346.
4899 clients = ip4_range(self.pg1.remote_ip4, 60, 110)
4901 for client in clients:
4902 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4903 IP(src=client, dst=self.nat_addr) /
4904 TCP(sport=12346, dport=external_port))
4906 self.assertGreater(len(pkts), 0)
4907 self.pg1.add_stream(pkts)
4908 self.pg_enable_capture(self.pg_interfaces)
4910 capture = self.pg0.get_capture(len(pkts))
4912 if p[IP].dst == server1.ip4:
4914 elif p[IP].dst == server2.ip4:
4918 self.assertGreater(server1_n, 0)
4919 self.assertGreater(server2_n, 0)
4920 self.assertGreater(server3_n, 0)
4923 'addr': server2.ip4,
4929 # remove one back-end
4930 self.vapi.nat44_lb_static_mapping_add_del_local(
4932 external_addr=external_addr,
4933 external_port=external_port,
4935 protocol=IP_PROTOS.tcp)
# Round 3: the stream in `pkts` is sent again; server2 must get nothing.
4939 self.pg1.add_stream(pkts)
4940 self.pg_enable_capture(self.pg_interfaces)
4942 capture = self.pg0.get_capture(len(pkts))
4944 if p[IP].dst == server1.ip4:
4946 elif p[IP].dst == server2.ip4:
4950 self.assertGreater(server1_n, 0)
4951 self.assertEqual(server2_n, 0)
4952 self.assertGreater(server3_n, 0)
# Load-balancing static mapping created with NAT_IS_OUT2IN_ONLY and NAT44
# forwarding enabled. Checks four packets: client -> service (translated to
# one of the back-ends), service -> client (source rewritten to the external
# address/port), and a client <-> server1 exchange addressed directly to
# server1, which must pass with addresses and ports unchanged.
# NOTE(review): external_port, local_port, server, ip and tcp are assigned on
# lines not visible in this excerpt (gaps in the embedded numbering, e.g.
# 4957-4958, 4995-4999, 5019-5022), as are the try/except lines.
4954 def test_static_lb_2(self):
4955 """ NAT44 local service load balancing (asymmetrical rule) """
4956 external_addr = self.nat_addr
4959 server1 = self.pg0.remote_hosts[0]
4960 server2 = self.pg0.remote_hosts[1]
4962 locals = [{'addr': server1.ip4,
4966 {'addr': server2.ip4,
4971 self.vapi.nat44_forwarding_enable_disable(enable=1)
4972 flags = self.config_flags.NAT_IS_OUT2IN_ONLY
4973 self.vapi.nat44_add_del_lb_static_mapping(is_add=1, flags=flags,
4974 external_addr=external_addr,
4975 external_port=external_port,
4976 protocol=IP_PROTOS.tcp,
4977 local_num=len(locals),
4979 flags = self.config_flags.NAT_IS_INSIDE
4980 self.vapi.nat44_interface_add_del_feature(
4981 sw_if_index=self.pg0.sw_if_index,
4982 flags=flags, is_add=1)
4983 self.vapi.nat44_interface_add_del_feature(
4984 sw_if_index=self.pg1.sw_if_index,
4987 # from client to service
4988 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4989 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4990 TCP(sport=12345, dport=external_port))
4991 self.pg1.add_stream(p)
4992 self.pg_enable_capture(self.pg_interfaces)
4994 capture = self.pg0.get_capture(1)
5000 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
5001 if ip.dst == server1.ip4:
5005 self.assertEqual(tcp.dport, local_port)
5006 self.assert_packet_checksums_valid(p)
5008 self.logger.error(ppp("Unexpected or invalid packet:", p))
5011 # from service back to client
5012 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
5013 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
5014 TCP(sport=local_port, dport=12345))
5015 self.pg0.add_stream(p)
5016 self.pg_enable_capture(self.pg_interfaces)
5018 capture = self.pg1.get_capture(1)
5023 self.assertEqual(ip.src, self.nat_addr)
5024 self.assertEqual(tcp.sport, external_port)
5025 self.assert_packet_checksums_valid(p)
5027 self.logger.error(ppp("Unexpected or invalid packet:", p))
5030 # from client to server (no translation)
5031 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5032 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
5033 TCP(sport=12346, dport=local_port))
5034 self.pg1.add_stream(p)
5035 self.pg_enable_capture(self.pg_interfaces)
5037 capture = self.pg0.get_capture(1)
# Destination address and port must arrive unchanged.
5043 self.assertEqual(ip.dst, server1.ip4)
5044 self.assertEqual(tcp.dport, local_port)
5045 self.assert_packet_checksums_valid(p)
5047 self.logger.error(ppp("Unexpected or invalid packet:", p))
5050 # from service back to client (no translation)
5051 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
5052 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
5053 TCP(sport=local_port, dport=12346))
5054 self.pg0.add_stream(p)
5055 self.pg_enable_capture(self.pg_interfaces)
5057 capture = self.pg1.get_capture(1)
# Source address and port must leave unchanged.
5062 self.assertEqual(ip.src, server1.ip4)
5063 self.assertEqual(tcp.sport, local_port)
5064 self.assert_packet_checksums_valid(p)
5066 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Affinity test for a load-balancing static mapping: the first client packet
# selects a back-end, its session is deleted, and every later packet from the
# same client address (source ports 1030..1099) must still be delivered to
# that same back-end.
# NOTE(review): external_port, local_port, the back-end dict contents, the
# argument on line 5091 of the mapping call (presumably the affinity setting
# - confirm) and the `pkts` list handling are on lines not visible in this
# excerpt.
5069 def test_lb_affinity(self):
5070 """ NAT44 local service load balancing affinity """
5071 external_addr = self.nat_addr
5074 server1 = self.pg0.remote_hosts[0]
5075 server2 = self.pg0.remote_hosts[1]
5077 locals = [{'addr': server1.ip4,
5081 {'addr': server2.ip4,
5086 self.nat44_add_address(self.nat_addr)
5087 self.vapi.nat44_add_del_lb_static_mapping(is_add=1,
5088 external_addr=external_addr,
5089 external_port=external_port,
5090 protocol=IP_PROTOS.tcp,
5092 local_num=len(locals),
5094 flags = self.config_flags.NAT_IS_INSIDE
5095 self.vapi.nat44_interface_add_del_feature(
5096 sw_if_index=self.pg0.sw_if_index,
5097 flags=flags, is_add=1)
5098 self.vapi.nat44_interface_add_del_feature(
5099 sw_if_index=self.pg1.sw_if_index,
# First packet: remember which back-end address it was delivered to.
5102 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5103 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5104 TCP(sport=1025, dport=external_port))
5105 self.pg1.add_stream(p)
5106 self.pg_enable_capture(self.pg_interfaces)
5108 capture = self.pg0.get_capture(1)
5109 backend = capture[0][IP].dst
# Drop the session just created, so the later packets cannot match it.
5111 sessions = self.vapi.nat44_user_session_dump(backend, 0)
5112 self.assertEqual(len(sessions), 1)
5113 self.assertTrue(sessions[0].flags &
5114 self.config_flags.NAT_IS_EXT_HOST_VALID)
5115 self.vapi.nat44_del_session(
5116 address=sessions[0].inside_ip_address,
5117 port=sessions[0].inside_port,
5118 protocol=sessions[0].protocol,
5119 flags=(self.config_flags.NAT_IS_INSIDE |
5120 self.config_flags.NAT_IS_EXT_HOST_VALID),
5121 ext_host_address=sessions[0].ext_host_address,
5122 ext_host_port=sessions[0].ext_host_port)
5125 for port in range(1030, 1100):
5126 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5127 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5128 TCP(sport=port, dport=external_port))
5130 self.pg1.add_stream(pkts)
5131 self.pg_enable_capture(self.pg_interfaces)
5133 capture = self.pg0.get_capture(len(pkts))
# Every captured packet must target the back-end chosen for the first one.
5135 self.assertEqual(p[IP].dst, backend)
# Sends a TCP packet pg0 -> pg1 first, then an encapsulated packet in each
# direction, and checks that the outer IP header is rewritten (source becomes
# self.nat_addr on the way out, destination becomes pg0.remote_ip4 on the way
# back) while the packet still contains a GRE layer.
# NOTE(review): the layer between the outer and inner IP headers is on lines
# not visible in this excerpt (embedded numbering skips 5159 and 5179); the
# assertions suggest GRE - confirm. The `packet` assignment and try/except
# lines are likewise not visible.
5137 def test_unknown_proto(self):
5138 """ NAT44 translate packet with unknown protocol """
5139 self.nat44_add_address(self.nat_addr)
5140 flags = self.config_flags.NAT_IS_INSIDE
5141 self.vapi.nat44_interface_add_del_feature(
5142 sw_if_index=self.pg0.sw_if_index,
5143 flags=flags, is_add=1)
5144 self.vapi.nat44_interface_add_del_feature(
5145 sw_if_index=self.pg1.sw_if_index,
# Plain TCP packet from inside to outside; presumably it establishes state
# used by the following packets - confirm.
5149 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5150 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5151 TCP(sport=self.tcp_port_in, dport=20))
5152 self.pg0.add_stream(p)
5153 self.pg_enable_capture(self.pg_interfaces)
5155 p = self.pg1.get_capture(1)
# Inside -> outside encapsulated packet.
5157 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5158 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5160 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
5161 TCP(sport=1234, dport=1234))
5162 self.pg0.add_stream(p)
5163 self.pg_enable_capture(self.pg_interfaces)
5165 p = self.pg1.get_capture(1)
5168 self.assertEqual(packet[IP].src, self.nat_addr)
5169 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5170 self.assertEqual(packet.haslayer(GRE), 1)
5171 self.assert_packet_checksums_valid(packet)
5173 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside encapsulated packet addressed to self.nat_addr.
5177 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5178 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5180 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
5181 TCP(sport=1234, dport=1234))
5182 self.pg1.add_stream(p)
5183 self.pg_enable_capture(self.pg_interfaces)
5185 p = self.pg0.get_capture(1)
5188 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
5189 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
5190 self.assertEqual(packet.haslayer(GRE), 1)
5191 self.assert_packet_checksums_valid(packet)
5193 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol test: host and server are both
# remote hosts of pg0, and the server is published as 10.0.0.11 through a
# static mapping. Host -> server traffic must arrive with source
# self.nat_addr and destination server.ip4; the reverse packet must arrive
# with source 10.0.0.11 and destination host.ip4. Both must still contain a
# GRE layer.
# NOTE(review): host_in_port, the layer between the outer and inner IP
# headers, the `packet` assignment and the try/except lines are on lines not
# visible in this excerpt (embedded numbering skips e.g. 5200, 5227, 5234-5235,
# 5247).
5196 def test_hairpinning_unknown_proto(self):
5197 """ NAT44 translate packet with unknown protocol - hairpinning """
5198 host = self.pg0.remote_hosts[0]
5199 server = self.pg0.remote_hosts[1]
5201 server_out_port = 8765
5202 server_nat_ip = "10.0.0.11"
5204 self.nat44_add_address(self.nat_addr)
5205 flags = self.config_flags.NAT_IS_INSIDE
5206 self.vapi.nat44_interface_add_del_feature(
5207 sw_if_index=self.pg0.sw_if_index,
5208 flags=flags, is_add=1)
5209 self.vapi.nat44_interface_add_del_feature(
5210 sw_if_index=self.pg1.sw_if_index,
5213 # add static mapping for server
5214 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Plain TCP packet from host to the server's NAT address; sent in and
# captured back on pg0 because both endpoints are on pg0.
5217 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
5218 IP(src=host.ip4, dst=server_nat_ip) /
5219 TCP(sport=host_in_port, dport=server_out_port))
5220 self.pg0.add_stream(p)
5221 self.pg_enable_capture(self.pg_interfaces)
5223 self.pg0.get_capture(1)
# Host -> server encapsulated packet.
5225 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
5226 IP(src=host.ip4, dst=server_nat_ip) /
5228 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
5229 TCP(sport=1234, dport=1234))
5230 self.pg0.add_stream(p)
5231 self.pg_enable_capture(self.pg_interfaces)
5233 p = self.pg0.get_capture(1)
5236 self.assertEqual(packet[IP].src, self.nat_addr)
5237 self.assertEqual(packet[IP].dst, server.ip4)
5238 self.assertEqual(packet.haslayer(GRE), 1)
5239 self.assert_packet_checksums_valid(packet)
5241 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> host encapsulated packet, addressed to self.nat_addr.
5245 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
5246 IP(src=server.ip4, dst=self.nat_addr) /
5248 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
5249 TCP(sport=1234, dport=1234))
5250 self.pg0.add_stream(p)
5251 self.pg_enable_capture(self.pg_interfaces)
5253 p = self.pg0.get_capture(1)
5256 self.assertEqual(packet[IP].src, server_nat_ip)
5257 self.assertEqual(packet[IP].dst, host.ip4)
5258 self.assertEqual(packet.haslayer(GRE), 1)
5259 self.assert_packet_checksums_valid(packet)
5261 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5264 def test_output_feature_and_service(self):
5265 """ NAT44 interface output feature and services """
5266 external_addr = '1.2.3.4'
5270 self.vapi.nat44_forwarding_enable_disable(enable=1)
5271 self.nat44_add_address(self.nat_addr)
5272 flags = self.config_flags.NAT_IS_ADDR_ONLY
5273 self.vapi.nat44_add_del_identity_mapping(
5274 ip_address=self.pg1.remote_ip4, sw_if_index=0xFFFFFFFF,
5275 flags=flags, is_add=1)
5276 flags = self.config_flags.NAT_IS_OUT2IN_ONLY
5277 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
5278 local_port, external_port,
5279 proto=IP_PROTOS.tcp, flags=flags)
5280 flags = self.config_flags.NAT_IS_INSIDE
5281 self.vapi.nat44_interface_add_del_feature(
5282 sw_if_index=self.pg0.sw_if_index,
5284 self.vapi.nat44_interface_add_del_feature(
5285 sw_if_index=self.pg0.sw_if_index,
5286 flags=flags, is_add=1)
5287 self.vapi.nat44_interface_add_del_output_feature(
5289 sw_if_index=self.pg1.sw_if_index)
5291 # from client to service
5292 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5293 IP(src=self.pg1.remote_ip4, dst=external_addr) /
5294 TCP(sport=12345, dport=external_port))
5295 self.pg1.add_stream(p)
5296 self.pg_enable_capture(self.pg_interfaces)
5298 capture = self.pg0.get_capture(1)
5303 self.assertEqual(ip.dst, self.pg0.remote_ip4)
5304 self.assertEqual(tcp.dport, local_port)
5305 self.assert_packet_checksums_valid(p)
5307 self.logger.error(ppp("Unexpected or invalid packet:", p))
5310 # from service back to client
5311 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5312 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5313 TCP(sport=local_port, dport=12345))
5314 self.pg0.add_stream(p)
5315 self.pg_enable_capture(self.pg_interfaces)
5317 capture = self.pg1.get_capture(1)
5322 self.assertEqual(ip.src, external_addr)
5323 self.assertEqual(tcp.sport, external_port)
5324 self.assert_packet_checksums_valid(p)
5326 self.logger.error(ppp("Unexpected or invalid packet:", p))
5329 # from local network host to external network
5330 pkts = self.create_stream_in(self.pg0, self.pg1)
5331 self.pg0.add_stream(pkts)
5332 self.pg_enable_capture(self.pg_interfaces)
5334 capture = self.pg1.get_capture(len(pkts))
5335 self.verify_capture_out(capture, ignore_port=True)
5336 pkts = self.create_stream_in(self.pg0, self.pg1)
5337 self.pg0.add_stream(pkts)
5338 self.pg_enable_capture(self.pg_interfaces)
5340 capture = self.pg1.get_capture(len(pkts))
5341 self.verify_capture_out(capture, ignore_port=True)
5343 # from external network back to local network host
5344 pkts = self.create_stream_out(self.pg1)
5345 self.pg1.add_stream(pkts)
5346 self.pg_enable_capture(self.pg_interfaces)
5348 capture = self.pg0.get_capture(len(pkts))
5349 self.verify_capture_in(capture, self.pg0)
5351 def test_output_feature_and_service2(self):
5352 """ NAT44 interface output feature and service host direct access """
5353 self.vapi.nat44_forwarding_enable_disable(enable=1)
5354 self.nat44_add_address(self.nat_addr)
5355 self.vapi.nat44_interface_add_del_output_feature(
5357 sw_if_index=self.pg1.sw_if_index)
5359 # session initiated from service host - translate
5360 pkts = self.create_stream_in(self.pg0, self.pg1)
5361 self.pg0.add_stream(pkts)
5362 self.pg_enable_capture(self.pg_interfaces)
5364 capture = self.pg1.get_capture(len(pkts))
5365 self.verify_capture_out(capture, ignore_port=True)
5367 pkts = self.create_stream_out(self.pg1)
5368 self.pg1.add_stream(pkts)
5369 self.pg_enable_capture(self.pg_interfaces)
5371 capture = self.pg0.get_capture(len(pkts))
5372 self.verify_capture_in(capture, self.pg0)
5374 # session initiated from remote host - do not translate
5375 self.tcp_port_in = 60303
5376 self.udp_port_in = 60304
5377 self.icmp_id_in = 60305
5378 pkts = self.create_stream_out(self.pg1,
5379 self.pg0.remote_ip4,
5380 use_inside_ports=True)
5381 self.pg1.add_stream(pkts)
5382 self.pg_enable_capture(self.pg_interfaces)
5384 capture = self.pg0.get_capture(len(pkts))
5385 self.verify_capture_in(capture, self.pg0)
5387 pkts = self.create_stream_in(self.pg0, self.pg1)
5388 self.pg0.add_stream(pkts)
5389 self.pg_enable_capture(self.pg_interfaces)
5391 capture = self.pg1.get_capture(len(pkts))
5392 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
5395 def test_output_feature_and_service3(self):
5396 """ NAT44 interface output feature and DST NAT """
5397 external_addr = '1.2.3.4'
5401 self.vapi.nat44_forwarding_enable_disable(enable=1)
5402 self.nat44_add_address(self.nat_addr)
5403 flags = self.config_flags.NAT_IS_OUT2IN_ONLY
5404 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
5405 local_port, external_port,
5406 proto=IP_PROTOS.tcp, flags=flags)
5407 flags = self.config_flags.NAT_IS_INSIDE
5408 self.vapi.nat44_interface_add_del_feature(
5409 sw_if_index=self.pg0.sw_if_index,
5411 self.vapi.nat44_interface_add_del_feature(
5412 sw_if_index=self.pg0.sw_if_index,
5413 flags=flags, is_add=1)
5414 self.vapi.nat44_interface_add_del_output_feature(
5416 sw_if_index=self.pg1.sw_if_index)
5418 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5419 IP(src=self.pg0.remote_ip4, dst=external_addr) /
5420 TCP(sport=12345, dport=external_port))
5421 self.pg0.add_stream(p)
5422 self.pg_enable_capture(self.pg_interfaces)
5424 capture = self.pg1.get_capture(1)
5429 self.assertEqual(ip.src, self.pg0.remote_ip4)
5430 self.assertEqual(tcp.sport, 12345)
5431 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5432 self.assertEqual(tcp.dport, local_port)
5433 self.assert_packet_checksums_valid(p)
5435 self.logger.error(ppp("Unexpected or invalid packet:", p))
5438 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5439 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
5440 TCP(sport=local_port, dport=12345))
5441 self.pg1.add_stream(p)
5442 self.pg_enable_capture(self.pg_interfaces)
5444 capture = self.pg0.get_capture(1)
5449 self.assertEqual(ip.src, external_addr)
5450 self.assertEqual(tcp.sport, external_port)
5451 self.assertEqual(ip.dst, self.pg0.remote_ip4)
5452 self.assertEqual(tcp.dport, 12345)
5453 self.assert_packet_checksums_valid(p)
5455 self.logger.error(ppp("Unexpected or invalid packet:", p))
5458 def test_next_src_nat(self):
5459 """ On the way back, forward the packet to the nat44-in2out node. """
5460 twice_nat_addr = '10.0.1.3'
5463 post_twice_nat_port = 0
5465 self.vapi.nat44_forwarding_enable_disable(enable=1)
5466 self.nat44_add_address(twice_nat_addr, twice_nat=1)
5467 flags = (self.config_flags.NAT_IS_OUT2IN_ONLY |
5468 self.config_flags.NAT_IS_SELF_TWICE_NAT)
5469 self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
5470 local_port, external_port,
5471 proto=IP_PROTOS.tcp, vrf_id=1,
5473 self.vapi.nat44_interface_add_del_feature(
5474 sw_if_index=self.pg6.sw_if_index,
5477 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
5478 IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
5479 TCP(sport=12345, dport=external_port))
5480 self.pg6.add_stream(p)
5481 self.pg_enable_capture(self.pg_interfaces)
5483 capture = self.pg6.get_capture(1)
5488 self.assertEqual(ip.src, twice_nat_addr)
5489 self.assertNotEqual(tcp.sport, 12345)
5490 post_twice_nat_port = tcp.sport
5491 self.assertEqual(ip.dst, self.pg6.remote_ip4)
5492 self.assertEqual(tcp.dport, local_port)
5493 self.assert_packet_checksums_valid(p)
5495 self.logger.error(ppp("Unexpected or invalid packet:", p))
5498 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
5499 IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
5500 TCP(sport=local_port, dport=post_twice_nat_port))
5501 self.pg6.add_stream(p)
5502 self.pg_enable_capture(self.pg_interfaces)
5504 capture = self.pg6.get_capture(1)
5509 self.assertEqual(ip.src, self.pg1.remote_ip4)
5510 self.assertEqual(tcp.sport, external_port)
5511 self.assertEqual(ip.dst, self.pg6.remote_ip4)
5512 self.assertEqual(tcp.dport, 12345)
5513 self.assert_packet_checksums_valid(p)
5515 self.logger.error(ppp("Unexpected or invalid packet:", p))
5518 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
5520 twice_nat_addr = '10.0.1.3'
5528 port_in1 = port_in + 1
5529 port_in2 = port_in + 2
5534 server1 = self.pg0.remote_hosts[0]
5535 server2 = self.pg0.remote_hosts[1]
5547 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
5550 self.nat44_add_address(self.nat_addr)
5551 self.nat44_add_address(twice_nat_addr, twice_nat=1)
5555 flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
5557 flags |= self.config_flags.NAT_IS_TWICE_NAT
5560 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
5562 proto=IP_PROTOS.tcp,
5565 locals = [{'addr': server1.ip4,
5569 {'addr': server2.ip4,
5573 out_addr = self.nat_addr
5575 self.vapi.nat44_add_del_lb_static_mapping(is_add=1, flags=flags,
5576 external_addr=out_addr,
5577 external_port=port_out,
5578 protocol=IP_PROTOS.tcp,
5579 local_num=len(locals),
5581 flags = self.config_flags.NAT_IS_INSIDE
5582 self.vapi.nat44_interface_add_del_feature(
5583 sw_if_index=pg0.sw_if_index,
5584 flags=flags, is_add=1)
5585 self.vapi.nat44_interface_add_del_feature(
5586 sw_if_index=pg1.sw_if_index,
5593 assert client_id is not None
5595 client = self.pg0.remote_hosts[0]
5596 elif client_id == 2:
5597 client = self.pg0.remote_hosts[1]
5599 client = pg1.remote_hosts[0]
5600 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
5601 IP(src=client.ip4, dst=self.nat_addr) /
5602 TCP(sport=eh_port_out, dport=port_out))
5604 self.pg_enable_capture(self.pg_interfaces)
5606 capture = pg0.get_capture(1)
5612 if ip.dst == server1.ip4:
5618 self.assertEqual(ip.dst, server.ip4)
5620 self.assertIn(tcp.dport, [port_in1, port_in2])
5622 self.assertEqual(tcp.dport, port_in)
5624 self.assertEqual(ip.src, twice_nat_addr)
5625 self.assertNotEqual(tcp.sport, eh_port_out)
5627 self.assertEqual(ip.src, client.ip4)
5628 self.assertEqual(tcp.sport, eh_port_out)
5630 eh_port_in = tcp.sport
5631 saved_port_in = tcp.dport
5632 self.assert_packet_checksums_valid(p)
5634 self.logger.error(ppp("Unexpected or invalid packet:", p))
5637 p = (Ether(src=server.mac, dst=pg0.local_mac) /
5638 IP(src=server.ip4, dst=eh_addr_in) /
5639 TCP(sport=saved_port_in, dport=eh_port_in))
5641 self.pg_enable_capture(self.pg_interfaces)
5643 capture = pg1.get_capture(1)
5648 self.assertEqual(ip.dst, client.ip4)
5649 self.assertEqual(ip.src, self.nat_addr)
5650 self.assertEqual(tcp.dport, eh_port_out)
5651 self.assertEqual(tcp.sport, port_out)
5652 self.assert_packet_checksums_valid(p)
5654 self.logger.error(ppp("Unexpected or invalid packet:", p))
5658 sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
5659 self.assertEqual(len(sessions), 1)
5660 self.assertTrue(sessions[0].flags &
5661 self.config_flags.NAT_IS_EXT_HOST_VALID)
5662 self.assertTrue(sessions[0].flags &
5663 self.config_flags.NAT_IS_TWICE_NAT)
5664 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
5665 self.vapi.nat44_del_session(
5666 address=sessions[0].inside_ip_address,
5667 port=sessions[0].inside_port,
5668 protocol=sessions[0].protocol,
5669 flags=(self.config_flags.NAT_IS_INSIDE |
5670 self.config_flags.NAT_IS_EXT_HOST_VALID),
5671 ext_host_address=sessions[0].ext_host_nat_address,
5672 ext_host_port=sessions[0].ext_host_nat_port)
5673 sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
5674 self.assertEqual(len(sessions), 0)
5676 def test_twice_nat(self):
5678 self.twice_nat_common()
5680 def test_self_twice_nat_positive(self):
5681 """ Self Twice NAT44 (positive test) """
5682 self.twice_nat_common(self_twice_nat=True, same_pg=True)
5684 def test_self_twice_nat_negative(self):
5685 """ Self Twice NAT44 (negative test) """
5686 self.twice_nat_common(self_twice_nat=True)
5688 def test_twice_nat_lb(self):
5689 """ Twice NAT44 local service load balancing """
5690 self.twice_nat_common(lb=True)
5692 def test_self_twice_nat_lb_positive(self):
5693 """ Self Twice NAT44 local service load balancing (positive test) """
5694 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
5697 def test_self_twice_nat_lb_negative(self):
5698 """ Self Twice NAT44 local service load balancing (negative test) """
5699 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
5702 def test_twice_nat_interface_addr(self):
5703 """ Acquire twice NAT44 addresses from interface """
5704 flags = self.config_flags.NAT_IS_TWICE_NAT
5705 self.vapi.nat44_add_del_interface_addr(
5707 sw_if_index=self.pg3.sw_if_index,
5710 # no address in NAT pool
5711 adresses = self.vapi.nat44_address_dump()
5712 self.assertEqual(0, len(adresses))
5714 # configure interface address and check NAT address pool
5715 self.pg3.config_ip4()
5716 adresses = self.vapi.nat44_address_dump()
5717 self.assertEqual(1, len(adresses))
5718 self.assertEqual(str(adresses[0].ip_address),
5720 self.assertEqual(adresses[0].flags, flags)
5722 # remove interface address and check NAT address pool
5723 self.pg3.unconfig_ip4()
5724 adresses = self.vapi.nat44_address_dump()
5725 self.assertEqual(0, len(adresses))
5727 def test_tcp_close(self):
5728 """ Close TCP session from inside network - output feature """
5729 old_timeouts = self.vapi.nat_get_timeouts()
5731 self.vapi.nat_set_timeouts(
5732 udp=old_timeouts.udp,
5733 tcp_established=old_timeouts.tcp_established,
5734 icmp=old_timeouts.icmp,
5735 tcp_transitory=new_transitory)
5737 self.vapi.nat44_forwarding_enable_disable(enable=1)
5738 self.nat44_add_address(self.pg1.local_ip4)
5739 twice_nat_addr = '10.0.1.3'
5740 service_ip = '192.168.16.150'
5741 self.nat44_add_address(twice_nat_addr, twice_nat=1)
5742 flags = self.config_flags.NAT_IS_INSIDE
5743 self.vapi.nat44_interface_add_del_feature(
5744 sw_if_index=self.pg0.sw_if_index,
5746 self.vapi.nat44_interface_add_del_feature(
5747 sw_if_index=self.pg0.sw_if_index,
5748 flags=flags, is_add=1)
5749 self.vapi.nat44_interface_add_del_output_feature(
5751 sw_if_index=self.pg1.sw_if_index)
5752 flags = (self.config_flags.NAT_IS_OUT2IN_ONLY |
5753 self.config_flags.NAT_IS_TWICE_NAT)
5754 self.nat44_add_static_mapping(self.pg0.remote_ip4,
5758 proto=IP_PROTOS.tcp,
5760 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
5761 start_sessnum = len(sessions)
5763 # SYN packet out->in
5764 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5765 IP(src=self.pg1.remote_ip4, dst=service_ip) /
5766 TCP(sport=33898, dport=80, flags="S"))
5767 self.pg1.add_stream(p)
5768 self.pg_enable_capture(self.pg_interfaces)
5770 capture = self.pg0.get_capture(1)
5772 tcp_port = p[TCP].sport
5774 # SYN + ACK packet in->out
5775 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5776 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
5777 TCP(sport=80, dport=tcp_port, flags="SA"))
5778 self.pg0.add_stream(p)
5779 self.pg_enable_capture(self.pg_interfaces)
5781 self.pg1.get_capture(1)
5783 # ACK packet out->in
5784 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5785 IP(src=self.pg1.remote_ip4, dst=service_ip) /
5786 TCP(sport=33898, dport=80, flags="A"))
5787 self.pg1.add_stream(p)
5788 self.pg_enable_capture(self.pg_interfaces)
5790 self.pg0.get_capture(1)
5792 # FIN packet in -> out
5793 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5794 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
5795 TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300))
5796 self.pg0.add_stream(p)
5797 self.pg_enable_capture(self.pg_interfaces)
5799 self.pg1.get_capture(1)
5801 # FIN+ACK packet out -> in
5802 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5803 IP(src=self.pg1.remote_ip4, dst=service_ip) /
5804 TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101))
5805 self.pg1.add_stream(p)
5806 self.pg_enable_capture(self.pg_interfaces)
5808 self.pg0.get_capture(1)
5810 # ACK packet in -> out
5811 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5812 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
5813 TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301))
5814 self.pg0.add_stream(p)
5815 self.pg_enable_capture(self.pg_interfaces)
5817 self.pg1.get_capture(1)
5819 # session now in transitory timeout
5820 # try SYN packet out->in - should be dropped
5821 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5822 IP(src=self.pg1.remote_ip4, dst=service_ip) /
5823 TCP(sport=33898, dport=80, flags="S"))
5824 self.pg1.add_stream(p)
5825 self.pg_enable_capture(self.pg_interfaces)
5828 self.sleep(new_transitory, "wait for transitory timeout")
5829 self.pg0.assert_nothing_captured(0)
5831 # session should still exist
5832 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
5833 self.assertEqual(len(sessions) - start_sessnum, 1)
5835 # send FIN+ACK packet out -> in - will cause session to be wiped
5836 # but won't create a new session
5837 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5838 IP(src=self.pg1.remote_ip4, dst=service_ip) /
5839 TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101))
5840 self.pg1.add_stream(p)
5841 self.pg_enable_capture(self.pg_interfaces)
5844 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
5845 self.assertEqual(len(sessions) - start_sessnum, 0)
5846 self.pg0.assert_nothing_captured(0)
5848 def test_tcp_session_close_in(self):
5849 """ Close TCP session from inside network """
5850 self.tcp_port_out = 10505
5851 self.nat44_add_address(self.nat_addr)
5852 flags = self.config_flags.NAT_IS_TWICE_NAT
5853 self.nat44_add_static_mapping(self.pg0.remote_ip4,
5857 proto=IP_PROTOS.tcp,
5859 flags = self.config_flags.NAT_IS_INSIDE
5860 self.vapi.nat44_interface_add_del_feature(
5861 sw_if_index=self.pg0.sw_if_index,
5862 flags=flags, is_add=1)
5863 self.vapi.nat44_interface_add_del_feature(
5864 sw_if_index=self.pg1.sw_if_index,
5867 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
5868 start_sessnum = len(sessions)
5870 self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
5871 tcp_transitory=2, icmp=5)
5873 self.initiate_tcp_session(self.pg0, self.pg1)
5875 # FIN packet in -> out
5876 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5877 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5878 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5879 flags="FA", seq=100, ack=300))
5880 self.pg0.add_stream(p)
5881 self.pg_enable_capture(self.pg_interfaces)
5883 self.pg1.get_capture(1)
5887 # ACK packet out -> in
5888 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5889 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5890 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5891 flags="A", seq=300, ack=101))
5894 # FIN packet out -> in
5895 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5896 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5897 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5898 flags="FA", seq=300, ack=101))
5901 self.pg1.add_stream(pkts)
5902 self.pg_enable_capture(self.pg_interfaces)
5904 self.pg0.get_capture(2)
5906 # ACK packet in -> out
5907 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5908 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5909 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5910 flags="A", seq=101, ack=301))
5911 self.pg0.add_stream(p)
5912 self.pg_enable_capture(self.pg_interfaces)
5914 self.pg1.get_capture(1)
5916 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
5917 self.assertEqual(len(sessions) - start_sessnum, 1)
5919 stats = self.statistics.get_counter(
5920 '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
5921 out2in_drops = stats[0]
5922 stats = self.statistics.get_counter(
5923 '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
5924 in2out_drops = stats[0]
5926 # extra FIN packet out -> in - this should be dropped
5927 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5928 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5929 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5930 flags="FA", seq=300, ack=101))
5932 self.pg1.add_stream(p)
5933 self.pg_enable_capture(self.pg_interfaces)
5935 self.pg0.assert_nothing_captured()
5937 # extra ACK packet in -> out - this should be dropped
5938 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5939 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5940 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5941 flags="A", seq=101, ack=301))
5942 self.pg0.add_stream(p)
5943 self.pg_enable_capture(self.pg_interfaces)
5945 self.pg1.assert_nothing_captured()
5947 stats = self.statistics.get_counter(
5948 '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
5949 self.assertEqual(stats[0] - out2in_drops, 1)
5950 stats = self.statistics.get_counter(
5951 '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
5952 self.assertEqual(stats[0] - in2out_drops, 1)
5955 # extra ACK packet in -> out - this will cause session to be wiped
5956 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5957 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5958 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5959 flags="A", seq=101, ack=301))
5960 self.pg0.add_stream(p)
5961 self.pg_enable_capture(self.pg_interfaces)
5963 self.pg1.assert_nothing_captured()
5964 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
5965 self.assertEqual(len(sessions) - start_sessnum, 0)
5967 def test_tcp_session_close_out(self):
5968 """ Close TCP session from outside network """
5969 self.tcp_port_out = 10505
5970 self.nat44_add_address(self.nat_addr)
5971 flags = self.config_flags.NAT_IS_TWICE_NAT
5972 self.nat44_add_static_mapping(self.pg0.remote_ip4,
5976 proto=IP_PROTOS.tcp,
5978 flags = self.config_flags.NAT_IS_INSIDE
5979 self.vapi.nat44_interface_add_del_feature(
5980 sw_if_index=self.pg0.sw_if_index,
5981 flags=flags, is_add=1)
5982 self.vapi.nat44_interface_add_del_feature(
5983 sw_if_index=self.pg1.sw_if_index,
5986 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
5987 start_sessnum = len(sessions)
5989 self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
5990 tcp_transitory=2, icmp=5)
5992 self.initiate_tcp_session(self.pg0, self.pg1)
5994 # FIN packet out -> in
5995 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5996 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5997 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5998 flags="FA", seq=100, ack=300))
5999 self.pg1.add_stream(p)
6000 self.pg_enable_capture(self.pg_interfaces)
6002 self.pg0.get_capture(1)
6004 # FIN+ACK packet in -> out
6005 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6006 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6007 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
6008 flags="FA", seq=300, ack=101))
6010 self.pg0.add_stream(p)
6011 self.pg_enable_capture(self.pg_interfaces)
6013 self.pg1.get_capture(1)
6015 # ACK packet out -> in
6016 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
6017 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6018 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
6019 flags="A", seq=101, ack=301))
6020 self.pg1.add_stream(p)
6021 self.pg_enable_capture(self.pg_interfaces)
6023 self.pg0.get_capture(1)
6025 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
6026 self.assertEqual(len(sessions) - start_sessnum, 1)
6028 stats = self.statistics.get_counter(
6029 '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
6030 out2in_drops = stats[0]
6031 stats = self.statistics.get_counter(
6032 '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
6033 in2out_drops = stats[0]
6035 # extra FIN packet out -> in - this should be dropped
6036 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
6037 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6038 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
6039 flags="FA", seq=300, ack=101))
6041 self.pg1.add_stream(p)
6042 self.pg_enable_capture(self.pg_interfaces)
6044 self.pg0.assert_nothing_captured()
6046 # extra ACK packet in -> out - this should be dropped
6047 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6048 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6049 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
6050 flags="A", seq=101, ack=301))
6051 self.pg0.add_stream(p)
6052 self.pg_enable_capture(self.pg_interfaces)
6054 self.pg1.assert_nothing_captured()
6056 stats = self.statistics.get_counter(
6057 '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
6058 self.assertEqual(stats[0] - out2in_drops, 1)
6059 stats = self.statistics.get_counter(
6060 '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
6061 self.assertEqual(stats[0] - in2out_drops, 1)
6064 # extra ACK packet in -> out - this will cause session to be wiped
6065 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6066 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6067 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
6068 flags="A", seq=101, ack=301))
6069 self.pg0.add_stream(p)
6070 self.pg_enable_capture(self.pg_interfaces)
6072 self.pg1.assert_nothing_captured()
6073 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
6074 self.assertEqual(len(sessions) - start_sessnum, 0)
6076 def test_tcp_session_close_simultaneous(self):
6077 """ Close TCP session simultaneously from inside and outside network """
6078 self.tcp_port_out = 10505
6079 self.nat44_add_address(self.nat_addr)
6080 flags = self.config_flags.NAT_IS_TWICE_NAT
6081 self.nat44_add_static_mapping(self.pg0.remote_ip4,
6085 proto=IP_PROTOS.tcp,
6087 flags = self.config_flags.NAT_IS_INSIDE
6088 self.vapi.nat44_interface_add_del_feature(
6089 sw_if_index=self.pg0.sw_if_index,
6090 flags=flags, is_add=1)
6091 self.vapi.nat44_interface_add_del_feature(
6092 sw_if_index=self.pg1.sw_if_index,
6095 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
6096 start_sessnum = len(sessions)
6098 self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
6099 tcp_transitory=2, icmp=5)
6101 self.initiate_tcp_session(self.pg0, self.pg1)
6103 # FIN packet in -> out
6104 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6105 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6106 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
6107 flags="FA", seq=100, ack=300))
6108 self.pg0.add_stream(p)
6109 self.pg_enable_capture(self.pg_interfaces)
6111 self.pg1.get_capture(1)
6113 # FIN packet out -> in
6114 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
6115 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6116 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
6117 flags="FA", seq=300, ack=100))
6118 self.pg1.add_stream(p)
6119 self.pg_enable_capture(self.pg_interfaces)
6121 self.pg0.get_capture(1)
6123 # ACK packet in -> out
6124 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6125 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6126 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
6127 flags="A", seq=101, ack=301))
6128 self.pg0.add_stream(p)
6129 self.pg_enable_capture(self.pg_interfaces)
6131 self.pg1.get_capture(1)
6133 # ACK packet out -> in
6134 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
6135 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6136 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
6137 flags="A", seq=301, ack=101))
6138 self.pg1.add_stream(p)
6139 self.pg_enable_capture(self.pg_interfaces)
6141 self.pg0.get_capture(1)
6143 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
6144 self.assertEqual(len(sessions) - start_sessnum, 1)
6146 stats = self.statistics.get_counter(
6147 '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
6148 out2in_drops = stats[0]
6149 stats = self.statistics.get_counter(
6150 '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
6151 in2out_drops = stats[0]
6153 # extra FIN packet out -> in - this should be dropped
6154 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
6155 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6156 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
6157 flags="FA", seq=300, ack=101))
6159 self.pg1.add_stream(p)
6160 self.pg_enable_capture(self.pg_interfaces)
6162 self.pg0.assert_nothing_captured()
6164 # extra ACK packet in -> out - this should be dropped
6165 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6166 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6167 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
6168 flags="A", seq=101, ack=301))
6169 self.pg0.add_stream(p)
6170 self.pg_enable_capture(self.pg_interfaces)
6172 self.pg1.assert_nothing_captured()
6174 stats = self.statistics.get_counter(
6175 '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
6176 self.assertEqual(stats[0] - out2in_drops, 1)
6177 stats = self.statistics.get_counter(
6178 '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
6179 self.assertEqual(stats[0] - in2out_drops, 1)
6182 # extra ACK packet in -> out - this will cause session to be wiped
6183 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6184 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6185 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
6186 flags="A", seq=101, ack=301))
6187 self.pg0.add_stream(p)
6188 self.pg_enable_capture(self.pg_interfaces)
6190 self.pg1.assert_nothing_captured()
6191 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
6192 self.assertEqual(len(sessions) - start_sessnum, 0)
6194 def test_one_armed_nat44_static(self):
6195 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
6196 remote_host = self.pg4.remote_hosts[0]
6197 local_host = self.pg4.remote_hosts[1]
6202 self.vapi.nat44_forwarding_enable_disable(enable=1)
6203 self.nat44_add_address(self.nat_addr, twice_nat=1)
6204 flags = (self.config_flags.NAT_IS_OUT2IN_ONLY |
6205 self.config_flags.NAT_IS_TWICE_NAT)
6206 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
6207 local_port, external_port,
6208 proto=IP_PROTOS.tcp, flags=flags)
6209 flags = self.config_flags.NAT_IS_INSIDE
6210 self.vapi.nat44_interface_add_del_feature(
6211 sw_if_index=self.pg4.sw_if_index,
6213 self.vapi.nat44_interface_add_del_feature(
6214 sw_if_index=self.pg4.sw_if_index,
6215 flags=flags, is_add=1)
6217 # from client to service
6218 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
6219 IP(src=remote_host.ip4, dst=self.nat_addr) /
6220 TCP(sport=12345, dport=external_port))
6221 self.pg4.add_stream(p)
6222 self.pg_enable_capture(self.pg_interfaces)
6224 capture = self.pg4.get_capture(1)
6229 self.assertEqual(ip.dst, local_host.ip4)
6230 self.assertEqual(ip.src, self.nat_addr)
6231 self.assertEqual(tcp.dport, local_port)
6232 self.assertNotEqual(tcp.sport, 12345)
6233 eh_port_in = tcp.sport
6234 self.assert_packet_checksums_valid(p)
6236 self.logger.error(ppp("Unexpected or invalid packet:", p))
6239 # from service back to client
6240 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
6241 IP(src=local_host.ip4, dst=self.nat_addr) /
6242 TCP(sport=local_port, dport=eh_port_in))
6243 self.pg4.add_stream(p)
6244 self.pg_enable_capture(self.pg_interfaces)
6246 capture = self.pg4.get_capture(1)
6251 self.assertEqual(ip.src, self.nat_addr)
6252 self.assertEqual(ip.dst, remote_host.ip4)
6253 self.assertEqual(tcp.sport, external_port)
6254 self.assertEqual(tcp.dport, 12345)
6255 self.assert_packet_checksums_valid(p)
6257 self.logger.error(ppp("Unexpected or invalid packet:", p))
6260 def test_static_with_port_out2(self):
6261 """ 1:1 NAPT asymmetrical rule """
# Scenario exercised below: NAT44 forwarding is enabled and a TCP static
# mapping flagged NAT_IS_OUT2IN_ONLY is added for pg0.remote_ip4 behind
# nat_addr. pg0 is configured with NAT_IS_INSIDE. Each step sends one
# packet, captures one packet on the opposite interface and asserts the
# addresses/ports that packet carries.
6266 self.vapi.nat44_forwarding_enable_disable(enable=1)
6267 flags = self.config_flags.NAT_IS_OUT2IN_ONLY
6268 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
6269 local_port, external_port,
6270 proto=IP_PROTOS.tcp, flags=flags)
6271 flags = self.config_flags.NAT_IS_INSIDE
6272 self.vapi.nat44_interface_add_del_feature(
6273 sw_if_index=self.pg0.sw_if_index,
6274 flags=flags, is_add=1)
6275 self.vapi.nat44_interface_add_del_feature(
6276 sw_if_index=self.pg1.sw_if_index,
6279 # from client to service
# pg1 -> nat_addr:external_port is expected on pg0 addressed to
# pg0.remote_ip4:local_port
6280 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
6281 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6282 TCP(sport=12345, dport=external_port))
6283 self.pg1.add_stream(p)
6284 self.pg_enable_capture(self.pg_interfaces)
6286 capture = self.pg0.get_capture(1)
6291 self.assertEqual(ip.dst, self.pg0.remote_ip4)
6292 self.assertEqual(tcp.dport, local_port)
6293 self.assert_packet_checksums_valid(p)
6295 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 (time exceeded) sent from pg0 with the packet captured
# above embedded in it; on pg1 the outer source and the embedded
# destination are both expected to be nat_addr
6299 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6300 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6301 ICMP(type=11) / capture[0][IP])
6302 self.pg0.add_stream(p)
6303 self.pg_enable_capture(self.pg_interfaces)
6305 capture = self.pg1.get_capture(1)
6308 self.assertEqual(p[IP].src, self.nat_addr)
6310 self.assertEqual(inner.dst, self.nat_addr)
6311 self.assertEqual(inner[TCPerror].dport, external_port)
6313 self.logger.error(ppp("Unexpected or invalid packet:", p))
6316 # from service back to client
# the reply leaves pg0 untranslated and is expected on pg1 with source
# nat_addr:external_port
6317 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6318 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6319 TCP(sport=local_port, dport=12345))
6320 self.pg0.add_stream(p)
6321 self.pg_enable_capture(self.pg_interfaces)
6323 capture = self.pg1.get_capture(1)
6328 self.assertEqual(ip.src, self.nat_addr)
6329 self.assertEqual(tcp.sport, external_port)
6330 self.assert_packet_checksums_valid(p)
6332 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 (time exceeded) sent from pg1 to nat_addr with the packet
# captured above embedded in it; on pg0 the outer destination and the
# embedded source are both expected to be pg0.remote_ip4
6336 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6337 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6338 ICMP(type=11) / capture[0][IP])
6339 self.pg1.add_stream(p)
6340 self.pg_enable_capture(self.pg_interfaces)
6342 capture = self.pg0.get_capture(1)
6345 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
6347 self.assertEqual(inner.src, self.pg0.remote_ip4)
6348 self.assertEqual(inner[TCPerror].sport, local_port)
6350 self.logger.error(ppp("Unexpected or invalid packet:", p))
6353 # from client to server (no translation)
# addressed directly to pg0.remote_ip4:local_port instead of nat_addr
6354 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
6355 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
6356 TCP(sport=12346, dport=local_port))
6357 self.pg1.add_stream(p)
6358 self.pg_enable_capture(self.pg_interfaces)
6360 capture = self.pg0.get_capture(1)
6365 self.assertEqual(ip.dst, self.pg0.remote_ip4)
6366 self.assertEqual(tcp.dport, local_port)
6367 self.assert_packet_checksums_valid(p)
6369 self.logger.error(ppp("Unexpected or invalid packet:", p))
6372 # from service back to client (no translation)
# the source address and port are expected to stay pg0.remote_ip4:local_port
6373 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6374 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6375 TCP(sport=local_port, dport=12346))
6376 self.pg0.add_stream(p)
6377 self.pg_enable_capture(self.pg_interfaces)
6379 capture = self.pg1.get_capture(1)
6384 self.assertEqual(ip.src, self.pg0.remote_ip4)
6385 self.assertEqual(tcp.sport, local_port)
6386 self.assert_packet_checksums_valid(p)
6388 self.logger.error(ppp("Unexpected or invalid packet:", p))
6391 def test_output_feature(self):
6392 """ NAT44 interface output feature (in2out postrouting) """
# Enables NAT44 forwarding, adds nat_addr, registers the NAT feature on
# pg0 and the output feature on pg1, then runs one traffic round in each
# direction.
6393 self.vapi.nat44_forwarding_enable_disable(enable=1)
6394 self.nat44_add_address(self.nat_addr)
6395 self.vapi.nat44_interface_add_del_feature(
6396 sw_if_index=self.pg0.sw_if_index,
6398 self.vapi.nat44_interface_add_del_output_feature(
6400 sw_if_index=self.pg1.sw_if_index)
# stream entering pg0 is expected on pg1; the capture is verified with
# ignore_port=True
6403 pkts = self.create_stream_in(self.pg0, self.pg1)
6404 self.pg0.add_stream(pkts)
6405 self.pg_enable_capture(self.pg_interfaces)
6407 capture = self.pg1.get_capture(len(pkts))
6408 self.verify_capture_out(capture, ignore_port=True)
# stream entering pg1 is expected back on pg0
6411 pkts = self.create_stream_out(self.pg1)
6412 self.pg1.add_stream(pkts)
6413 self.pg_enable_capture(self.pg_interfaces)
6415 capture = self.pg0.get_capture(len(pkts))
6416 self.verify_capture_in(capture, self.pg0)
6418 def test_output_feature_stateful_acl(self):
6419 """ NAT44 endpoint-dependent output feature works with stateful ACL """
# Registers the NAT output feature on pg0 (NAT_IS_INSIDE) and on pg1
# (NAT_IS_OUTSIDE), checks plain NAT behaviour first, then repeats the
# traffic with a deny ACL and a permit/reflect ACL attached to pg1.
6420 self.nat44_add_address(self.nat_addr)
6421 self.vapi.nat44_interface_add_del_output_feature(
6422 sw_if_index=self.pg0.sw_if_index,
6423 flags=self.config_flags.NAT_IS_INSIDE,
6425 self.vapi.nat44_interface_add_del_output_feature(
6426 sw_if_index=self.pg1.sw_if_index,
6427 flags=self.config_flags.NAT_IS_OUTSIDE,
6430 # First ensure that the NAT is working sans ACL
6432 # send packets out2in, no sessions yet so packets should drop
6433 pkts_out2in = self.create_stream_out(self.pg1)
6434 self.send_and_assert_no_replies(self.pg1, pkts_out2in)
6436 # send packets into inside intf, ensure received via outside intf
6437 pkts_in2out = self.create_stream_in(self.pg0, self.pg1)
6438 capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1,
6440 self.verify_capture_out(capture, ignore_port=True)
6442 # send out2in again, with sessions created it should work now
6443 pkts_out2in = self.create_stream_out(self.pg1)
6444 capture = self.send_and_expect(self.pg1, pkts_out2in, self.pg0,
6446 self.verify_capture_in(capture, self.pg0)
6448 # Create an ACL blocking everything
6449 out2in_deny_rule = AclRule(is_permit=0)
6450 out2in_acl = VppAcl(self, rules=[out2in_deny_rule])
6451 out2in_acl.add_vpp_config()
6453 # create an ACL to permit/reflect everything
6454 in2out_reflect_rule = AclRule(is_permit=2)
6455 in2out_acl = VppAcl(self, rules=[in2out_reflect_rule])
6456 in2out_acl.add_vpp_config()
6458 # apply as input acl on interface and confirm it blocks everything
6459 acl_if = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
6460 n_input=1, acls=[out2in_acl])
6461 acl_if.add_vpp_config()
6462 self.send_and_assert_no_replies(self.pg1, pkts_out2in)
# re-apply the pg1 ACL binding, now listing both the deny ACL and the
# permit/reflect ACL
6465 acl_if.acls = [out2in_acl, in2out_acl]
6466 acl_if.add_vpp_config()
6467 # send in2out to generate ACL state (NAT state was created earlier)
6468 capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1,
6470 self.verify_capture_out(capture, ignore_port=True)
6472 # send out2in again. ACL state exists so it should work now.
6473 # TCP packets with the syn flag set also need the ack flag
# 0x02 is the TCP SYN bit and 0x10 the TCP ACK bit; the packets in
# pkts_out2in are modified in place
6474 for p in pkts_out2in:
6475 if p.haslayer(TCP) and p[TCP].flags & 0x02:
6476 p[TCP].flags |= 0x10
6477 capture = self.send_and_expect(self.pg1, pkts_out2in, self.pg0,
6479 self.verify_capture_in(capture, self.pg0)
6480 self.logger.info(self.vapi.cli("show trace"))
6482 def test_multiple_vrf(self):
6483 """ Multiple VRF setup """
# Uses pg0/pg1 together with pg5/pg6; the step headings below place pg5
# and pg6 in VRF1 and pg0 in VRF0. A TCP static mapping flagged
# NAT_IS_OUT2IN_ONLY is added for pg5.remote_ip4 behind external_addr with
# vrf_id=1, and a second TCP static mapping is added for pg0.remote_ip4
# using external_sw_if_index=pg0.sw_if_index. Every step sends one packet,
# captures one packet and asserts the address/port it carries.
6484 external_addr = '1.2.3.4'
6489 self.vapi.nat44_forwarding_enable_disable(enable=1)
6490 self.nat44_add_address(self.nat_addr)
6491 flags = self.config_flags.NAT_IS_INSIDE
6492 self.vapi.nat44_interface_add_del_feature(
6493 sw_if_index=self.pg0.sw_if_index,
6495 self.vapi.nat44_interface_add_del_feature(
6496 sw_if_index=self.pg0.sw_if_index,
6497 is_add=1, flags=flags)
6498 self.vapi.nat44_interface_add_del_output_feature(
6499 sw_if_index=self.pg1.sw_if_index,
6501 self.vapi.nat44_interface_add_del_feature(
6502 sw_if_index=self.pg5.sw_if_index,
6504 self.vapi.nat44_interface_add_del_feature(
6505 sw_if_index=self.pg5.sw_if_index,
6506 is_add=1, flags=flags)
6507 self.vapi.nat44_interface_add_del_feature(
6508 sw_if_index=self.pg6.sw_if_index,
6510 flags = self.config_flags.NAT_IS_OUT2IN_ONLY
6511 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
6512 local_port, external_port, vrf_id=1,
6513 proto=IP_PROTOS.tcp, flags=flags)
6514 self.nat44_add_static_mapping(
6515 self.pg0.remote_ip4,
6516 external_sw_if_index=self.pg0.sw_if_index,
6517 local_port=local_port,
6519 external_port=external_port,
6520 proto=IP_PROTOS.tcp,
6524 # from client to service (both VRF1)
6525 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
6526 IP(src=self.pg6.remote_ip4, dst=external_addr) /
6527 TCP(sport=12345, dport=external_port))
6528 self.pg6.add_stream(p)
6529 self.pg_enable_capture(self.pg_interfaces)
6531 capture = self.pg5.get_capture(1)
6536 self.assertEqual(ip.dst, self.pg5.remote_ip4)
6537 self.assertEqual(tcp.dport, local_port)
6538 self.assert_packet_checksums_valid(p)
6540 self.logger.error(ppp("Unexpected or invalid packet:", p))
6543 # from service back to client (both VRF1)
6544 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
6545 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
6546 TCP(sport=local_port, dport=12345))
6547 self.pg5.add_stream(p)
6548 self.pg_enable_capture(self.pg_interfaces)
6550 capture = self.pg6.get_capture(1)
6555 self.assertEqual(ip.src, external_addr)
6556 self.assertEqual(tcp.sport, external_port)
6557 self.assert_packet_checksums_valid(p)
6559 self.logger.error(ppp("Unexpected or invalid packet:", p))
6562 # dynamic NAT from VRF1 to VRF0 (output-feature)
# pg5 -> pg1: the packet captured on pg1 is expected to carry nat_addr as
# its source address
6563 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
6564 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
6565 TCP(sport=2345, dport=22))
6566 self.pg5.add_stream(p)
6567 self.pg_enable_capture(self.pg_interfaces)
6569 capture = self.pg1.get_capture(1)
6574 self.assertEqual(ip.src, self.nat_addr)
6575 self.assert_packet_checksums_valid(p)
6578 self.logger.error(ppp("Unexpected or invalid packet:", p))
# reply from pg1 to nat_addr for the flow above; it is expected on pg5
# with destination pg5.remote_ip4 and dport 2345
# (`port` is presumably the source port seen in the previous capture —
# TODO confirm)
6581 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
6582 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6583 TCP(sport=22, dport=port))
6584 self.pg1.add_stream(p)
6585 self.pg_enable_capture(self.pg_interfaces)
6587 capture = self.pg5.get_capture(1)
6592 self.assertEqual(ip.dst, self.pg5.remote_ip4)
6593 self.assertEqual(tcp.dport, 2345)
6594 self.assert_packet_checksums_valid(p)
6596 self.logger.error(ppp("Unexpected or invalid packet:", p))
6599 # from client VRF1 to service VRF0
# the service is addressed via pg0.local_ip4:external_port and the packet
# is expected on pg0 rewritten to pg0.remote_ip4:local_port
6600 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
6601 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
6602 TCP(sport=12346, dport=external_port))
6603 self.pg6.add_stream(p)
6604 self.pg_enable_capture(self.pg_interfaces)
6606 capture = self.pg0.get_capture(1)
6611 self.assertEqual(ip.dst, self.pg0.remote_ip4)
6612 self.assertEqual(tcp.dport, local_port)
6613 self.assert_packet_checksums_valid(p)
6615 self.logger.error(ppp("Unexpected or invalid packet:", p))
6618 # from service VRF0 back to client VRF1
6619 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6620 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
6621 TCP(sport=local_port, dport=12346))
6622 self.pg0.add_stream(p)
6623 self.pg_enable_capture(self.pg_interfaces)
6625 capture = self.pg6.get_capture(1)
6630 self.assertEqual(ip.src, self.pg0.local_ip4)
6631 self.assertEqual(tcp.sport, external_port)
6632 self.assert_packet_checksums_valid(p)
6634 self.logger.error(ppp("Unexpected or invalid packet:", p))
6637 # from client VRF0 to service VRF1
6638 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6639 IP(src=self.pg0.remote_ip4, dst=external_addr) /
6640 TCP(sport=12347, dport=external_port))
6641 self.pg0.add_stream(p)
6642 self.pg_enable_capture(self.pg_interfaces)
6644 capture = self.pg5.get_capture(1)
6649 self.assertEqual(ip.dst, self.pg5.remote_ip4)
6650 self.assertEqual(tcp.dport, local_port)
6651 self.assert_packet_checksums_valid(p)
6653 self.logger.error(ppp("Unexpected or invalid packet:", p))
6656 # from service VRF1 back to client VRF0
6657 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
6658 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
6659 TCP(sport=local_port, dport=12347))
6660 self.pg5.add_stream(p)
6661 self.pg_enable_capture(self.pg_interfaces)
6663 capture = self.pg0.get_capture(1)
6668 self.assertEqual(ip.src, external_addr)
6669 self.assertEqual(tcp.sport, external_port)
6670 self.assert_packet_checksums_valid(p)
6672 self.logger.error(ppp("Unexpected or invalid packet:", p))
6675 # from client to server (both VRF1, no translation)
6676 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
6677 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
6678 TCP(sport=12348, dport=local_port))
6679 self.pg6.add_stream(p)
6680 self.pg_enable_capture(self.pg_interfaces)
6682 capture = self.pg5.get_capture(1)
6687 self.assertEqual(ip.dst, self.pg5.remote_ip4)
6688 self.assertEqual(tcp.dport, local_port)
6689 self.assert_packet_checksums_valid(p)
6691 self.logger.error(ppp("Unexpected or invalid packet:", p))
6694 # from server back to client (both VRF1, no translation)
6695 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
6696 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
6697 TCP(sport=local_port, dport=12348))
6698 self.pg5.add_stream(p)
6699 self.pg_enable_capture(self.pg_interfaces)
6701 capture = self.pg6.get_capture(1)
6706 self.assertEqual(ip.src, self.pg5.remote_ip4)
6707 self.assertEqual(tcp.sport, local_port)
6708 self.assert_packet_checksums_valid(p)
6710 self.logger.error(ppp("Unexpected or invalid packet:", p))
6713 # from client VRF1 to server VRF0 (no translation)
# NOTE(review): the packet built under this heading is sent from pg0 to
# pg6 with sport=local_port, exactly like the "back to client" step that
# follows, rather than from pg6 towards pg0 as the heading suggests —
# confirm which direction was intended.
6714 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6715 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
6716 TCP(sport=local_port, dport=12349))
6717 self.pg0.add_stream(p)
6718 self.pg_enable_capture(self.pg_interfaces)
6720 capture = self.pg6.get_capture(1)
6725 self.assertEqual(ip.src, self.pg0.remote_ip4)
6726 self.assertEqual(tcp.sport, local_port)
6727 self.assert_packet_checksums_valid(p)
6729 self.logger.error(ppp("Unexpected or invalid packet:", p))
6732 # from server VRF0 back to client VRF1 (no translation)
6733 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6734 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
6735 TCP(sport=local_port, dport=12349))
6736 self.pg0.add_stream(p)
6737 self.pg_enable_capture(self.pg_interfaces)
6739 capture = self.pg6.get_capture(1)
6744 self.assertEqual(ip.src, self.pg0.remote_ip4)
6745 self.assertEqual(tcp.sport, local_port)
6746 self.assert_packet_checksums_valid(p)
6748 self.logger.error(ppp("Unexpected or invalid packet:", p))
6751 # from client VRF0 to server VRF1 (no translation)
6752 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6753 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
6754 TCP(sport=12344, dport=local_port))
6755 self.pg0.add_stream(p)
6756 self.pg_enable_capture(self.pg_interfaces)
6758 capture = self.pg5.get_capture(1)
6763 self.assertEqual(ip.dst, self.pg5.remote_ip4)
6764 self.assertEqual(tcp.dport, local_port)
6765 self.assert_packet_checksums_valid(p)
6767 self.logger.error(ppp("Unexpected or invalid packet:", p))
6770 # from server VRF1 back to client VRF0 (no translation)
6771 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
6772 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
6773 TCP(sport=local_port, dport=12344))
6774 self.pg5.add_stream(p)
6775 self.pg_enable_capture(self.pg_interfaces)
6777 capture = self.pg0.get_capture(1)
6782 self.assertEqual(ip.src, self.pg5.remote_ip4)
6783 self.assertEqual(tcp.sport, local_port)
6784 self.assert_packet_checksums_valid(p)
6786 self.logger.error(ppp("Unexpected or invalid packet:", p))
6789 def test_session_rst_timeout(self):
6790 """ NAT44 session RST timeouts """
# Configures nat_addr with pg0 flagged NAT_IS_INSIDE, sets the NAT
# timeouts (tcp_transitory=5, tcp_established=7440), calls
# initiate_tcp_session(pg0, pg1) and then checks that each of two further
# TCP packets sent from pg0 produces one packet on pg1.
6791 self.nat44_add_address(self.nat_addr)
6792 flags = self.config_flags.NAT_IS_INSIDE
6793 self.vapi.nat44_interface_add_del_feature(
6794 sw_if_index=self.pg0.sw_if_index,
6795 flags=flags, is_add=1)
6796 self.vapi.nat44_interface_add_del_feature(
6797 sw_if_index=self.pg1.sw_if_index,
6799 self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
6800 tcp_transitory=5, icmp=60)
6802 self.initiate_tcp_session(self.pg0, self.pg1)
# packet on the ports of the session opened above
# (tcp_port_in -> tcp_external_port)
6803 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6804 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6805 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
6807 self.pg0.add_stream(p)
6808 self.pg_enable_capture(self.pg_interfaces)
6810 self.pg1.get_capture(1)
# packet on a different port pair (tcp_port_in + 1 ->
# tcp_external_port + 1); it must also be delivered on pg1
6814 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6815 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6816 TCP(sport=self.tcp_port_in + 1, dport=self.tcp_external_port + 1,
6818 self.pg0.add_stream(p)
6819 self.pg_enable_capture(self.pg_interfaces)
6821 self.pg1.get_capture(1)
6823 def test_syslog_sess(self):
6824 """ Test syslog session creation and deletion """
# Sets the syslog filter to SYSLOG_API_SEVERITY_INFO and configures the
# syslog sender with pg2's local and remote addresses, so the syslog
# messages checked below are captured on pg2.
6825 self.vapi.syslog_set_filter(
6826 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
6827 self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)
6828 self.nat44_add_address(self.nat_addr)
6829 flags = self.config_flags.NAT_IS_INSIDE
6830 self.vapi.nat44_interface_add_del_feature(
6831 sw_if_index=self.pg0.sw_if_index,
6832 flags=flags, is_add=1)
6833 self.vapi.nat44_interface_add_del_feature(
6834 sw_if_index=self.pg1.sw_if_index,
# one TCP packet from pg0 towards pg1; the source port seen on pg1 is
# stored in self.tcp_port_out before the syslog payload from pg2 is
# verified
6837 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6838 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
6839 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
6840 self.pg0.add_stream(p)
6841 self.pg_enable_capture(self.pg_interfaces)
6843 capture = self.pg1.get_capture(1)
6844 self.tcp_port_out = capture[0][TCP].sport
6845 capture = self.pg2.get_capture(1)
6846 self.verify_syslog_sess(capture[0][Raw].load)
# removing the NAT address is expected to produce one more syslog message
# on pg2; it is verified with the second argument set to False
6848 self.pg_enable_capture(self.pg_interfaces)
6850 self.nat44_add_address(self.nat_addr, is_add=0)
6851 capture = self.pg2.get_capture(1)
6852 self.verify_syslog_sess(capture[0][Raw].load, False)
6854 def test_ed_users_dump(self):
6855 """ API test - nat44_user_dump """
# Configures pg0 (NAT_IS_INSIDE) and pg1, enables forwarding and adds a
# static mapping between pg0.remote_ip4 (local) and nat_addr (external).
# After each traffic round the per-user counters returned by
# nat44_user_dump (nstaticsessions / nsessions) are checked.
6856 flags = self.config_flags.NAT_IS_INSIDE
6857 self.vapi.nat44_interface_add_del_feature(
6858 sw_if_index=self.pg0.sw_if_index,
6859 flags=flags, is_add=1)
6860 self.vapi.nat44_interface_add_del_feature(
6861 sw_if_index=self.pg1.sw_if_index,
6863 self.vapi.nat44_forwarding_enable_disable(enable=1)
6865 real_ip = self.pg0.remote_ip4
6866 alias_ip = self.nat_addr
6867 flags = self.config_flags.NAT_IS_ADDR_ONLY
6868 self.vapi.nat44_add_del_static_mapping(is_add=1,
6869 local_ip_address=real_ip,
6870 external_ip_address=alias_ip,
6871 external_sw_if_index=0xFFFFFFFF,
# no users are expected before any traffic has been sent
6874 users = self.vapi.nat44_user_dump()
6875 self.assertEqual(len(users), 0)
6877 # in2out - static mapping match
6879 pkts = self.create_stream_out(self.pg1)
6880 self.pg1.add_stream(pkts)
6881 self.pg_enable_capture(self.pg_interfaces)
6883 capture = self.pg0.get_capture(len(pkts))
6884 self.verify_capture_in(capture, self.pg0)
6886 pkts = self.create_stream_in(self.pg0, self.pg1)
6887 self.pg0.add_stream(pkts)
6888 self.pg_enable_capture(self.pg_interfaces)
6890 capture = self.pg1.get_capture(len(pkts))
6891 self.verify_capture_out(capture, same_port=True)
# a single user with three static sessions and no dynamic ones is expected
6893 users = self.vapi.nat44_user_dump()
6894 self.assertEqual(len(users), 1)
6895 static_user = users[0]
6896 self.assertEqual(static_user.nstaticsessions, 3)
6897 self.assertEqual(static_user.nsessions, 0)
6899 # in2out - no static mapping match
# remote_hosts[0] is temporarily replaced by remote_hosts[1] and restored
# further down; presumably this makes pg0.remote_ip4 resolve to a second
# host that has no static mapping — TODO confirm
6901 host0 = self.pg0.remote_hosts[0]
6902 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
6904 pkts = self.create_stream_out(self.pg1,
6905 dst_ip=self.pg0.remote_ip4,
6906 use_inside_ports=True)
6907 self.pg1.add_stream(pkts)
6908 self.pg_enable_capture(self.pg_interfaces)
6910 capture = self.pg0.get_capture(len(pkts))
6911 self.verify_capture_in(capture, self.pg0)
6913 pkts = self.create_stream_in(self.pg0, self.pg1)
6914 self.pg0.add_stream(pkts)
6915 self.pg_enable_capture(self.pg_interfaces)
6917 capture = self.pg1.get_capture(len(pkts))
6918 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
6921 self.pg0.remote_hosts[0] = host0
# two users are expected now; the dump order is not assumed, so the entry
# whose address equals remote_hosts[0].ip4 is taken as the static user
6923 users = self.vapi.nat44_user_dump()
6924 self.assertEqual(len(users), 2)
6925 if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
6926 non_static_user = users[1]
6927 static_user = users[0]
6929 non_static_user = users[0]
6930 static_user = users[1]
6931 self.assertEqual(static_user.nstaticsessions, 3)
6932 self.assertEqual(static_user.nsessions, 0)
6933 self.assertEqual(non_static_user.nstaticsessions, 0)
6934 self.assertEqual(non_static_user.nsessions, 3)
# NOTE(review): this dump-and-assert sequence repeats the one directly
# above with no traffic sent in between — confirm whether the repetition
# is intentional.
6936 users = self.vapi.nat44_user_dump()
6937 self.assertEqual(len(users), 2)
6938 if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4:
6939 non_static_user = users[1]
6940 static_user = users[0]
6942 non_static_user = users[0]
6943 static_user = users[1]
6944 self.assertEqual(static_user.nstaticsessions, 3)
6945 self.assertEqual(static_user.nsessions, 0)
6946 self.assertEqual(non_static_user.nstaticsessions, 0)
6947 self.assertEqual(non_static_user.nsessions, 3)
# forwarding is switched off again; the static-mapping call below targets
# the same local/external address pair as the one at the top of the test
6950 self.vapi.nat44_forwarding_enable_disable(enable=0)
6951 flags = self.config_flags.NAT_IS_ADDR_ONLY
6952 self.vapi.nat44_add_del_static_mapping(
6954 local_ip_address=real_ip,
6955 external_ip_address=alias_ip,
6956 external_sw_if_index=0xFFFFFFFF,
6959 def show_commands_at_teardown(self):
# Runs a fixed list of VPP CLI commands through self.vapi.cli and writes
# each command's output to the test logger at info level.
6960 self.logger.info(self.vapi.cli("show errors"))
6961 self.logger.info(self.vapi.cli("show nat44 addresses"))
6962 self.logger.info(self.vapi.cli("show nat44 interfaces"))
6963 self.logger.info(self.vapi.cli("show nat44 static mappings"))
6964 self.logger.info(self.vapi.cli("show nat44 interface address"))
6965 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
6966 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
6967 self.logger.info(self.vapi.cli("show nat timeouts"))
# unlike the entries above, this last one is a "debug" command rather than
# a "show" command
6968 self.logger.info(self.vapi.cli("debug nat44 fib registration"))
6971 class TestNAT44EndpointDependent3(MethodHolder):
6972 """ Endpoint-Dependent mapping and filtering extra test cases """
# Session limit for this class; it is passed as sessions= to
# nat44_plugin_enable_disable during per-test setup and also sizes the
# packet bursts in test_lru_cleanup.
6974 max_translations = 50
6977 def setUpClass(cls):
6978 super(TestNAT44EndpointDependent3, cls).setUpClass()
6979 cls.vapi.cli("set log class nat level debug")
6981 cls.nat_addr = '10.0.0.3'
6983 cls.create_pg_interfaces(range(2))
6985 for i in cls.pg_interfaces:
# Per-test setup: enable the plugin in endpoint-dependent mode with the
# session limit above, shorten the timeouts, add nat_addr and register pg0
# (NAT_IS_INSIDE) and pg1 with the NAT feature.
6991 super(TestNAT44EndpointDependent3, self).setUp()
6992 flags = self.nat44_config_flags.NAT44_IS_ENDPOINT_DEPENDENT
6993 self.vapi.nat44_plugin_enable_disable(
6994 sessions=self.max_translations,
6995 flags=flags, enable=1)
# udp and icmp timeouts are set to 1 here; test_lru_cleanup relies on
# sleeping past them
6996 self.vapi.nat_set_timeouts(
6997 udp=1, tcp_established=7440, tcp_transitory=30, icmp=1)
6999 self.nat44_add_address(self.nat_addr)
7000 flags = self.config_flags.NAT_IS_INSIDE
7001 self.vapi.nat44_interface_add_del_feature(
7002 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1)
7003 self.vapi.nat44_interface_add_del_feature(
7004 sw_if_index=self.pg1.sw_if_index, is_add=1)
7007 def tearDownClass(cls):
7008 super(TestNAT44EndpointDependent3, cls).tearDownClass()
# Per-test teardown: the plugin is disabled and the log cleared only while
# VPP is still alive.
7011 super(TestNAT44EndpointDependent3, self).tearDown()
7012 if not self.vpp_dead:
7013 self.vapi.nat44_plugin_enable_disable(enable=0)
7014 self.vapi.cli("clear logging")
# Helper: sends SYN (in->out), SYN+ACK (out->in, addressed to
# nat_addr:tcp_port_out) and ACK (in->out), capturing one packet on the
# opposite interface after each step.
7016 def init_tcp_session(self, in_if, out_if, sport, ext_dport):
7017 # SYN packet in->out
7018 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
7019 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
7020 TCP(sport=sport, dport=ext_dport, flags="S"))
7022 self.pg_enable_capture(self.pg_interfaces)
7024 capture = out_if.get_capture(1)
7026 tcp_port_out = p[TCP].sport
7028 # SYN + ACK packet out->in
7029 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
7030 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
7031 TCP(sport=ext_dport, dport=tcp_port_out, flags="SA"))
7032 out_if.add_stream(p)
7033 self.pg_enable_capture(self.pg_interfaces)
7035 in_if.get_capture(1)
7037 # ACK packet in->out
7038 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
7039 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
7040 TCP(sport=sport, dport=ext_dport, flags="A"))
7042 self.pg_enable_capture(self.pg_interfaces)
7044 out_if.get_capture(1)
7048 def test_lru_cleanup(self):
7049 """ LRU cleanup algorithm """
# one TCP session is opened first (sport 2000, dport 80)
7050 tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
# max_translations - 1 UDP packets with distinct source ports (7000+i);
# all of them are expected on pg1
7052 for i in range(0, self.max_translations - 1):
7053 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7054 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
7055 UDP(sport=7000+i, dport=80))
7058 self.pg0.add_stream(pkts)
7059 self.pg_enable_capture(self.pg_interfaces)
7061 self.pg1.get_capture(len(pkts))
# the udp timeout was set to 1 during setup, so 1.5 s is longer than the
# UDP session timeout
7062 self.sleep(1.5, "wait for timeouts")
# max_translations - 1 ICMP echo requests with distinct ids (8000+i); all
# of them are expected on pg1 as well
7065 for i in range(0, self.max_translations - 1):
7066 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7067 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
7068 ICMP(id=8000+i, type='echo-request'))
7071 self.pg0.add_stream(pkts)
7072 self.pg_enable_capture(self.pg_interfaces)
7074 self.pg1.get_capture(len(pkts))
7077 class TestNAT44Out2InDPO(MethodHolder):
7078 """ NAT44 Test Cases using out2in DPO """
# Per-test setup enables the NAT44 plugin with the NAT44_API_IS_OUT2IN_DPO
# flag. pg0 is configured for IPv4 and pg1 for IPv6. Both test methods in
# this class are currently disabled via unittest.skip.
7081 def setUpClass(cls):
7082 super(TestNAT44Out2InDPO, cls).setUpClass()
7083 cls.vapi.cli("set log class nat level debug")
# inside and outside port/id values are deliberately identical pairs
7085 cls.tcp_port_in = 6303
7086 cls.tcp_port_out = 6303
7087 cls.udp_port_in = 6304
7088 cls.udp_port_out = 6304
7089 cls.icmp_id_in = 6305
7090 cls.icmp_id_out = 6305
7091 cls.nat_addr = '10.0.0.3'
7092 cls.dst_ip4 = '192.168.70.1'
7094 cls.create_pg_interfaces(range(2))
7097 cls.pg0.config_ip4()
7098 cls.pg0.resolve_arp()
7101 cls.pg1.config_ip6()
7102 cls.pg1.resolve_ndp()
# route for the IPv6 prefix "::" with length 0 via pg1's remote address
7104 r1 = VppIpRoute(cls, "::", 0,
7105 [VppRoutePath(cls.pg1.remote_ip6,
7106 cls.pg1.sw_if_index)],
7111 def tearDownClass(cls):
7112 super(TestNAT44Out2InDPO, cls).tearDownClass()
7115 super(TestNAT44Out2InDPO, self).setUp()
7116 flags = self.nat44_config_flags.NAT44_API_IS_OUT2IN_DPO
7117 self.vapi.nat44_plugin_enable_disable(enable=1, flags=flags)
# the plugin is disabled and the log cleared only while VPP is still alive
7120 super(TestNAT44Out2InDPO, self).tearDown()
7121 if not self.vpp_dead:
7122 self.vapi.nat44_plugin_enable_disable(enable=0)
7123 self.vapi.cli("clear logging")
# Helper: stores two IPv6 prefixes of length 96 (1:2:3:: and 4:5:6::) on
# self, in text and packed form, and passes them to map_add_domain.
7125 def configure_xlat(self):
7126 self.dst_ip6_pfx = '1:2:3::'
7127 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
7129 self.dst_ip6_pfx_len = 96
7130 self.src_ip6_pfx = '4:5:6::'
7131 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
7133 self.src_ip6_pfx_len = 96
7134 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
7135 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
7136 '\x00\x00\x00\x00', 0)
7138 @unittest.skip('Temporary disabled')
7139 def test_464xlat_ce(self):
7140 """ Test 464XLAT CE with NAT44 """
7142 nat_config = self.vapi.nat_show_config()
7143 self.assertEqual(1, nat_config.out2in_dpo)
7145 self.configure_xlat()
7147 flags = self.config_flags.NAT_IS_INSIDE
7148 self.vapi.nat44_interface_add_del_feature(
7149 sw_if_index=self.pg0.sw_if_index,
7150 flags=flags, is_add=1)
# NOTE(review): self.nat_addr_n is not assigned in the visible part of
# this class (only nat_addr is) — confirm it is provided elsewhere before
# re-enabling this test.
7151 self.vapi.nat44_add_del_address_range(first_ip_address=self.nat_addr_n,
7152 last_ip_address=self.nat_addr_n,
7153 vrf_id=0xFFFFFFFF, is_add=1)
# IPv6 addresses composed from the IPv4 addresses and the two prefixes set
# up by configure_xlat
7155 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
7156 self.dst_ip6_pfx_len)
7157 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
7158 self.src_ip6_pfx_len)
# IPv4 stream entering pg0 is expected on pg1 and verified as IPv6
7161 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
7162 self.pg0.add_stream(pkts)
7163 self.pg_enable_capture(self.pg_interfaces)
7165 capture = self.pg1.get_capture(len(pkts))
7166 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# IPv6 stream entering pg1 is expected back on pg0
7169 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
7171 self.pg1.add_stream(pkts)
7172 self.pg_enable_capture(self.pg_interfaces)
7174 capture = self.pg0.get_capture(len(pkts))
7175 self.verify_capture_in(capture, self.pg0)
7177 self.vapi.nat44_interface_add_del_feature(
7178 sw_if_index=self.pg0.sw_if_index,
7180 self.vapi.nat44_add_del_address_range(
7181 first_ip_address=self.nat_addr_n,
7182 last_ip_address=self.nat_addr_n,
7185 @unittest.skip('Temporary disabled')
7186 def test_464xlat_ce_no_nat(self):
7187 """ Test 464XLAT CE without NAT44 """
7189 self.configure_xlat()
# here the second IPv6 address is composed from pg0.remote_ip4 rather than
# nat_addr, and the outbound capture is verified with same_port=True
7191 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
7192 self.dst_ip6_pfx_len)
7193 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
7194 self.src_ip6_pfx_len)
7196 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
7197 self.pg0.add_stream(pkts)
7198 self.pg_enable_capture(self.pg_interfaces)
7200 capture = self.pg1.get_capture(len(pkts))
7201 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
7202 nat_ip=out_dst_ip6, same_port=True)
7204 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
7205 self.pg1.add_stream(pkts)
7206 self.pg_enable_capture(self.pg_interfaces)
7208 capture = self.pg0.get_capture(len(pkts))
7209 self.verify_capture_in(capture, self.pg0)
7212 if __name__ == '__main__':
# when executed as a script, run the tests through unittest using
# VppTestRunner
7213 unittest.main(testRunner=VppTestRunner)