12 from framework import VppTestCase, VppTestRunner
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
17 from scapy.data import IP_PROTOS
18 from scapy.layers.inet import IP, TCP, UDP, ICMP
19 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
21 from scapy.layers.l2 import Ether, ARP, GRE
22 from scapy.packet import Raw
23 from syslog_rfc5424_parser import SyslogMessage, ParseError
24 from syslog_rfc5424_parser.constants import SyslogSeverity
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_neighbor import VppNeighbor
28 from vpp_papi import VppEnum
31 # NAT HA protocol event data
# NOTE(review): the class header that owns this field list is not visible
# here (embedded numbering jumps from 31 to 34). HANATStateSync uses `Event`
# as the element type of its "events" list, so presumably this is the body
# of that class -- confirm.
#
# One event record carries, in this order: event type and protocol as
# enumerated bytes, a 16-bit flags field, the in/out IPv4 addresses, the
# in/out ports, the eh_/ehn_ IPv4 addresses and ports, a 32-bit FIB index,
# a 32-bit packet total and a 64-bit byte total.
34 fields_desc = [ByteEnumField("event_type", None,
35 {1: "add", 2: "del", 3: "refresh"}),
36 ByteEnumField("protocol", None,
37 {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
38 ShortField("flags", 0),
39 IPField("in_addr", None),
40 IPField("out_addr", None),
41 ShortField("in_port", None),
42 ShortField("out_port", None),
43 IPField("eh_addr", None),
44 IPField("ehn_addr", None),
45 ShortField("eh_port", None),
46 ShortField("ehn_port", None),
47 IntField("fib_index", None),
48 IntField("total_pkts", 0),
49 LongField("total_bytes", 0)]
# NOTE(review): the body of extract_padding is not visible (numbering skips
# 52-53), so what it returns cannot be stated from here.
51 def extract_padding(self, s):
# NAT HA protocol header
class HANATStateSync(Packet):
    """
    Scapy layer describing the NAT HA state synchronisation header

    The header holds a version byte, an 8-bit flags field whose only
    named bit is 'ACK', an event counter, a sequence number, a thread
    index and the list of Event records announced by the counter.
    """
    name = "HA NAT state sync"
    fields_desc = [
        XByteField("version", 1),
        FlagsField("flags", 0, 8, ['ACK']),
        # left as None so it is derived from the number of "events"
        FieldLenField("count", None, count_of="events"),
        IntField("sequence_number", 1),
        IntField("thread_index", 0),
        # when dissecting, read as many events as "count" announces
        PacketListField("events", [], Event,
                        count_from=lambda header: header.count),
    ]
67 class MethodHolder(VppTestCase):
68 """ NAT create capture and verify method holder """
# NOTE(review): the line directly above each accessor below is not visible
# (numbering skips 70, 74 and 78). Other code in this file reads them as
# attributes (e.g. self.config_flags.NAT_IS_ADDR_ONLY), so presumably each
# is decorated with @property -- confirm.
# Accessor for the VppEnum.vl_api_nat_config_flags_t enumeration.
71 def config_flags(self):
72 return VppEnum.vl_api_nat_config_flags_t
# Accessor for the VppEnum.vl_api_nat44_config_flags_t enumeration.
75 def nat44_config_flags(self):
76 return VppEnum.vl_api_nat44_config_flags_t
# Accessor for the VppEnum.vl_api_syslog_severity_t enumeration.
79 def SYSLOG_SEVERITY(self):
80 return VppEnum.vl_api_syslog_severity_t
# Thin wrapper around self.vapi.nat44_add_del_static_mapping.
# NOTE(review): the docstring delimiters and some call arguments are not
# visible (numbering skips 86, 99-100, 103, 105 and 112-113), so how
# is_add, tag and flags reach the API call cannot be confirmed from here.
82 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
83 local_port=0, external_port=0, vrf_id=0,
84 is_add=1, external_sw_if_index=0xFFFFFFFF,
85 proto=0, tag="", flags=0):
87 Add/delete NAT44EI static mapping
89 :param local_ip: Local IP address
90 :param external_ip: External IP address
91 :param local_port: Local port number (Optional)
92 :param external_port: External port number (Optional)
93 :param vrf_id: VRF ID (Default 0)
94 :param is_add: 1 if add, 0 if delete (Default add)
95 :param external_sw_if_index: External interface instead of IP address
96 :param proto: IP protocol (Mandatory if port specified)
97 :param tag: Opaque string tag
98 :param flags: NAT configuration flags
# Unless both ports are non-zero the mapping is flagged as address-only.
101 if not (local_port and external_port):
102 flags |= self.config_flags.NAT_IS_ADDR_ONLY
104 self.vapi.nat44_add_del_static_mapping(
106 local_ip_address=local_ip,
107 external_ip_address=external_ip,
108 external_sw_if_index=external_sw_if_index,
109 local_port=local_port,
110 external_port=external_port,
111 vrf_id=vrf_id, protocol=proto,
# Wrapper around self.vapi.nat44_add_del_address_range.
# NOTE(review): the visible docstring text does not describe vrf_id
# (default 0xFFFFFFFF), and the remaining call arguments are not visible
# (numbering skips 125-129).
115 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
117 Add/delete NAT44EI address
119 :param ip: IP address
120 :param is_add: 1 if add, 0 if delete (Default add)
121 :param twice_nat: twice NAT address for external hosts
# flags is NAT_IS_TWICE_NAT when twice_nat is truthy, otherwise 0.
123 flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
124 self.vapi.nat44_add_del_address_range(first_ip_address=ip,
# Builds route and neighbor objects for the pg7 and pg8 interfaces.
# NOTE(review): the remaining VppNeighbor arguments and whatever installs
# these objects are not visible (numbering skips 137-139, 142-144 and
# 147-152).
130 def create_routes_and_neigbors(self):
# /32 routes to each interface's remote IPv4 address, via that same
# address on that same interface.
131 r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
132 [VppRoutePath(self.pg7.remote_ip4,
133 self.pg7.sw_if_index)])
134 r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
135 [VppRoutePath(self.pg8.remote_ip4,
136 self.pg8.sw_if_index)])
140 n1 = VppNeighbor(self,
141 self.pg7.sw_if_index,
145 n2 = VppNeighbor(self,
146 self.pg8.sw_if_index,
# Builds TCP, UDP and ICMP echo-request packets sourced from in_if's remote
# host, using the test's inside ports / ICMP id.
# NOTE(review): list handling and the return statement are not visible
# (numbering skips several lines between the packet definitions).
153 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
155 Create packet stream for inside network
157 :param in_if: Inside interface
158 :param out_if: Outside interface
159 :param dst_ip: Destination address
160 :param ttl: TTL of generated packets
# Fallback destination is out_if's remote address; the guarding condition
# is not visible -- presumably "dst_ip is None", confirm.
163 dst_ip = out_if.remote_ip4
167 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
168 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
169 TCP(sport=self.tcp_port_in, dport=20))
173 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
174 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
175 UDP(sport=self.udp_port_in, dport=20))
179 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
180 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
181 ICMP(id=self.icmp_id_in, type='echo-request'))
# Copies the four octets of an IPv4 address into byte positions of an IPv6
# prefix and returns the result in IPv6 text form.
# NOTE(review): the conditions that select which positions are written are
# not visible (numbering jumps from 196 to 211 and skips 212-214, 217-218
# and 222), so the mapping from plen to byte positions cannot be stated
# from here.
186 def compose_ip6(self, ip4, pref, plen):
188 Compose IPv4-embedded IPv6 addresses
190 :param ip4: IPv4 address
191 :param pref: IPv6 prefix
192 :param plen: IPv6 prefix length
193 :returns: IPv4-embedded IPv6 addresses
# Work on mutable lists of the packed bytes of both addresses.
195 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
196 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
211 pref_n[10] = ip4_n[3]
215 pref_n[10] = ip4_n[2]
216 pref_n[11] = ip4_n[3]
219 pref_n[10] = ip4_n[1]
220 pref_n[11] = ip4_n[2]
221 pref_n[12] = ip4_n[3]
223 pref_n[12] = ip4_n[0]
224 pref_n[13] = ip4_n[1]
225 pref_n[14] = ip4_n[2]
226 pref_n[15] = ip4_n[3]
# Re-pack the byte list and convert it back to an IPv6 address string.
227 packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
228 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
# Builds TCP, UDP and ICMP echo-reply packets sourced from out_if's remote
# host with source port 20 for TCP/UDP.
# NOTE(review): branch keywords, list handling and the return statement are
# not visible (numbering skips e.g. 240-241, 247, 251-252 and 268-271).
230 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
231 use_inside_ports=False):
233 Create packet stream for outside network
235 :param out_if: Outside interface
236 :param dst_ip: Destination IP address (Default use global NAT address)
237 :param ttl: TTL of generated packets
238 :param use_inside_ports: Use inside NAT ports as destination ports
239 instead of outside ports
# Fallback destination is the test's NAT address (guard not visible).
242 dst_ip = self.nat_addr
# Destination ports / ICMP id come from the *_out attributes unless
# use_inside_ports is set; the *_in assignments below are presumably the
# else branch -- confirm.
243 if not use_inside_ports:
244 tcp_port = self.tcp_port_out
245 udp_port = self.udp_port_out
246 icmp_id = self.icmp_id_out
248 tcp_port = self.tcp_port_in
249 udp_port = self.udp_port_in
250 icmp_id = self.icmp_id_in
253 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
254 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
255 TCP(dport=tcp_port, sport=20))
259 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
260 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
261 UDP(dport=udp_port, sport=20))
265 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
266 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
267 ICMP(id=icmp_id, type='echo-reply'))
# IPv6 counterpart of the outside stream builder: TCP, UDP and ICMPv6 echo
# reply packets from src_ip to dst_ip with hop limit hl.
# NOTE(review): the visible docstring text says dst_ip defaults to the
# global NAT address, but the signature gives dst_ip no default; src_ip is
# not described at all. List handling and the return are not visible.
272 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
274 Create packet stream for outside network
276 :param out_if: Outside interface
277 :param dst_ip: Destination IP address (Default use global NAT address)
278 :param hl: HL of generated packets
282 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
283 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
284 TCP(dport=self.tcp_port_out, sport=20))
288 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
289 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
290 UDP(dport=self.udp_port_out, sport=20))
294 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
295 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
296 ICMPv6EchoReply(id=self.icmp_id_out))
# Checks source address (and optionally destination) of every captured
# packet and records the observed source port / ICMP id into
# self.tcp_port_out, self.udp_port_out and self.icmp_id_out.
# NOTE(review): ignore_port is not described in the visible docstring text,
# and the branch keywords plus the assertion method names for the port
# checks are not visible (numbering skips e.g. 311-313, 328-330, 332-333),
# so how same_port and ignore_port alter the checks cannot be stated here.
301 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
302 dst_ip=None, is_ip6=False, ignore_port=False):
304 Verify captured packets on outside network
306 :param capture: Captured packets
307 :param nat_ip: Translated IP address (Default use global NAT address)
308 :param same_port: Source port number is not translated (Default False)
309 :param dst_ip: Destination IP address (Default do not verify)
310 :param is_ip6: If L3 protocol is IPv6 (Default False)
# IP46 / ICMP46 name the layer classes used for lookups below; only the
# ICMPv6EchoRequest assignment is visible.
314 ICMP46 = ICMPv6EchoRequest
319 nat_ip = self.nat_addr
320 for packet in capture:
323 self.assert_packet_checksums_valid(packet)
324 self.assertEqual(packet[IP46].src, nat_ip)
325 if dst_ip is not None:
326 self.assertEqual(packet[IP46].dst, dst_ip)
327 if packet.haslayer(TCP):
331 packet[TCP].sport, self.tcp_port_in)
334 packet[TCP].sport, self.tcp_port_in)
# Remember the source port seen on the outside for later checks.
335 self.tcp_port_out = packet[TCP].sport
336 self.assert_packet_checksums_valid(packet)
337 elif packet.haslayer(UDP):
341 packet[UDP].sport, self.udp_port_in)
344 packet[UDP].sport, self.udp_port_in)
345 self.udp_port_out = packet[UDP].sport
350 packet[ICMP46].id, self.icmp_id_in)
353 packet[ICMP46].id, self.icmp_id_in)
354 self.icmp_id_out = packet[ICMP46].id
355 self.assert_packet_checksums_valid(packet)
357 self.logger.error(ppp("Unexpected or invalid packet "
358 "(outside network):", packet))
# Delegates to verify_capture_out, forwarding capture, nat_ip, same_port
# and dst_ip positionally.
# NOTE(review): the rest of the signature and of the delegated call are not
# visible (numbering skips 362-363 and 372-373).
361 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
364 Verify captured packets on outside network
366 :param capture: Captured packets
367 :param nat_ip: Translated IP address
368 :param same_port: Source port number is not translated (Default False)
369 :param dst_ip: Destination IP address (Default do not verify)
371 return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
# For every captured packet: validates checksums, checks the destination is
# in_if's remote address, and checks the destination port (TCP/UDP) or ICMP
# id against the test's inside values.
# NOTE(review): the try/except/else keywords are not visible (numbering
# skips 382, 389, 391 and 394-395); the error log presumably sits in an
# exception handler -- confirm.
374 def verify_capture_in(self, capture, in_if):
376 Verify captured packets on inside network
378 :param capture: Captured packets
379 :param in_if: Inside interface
381 for packet in capture:
383 self.assert_packet_checksums_valid(packet)
384 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
385 if packet.haslayer(TCP):
386 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
387 elif packet.haslayer(UDP):
388 self.assertEqual(packet[UDP].dport, self.udp_port_in)
390 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
392 self.logger.error(ppp("Unexpected or invalid packet "
393 "(inside network):", packet))
# Checks that each captured packet still carries the ingress host's source
# address, the egress host's destination address and the original inside
# source port / ICMP id.
# NOTE(review): try/except/else keywords are not visible (numbering skips
# 405, 412, 414 and 417-418).
396 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
398 Verify captured packet that don't have to be translated
400 :param capture: Captured packets
401 :param ingress_if: Ingress interface
402 :param egress_if: Egress interface
404 for packet in capture:
406 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
407 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
408 if packet.haslayer(TCP):
409 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
410 elif packet.haslayer(UDP):
411 self.assertEqual(packet[UDP].sport, self.udp_port_in)
413 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
# The message says "inside network" although this helper has no
# inside/outside notion in its parameters.
415 self.logger.error(ppp("Unexpected or invalid packet "
416 "(inside network):", packet))
# Checks ICMP error packets captured on the outside: outer source address,
# ICMP type, presence of the embedded IP header, and the embedded
# destination port / ICMP id.
# NOTE(review): the rest of the signature (icmp_type is documented and used
# below), the `icmp` assignment and the expected values of the TCP/UDP
# port assertions are not visible (numbering skips 420-421, 436, 442 and
# 445).
419 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
422 Verify captured packets with ICMP errors on outside network
424 :param capture: Captured packets
425 :param src_ip: Translated IP address or IP address of VPP
426 (Default use global NAT address)
427 :param icmp_type: Type of error ICMP packet
428 we are expecting (Default 11)
431 src_ip = self.nat_addr
432 for packet in capture:
434 self.assertEqual(packet[IP].src, src_ip)
435 self.assertEqual(packet.haslayer(ICMP), 1)
437 self.assertEqual(icmp.type, icmp_type)
# The ICMP error must quote the offending packet's IP header.
438 self.assertTrue(icmp.haslayer(IPerror))
439 inner_ip = icmp[IPerror]
440 if inner_ip.haslayer(TCPerror):
441 self.assertEqual(inner_ip[TCPerror].dport,
443 elif inner_ip.haslayer(UDPerror):
444 self.assertEqual(inner_ip[UDPerror].dport,
447 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
449 self.logger.error(ppp("Unexpected or invalid packet "
450 "(outside network):", packet))
# Checks ICMP error packets captured on the inside: outer destination is
# in_if's remote address, ICMP type matches, the embedded IP header is
# present, and the embedded source port / ICMP id is checked.
# NOTE(review): the `icmp` assignment and the expected values of the
# TCP/UDP port assertions are not visible (numbering skips 466, 472, 475).
453 def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
455 Verify captured packets with ICMP errors on inside network
457 :param capture: Captured packets
458 :param in_if: Inside interface
459 :param icmp_type: Type of error ICMP packet
460 we are expecting (Default 11)
462 for packet in capture:
464 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
465 self.assertEqual(packet.haslayer(ICMP), 1)
467 self.assertEqual(icmp.type, icmp_type)
468 self.assertTrue(icmp.haslayer(IPerror))
469 inner_ip = icmp[IPerror]
470 if inner_ip.haslayer(TCPerror):
471 self.assertEqual(inner_ip[TCPerror].sport,
473 elif inner_ip.haslayer(UDPerror):
474 self.assertEqual(inner_ip[UDPerror].sport,
477 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
479 self.logger.error(ppp("Unexpected or invalid packet "
480 "(inside network):", packet))
# Builds three IPv4 fragments (offsets 0, 3 and 5) that share one randomly
# chosen IP id; the first two carry the "MF" flag.
# NOTE(review): payload expressions, list handling and the return are only
# partly visible (numbering skips e.g. 500, 514, 516-517, 521-523, 530-532,
# 534-536 and 539-543).
483 def create_stream_frag(self, src_if, dst, sport, dport, data,
484 proto=IP_PROTOS.tcp, echo_reply=False):
486 Create fragmented packet stream
488 :param src_if: Source interface
489 :param dst: Destination IPv4 address
490 :param sport: Source port
491 :param dport: Destination port
492 :param data: Payload data
493 :param proto: protocol (TCP, UDP, ICMP)
494 :param echo_reply: use echo_reply if protocol is ICMP
497 if proto == IP_PROTOS.tcp:
# For TCP the whole packet is built and re-parsed from its raw bytes so the
# computed checksum can be read back and placed in the fragment's header.
498 p = (IP(src=src_if.remote_ip4, dst=dst) /
499 TCP(sport=sport, dport=dport) /
501 p = p.__class__(scapy.compat.raw(p))
502 chksum = p[TCP].chksum
503 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
504 elif proto == IP_PROTOS.udp:
505 proto_header = UDP(sport=sport, dport=dport)
506 elif proto == IP_PROTOS.icmp:
# For ICMP, sport is used as the ICMP id.
508 proto_header = ICMP(id=sport, type='echo-request')
510 proto_header = ICMP(id=sport, type='echo-reply')
512 raise Exception("Unsupported protocol")
# NOTE(review): the local name `id` shadows the Python builtin.
513 id = random.randint(0, 65535)
515 if proto == IP_PROTOS.tcp:
518 raw = Raw(data[0:16])
519 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
520 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
524 if proto == IP_PROTOS.tcp:
525 raw = Raw(data[4:20])
527 raw = Raw(data[16:32])
528 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
529 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
533 if proto == IP_PROTOS.tcp:
537 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
538 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
# Verifies each fragment's addresses and IP checksum, writes its payload
# into a buffer at the fragment's offset, then rebuilds one packet from the
# buffer according to the protocol of the first fragment.
# NOTE(review): creation of `buffer`, the loop header and the return
# statement are not visible (numbering skips 553-555 and 572-573).
544 def reass_frags_and_verify(self, frags, src, dst):
546 Reassemble and verify fragmented packet
548 :param frags: Captured fragments
549 :param src: Source IPv4 address to verify
550 :param dst: Destination IPv4 address to verify
552 :returns: Reassembled IPv4 packet
556 self.assertEqual(p[IP].src, src)
557 self.assertEqual(p[IP].dst, dst)
558 self.assert_ip_checksum_valid(p)
# The frag field is multiplied by 8 to get the byte position in the buffer.
559 buffer.seek(p[IP].frag * 8)
560 buffer.write(bytes(p[IP].payload))
561 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
562 proto=frags[0][IP].proto)
563 if ip.proto == IP_PROTOS.tcp:
564 p = (ip / TCP(buffer.getvalue()))
565 self.logger.debug(ppp("Reassembled:", p))
566 self.assert_tcp_checksum_valid(p)
567 elif ip.proto == IP_PROTOS.udp:
# UDP: the first 8 buffer bytes are parsed as the header, the rest as Raw.
568 p = (ip / UDP(buffer.getvalue()[:8]) /
569 Raw(buffer.getvalue()[8:]))
570 elif ip.proto == IP_PROTOS.icmp:
571 p = (ip / ICMP(buffer.getvalue()))
# Expects exactly six records: three counted when record[230] is 4 and
# three when it is 5 (see the counter names below), and checks addresses
# and per-protocol ports / ICMP ids of each record.
# NOTE(review): the loop header and the second operands of several
# assertions are not visible (numbering skips e.g. 583-584, 595-596, 603,
# 606, 608, 611 and 613), so which record keys hold the translated ports
# cannot be stated here.
574 def verify_ipfix_nat44_ses(self, data):
576 Verify IPFIX NAT44EI session create/delete event
578 :param data: Decoded IPFIX data records
580 nat44_ses_create_num = 0
581 nat44_ses_delete_num = 0
582 self.assertEqual(6, len(data))
585 self.assertIn(scapy.compat.orb(record[230]), [4, 5])
586 if scapy.compat.orb(record[230]) == 4:
587 nat44_ses_create_num += 1
589 nat44_ses_delete_num += 1
# record[8] is compared with pg0's remote address in dotted text form.
591 self.assertEqual(self.pg0.remote_ip4,
592 str(ipaddress.IPv4Address(record[8])))
593 # postNATSourceIPv4Address
594 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
597 self.assertEqual(struct.pack("!I", 0), record[234])
598 # protocolIdentifier/sourceTransportPort
599 # /postNAPTSourceTransportPort
600 if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
601 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
602 self.assertEqual(struct.pack("!H", self.icmp_id_out),
604 elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
605 self.assertEqual(struct.pack("!H", self.tcp_port_in),
607 self.assertEqual(struct.pack("!H", self.tcp_port_out),
609 elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
610 self.assertEqual(struct.pack("!H", self.udp_port_in),
612 self.assertEqual(struct.pack("!H", self.udp_port_out),
615 self.fail("Invalid protocol")
616 self.assertEqual(3, nat44_ses_create_num)
617 self.assertEqual(3, nat44_ses_delete_num)
# Expects a single record whose record[230] byte is 3 and whose record[283]
# equals a big-endian 32-bit zero.
# NOTE(review): the line that binds `record` is not visible (numbering
# skips 621-622); presumably it is data[0] -- confirm.
619 def verify_ipfix_addr_exhausted(self, data):
620 self.assertEqual(1, len(data))
623 self.assertEqual(scapy.compat.orb(record[230]), 3)
625 self.assertEqual(struct.pack("!I", 0), record[283])
# Expects a single record whose record[230] byte is 13, whose record[466]
# equals a packed 1 and whose record[471] equals the packed session limit.
# NOTE(review): the line that binds `record` is not visible (numbering
# skips 629-630). The "I" format packs in native byte order, whereas the
# other IPFIX checks in this file pack with "!I" (network order) --
# confirm this difference is intended.
627 def verify_ipfix_max_sessions(self, data, limit):
628 self.assertEqual(1, len(data))
631 self.assertEqual(scapy.compat.orb(record[230]), 13)
632 # natQuotaExceededEvent
633 self.assertEqual(struct.pack("I", 1), record[466])
635 self.assertEqual(struct.pack("I", limit), record[471])
def verify_no_nat44_user(self):
    """
    Verify that there is no NAT44EI user

    The API user dump must be empty and both the total-users and
    total-sessions statistics counters must read zero at index [0][0].
    """
    dumped_users = self.vapi.nat44_user_dump()
    self.assertEqual(len(dumped_users), 0)

    # users first, then sessions; each counter is fetched only after the
    # previous check has passed
    for counter_path in ('/nat44/total-users', '/nat44/total-sessions'):
        counter = self.statistics.get_counter(counter_path)
        self.assertEqual(counter[0][0], 0)
# Decodes a UTF-8 syslog payload, parses it as an RFC 5424 message and
# checks severity, app name, message id and the 'napmap' structured-data
# parameters against the test's inside/outside TCP mapping.
# NOTE(review): the try/else keywords and the ParseError handler body are
# not visible (numbering skips 648 and 651-653).
646 def verify_syslog_apmap(self, data, is_add=True):
647 message = data.decode('utf-8')
649 message = SyslogMessage.parse(message)
650 except ParseError as e:
654 self.assertEqual(message.severity, SyslogSeverity.info)
655 self.assertEqual(message.appname, 'NAT')
# Message id depends on whether a mapping add or delete is expected.
656 self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
657 sd_params = message.sd.get('napmap')
658 self.assertTrue(sd_params is not None)
659 self.assertEqual(sd_params.get('IATYP'), 'IPv4')
660 self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
661 self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
662 self.assertEqual(sd_params.get('XATYP'), 'IPv4')
663 self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
664 self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
665 self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
# SSUBIX only has to be present; its value is not checked.
666 self.assertTrue(sd_params.get('SSUBIX') is not None)
667 self.assertEqual(sd_params.get('SVLAN'), '0')
def verify_mss_value(self, pkt, mss):
    """
    Verify the TCP MSS option value and the TCP checksum of a packet

    :param pkt: Packet that must contain both an IP and a TCP layer
    :param mss: Expected value of every 'MSS' TCP option in the packet
    :raises TypeError: if the packet lacks an IP or a TCP layer
    """
    if not (pkt.haslayer(IP) and pkt.haslayer(TCP)):
        raise TypeError("Not a TCP/IP packet")

    for tcp_option in pkt[TCP].options:
        # options other than MSS are not inspected
        if tcp_option[0] != 'MSS':
            continue
        self.assertEqual(tcp_option[1], mss)
    self.assert_tcp_checksum_valid(pkt)
# Maps an IP protocol number (TCP, UDP or ICMP) to some per-protocol value
# and raises Exception for anything else.
# NOTE(review): the return lines are not visible (numbering skips 681, 683
# and 685-686); callers index packets with the result (p[layer]), so
# presumably the scapy layer class is returned -- confirm. The function
# takes no self yet is called as self.proto2layer(proto) elsewhere in this
# file, so presumably a @staticmethod decorator sits on the missing line
# above -- confirm.
679 def proto2layer(proto):
680 if proto == IP_PROTOS.tcp:
682 elif proto == IP_PROTOS.udp:
684 elif proto == IP_PROTOS.icmp:
687 raise Exception("Unsupported protocol")
# Sends a fragmented stream from pg0 towards pg1's remote host, reassembles
# what pg1 captures and checks ports / ICMP id and payload; then sends a
# fragmented reply from pg1 and checks what pg0 captures.
# NOTE(review): the rest of the signature, the else keywords, the
# pg_start() calls and the address arguments of reass_frags_and_verify are
# not visible (numbering skips e.g. 690, 695, 704, 708-710, 712-713, 717,
# 719, 735, 737-739 and 747-748).
689 def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
691 layer = self.proto2layer(proto)
# Payload is 23 bytes for TCP and 35 bytes otherwise.
693 if proto == IP_PROTOS.tcp:
694 data = b"A" * 4 + b"B" * 16 + b"C" * 3
696 data = b"A" * 16 + b"B" * 16 + b"C" * 3
# Random inside port (also used as the ICMP id) for this run.
697 self.port_in = random.randint(1025, 65535)
700 pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
701 self.port_in, 20, data, proto)
702 self.pg0.add_stream(pkts)
703 self.pg_enable_capture(self.pg_interfaces)
705 frags = self.pg1.get_capture(len(pkts))
706 if not dont_translate:
707 p = self.reass_frags_and_verify(frags,
711 p = self.reass_frags_and_verify(frags,
714 if proto != IP_PROTOS.icmp:
715 if not dont_translate:
716 self.assertEqual(p[layer].dport, 20)
718 self.assertNotEqual(p[layer].sport, self.port_in)
720 self.assertEqual(p[layer].sport, self.port_in)
723 if not dont_translate:
724 self.assertNotEqual(p[layer].id, self.port_in)
726 self.assertEqual(p[layer].id, self.port_in)
727 self.assertEqual(data, p[Raw].load)
# Reply direction: destination is the NAT address when translating,
# otherwise (presumably the else branch) pg0's remote address.
730 if not dont_translate:
731 dst_addr = self.nat_addr
733 dst_addr = self.pg0.remote_ip4
734 if proto != IP_PROTOS.icmp:
# The reply targets the source port seen in the reassembled packet.
736 dport = p[layer].sport
740 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data,
741 proto, echo_reply=True)
742 self.pg1.add_stream(pkts)
743 self.pg_enable_capture(self.pg_interfaces)
745 frags = self.pg0.get_capture(len(pkts))
746 p = self.reass_frags_and_verify(frags,
749 if proto != IP_PROTOS.icmp:
750 self.assertEqual(p[layer].sport, 20)
751 self.assertEqual(p[layer].dport, self.port_in)
753 self.assertEqual(p[layer].id, self.port_in)
754 self.assertEqual(data, p[Raw].load)
# Sends a fragmented stream in on pg0 and captures it back on pg0 (packet
# enters and leaves the same interface), then checks ports / ICMP id and
# payload of the reassembled packet.
# NOTE(review): the rest of the signature, the create_stream_frag
# arguments, pg_start() and the reass_frags_and_verify address arguments
# are not visible (numbering skips 758-759, 769-773, 776, 779-780, 782 and
# 785-786).
756 def reass_hairpinning(self, server_addr, server_in_port, server_out_port,
757 host_in_port, proto=IP_PROTOS.tcp,
760 layer = self.proto2layer(proto)
762 if proto == IP_PROTOS.tcp:
763 data = b"A" * 4 + b"B" * 16 + b"C" * 3
765 data = b"A" * 16 + b"B" * 16 + b"C" * 3
767 # send packet from host to server
768 pkts = self.create_stream_frag(self.pg0,
774 self.pg0.add_stream(pkts)
775 self.pg_enable_capture(self.pg_interfaces)
777 frags = self.pg0.get_capture(len(pkts))
778 p = self.reass_frags_and_verify(frags,
781 if proto != IP_PROTOS.icmp:
# Source port must differ from the host's inside port, while the
# destination must be the server's inside port.
783 self.assertNotEqual(p[layer].sport, host_in_port)
784 self.assertEqual(p[layer].dport, server_in_port)
787 self.assertNotEqual(p[layer].id, host_in_port)
788 self.assertEqual(data, p[Raw].load)
# Same visible flow and checks as frag_in_order: fragments go pg0 -> pg1,
# are reassembled and verified, then a fragmented reply goes pg1 -> pg0.
# NOTE(review): nothing in the visible lines reorders the fragments; the
# statements that would do so are presumably on the missing lines
# (numbering skips e.g. 799-801, 804, 807, 845 and 848) -- confirm.
790 def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
792 layer = self.proto2layer(proto)
794 if proto == IP_PROTOS.tcp:
795 data = b"A" * 4 + b"B" * 16 + b"C" * 3
797 data = b"A" * 16 + b"B" * 16 + b"C" * 3
798 self.port_in = random.randint(1025, 65535)
802 pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
803 self.port_in, 20, data, proto)
805 self.pg0.add_stream(pkts)
806 self.pg_enable_capture(self.pg_interfaces)
808 frags = self.pg1.get_capture(len(pkts))
809 if not dont_translate:
810 p = self.reass_frags_and_verify(frags,
814 p = self.reass_frags_and_verify(frags,
817 if proto != IP_PROTOS.icmp:
818 if not dont_translate:
819 self.assertEqual(p[layer].dport, 20)
821 self.assertNotEqual(p[layer].sport, self.port_in)
823 self.assertEqual(p[layer].sport, self.port_in)
826 if not dont_translate:
827 self.assertNotEqual(p[layer].id, self.port_in)
829 self.assertEqual(p[layer].id, self.port_in)
830 self.assertEqual(data, p[Raw].load)
# Reply direction, targeting the source port seen in the reassembled packet.
833 if not dont_translate:
834 dst_addr = self.nat_addr
836 dst_addr = self.pg0.remote_ip4
837 if proto != IP_PROTOS.icmp:
839 dport = p[layer].sport
843 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport,
844 data, proto, echo_reply=True)
846 self.pg1.add_stream(pkts)
847 self.pg_enable_capture(self.pg_interfaces)
849 frags = self.pg0.get_capture(len(pkts))
850 p = self.reass_frags_and_verify(frags,
853 if proto != IP_PROTOS.icmp:
854 self.assertEqual(p[layer].sport, 20)
855 self.assertEqual(p[layer].dport, self.port_in)
857 self.assertEqual(p[layer].id, self.port_in)
858 self.assertEqual(data, p[Raw].load)
861 class TestNAT44EIAPI(MethodHolder):
862 """ NAT44EI API Test Cases """
# NOTE(review): the def lines of the fixture methods and the definition of
# fq_nelts are not visible (numbering skips 863-866 and 870-871). The
# super() calls show the next three statements belong to setUp and the
# following four to tearDown.
# setUp: apply the frame-queue size option, then enable the NAT44 plugin.
867 super(TestNAT44EIAPI, self).setUp()
868 self.vapi.nat_set_fq_options(frame_queue_nelts=self.fq_nelts)
869 self.vapi.nat44_plugin_enable_disable(enable=1)
# tearDown: unless VPP is flagged dead, disable the plugin and clear logs.
872 super(TestNAT44EIAPI, self).tearDown()
873 if not self.vpp_dead:
874 self.vapi.nat44_plugin_enable_disable(enable=0)
875 self.vapi.cli("clear logging")
def test_show_frame_queue_nelts(self):
    """ API test - worker handoff frame queue elements """
    # the API must first report the value held in self.fq_nelts
    initial_options = self.vapi.nat_show_fq_options()
    self.assertEqual(self.fq_nelts, initial_options.frame_queue_nelts)

    # change the value through the CLI while the plugin is disabled
    self.vapi.nat44_plugin_enable_disable(enable=0)
    self.vapi.cli("set nat frame-queue-nelts 256")
    self.vapi.nat44_plugin_enable_disable(enable=1)

    # the API must now report the CLI-configured value
    updated_options = self.vapi.nat_show_fq_options()
    self.assertEqual(256, updated_options.frame_queue_nelts)
888 class TestNAT44EI(MethodHolder):
889 """ NAT44EI Test Cases """
# Session limit passed to the plugin when it is enabled in setUp.
891 max_translations = 10240
# NOTE(review): the def lines of setUpClass/setUp/tearDown, the definition
# of max_users and the bodies of the two interface loops are not visible
# (numbering skips e.g. 892-895, 915-918, 939-945, 957-958 and 963-964).
# The super() calls mark where each fixture body starts.
896 super(TestNAT44EI, cls).setUpClass()
897 cls.vapi.cli("set log class nat level debug")
# Default inside/outside ports, ICMP ids and addresses used by the helpers.
899 cls.tcp_port_in = 6303
900 cls.tcp_port_out = 6303
901 cls.udp_port_in = 6304
902 cls.udp_port_out = 6304
903 cls.icmp_id_in = 6305
904 cls.icmp_id_out = 6305
905 cls.nat_addr = '10.0.0.3'
906 cls.ipfix_src_port = 4739
907 cls.ipfix_domain_id = 1
908 cls.tcp_external_port = 80
909 cls.udp_external_port = 69
# Ten packet-generator interfaces; the first four form cls.interfaces.
911 cls.create_pg_interfaces(range(10))
912 cls.interfaces = list(cls.pg_interfaces[0:4])
914 for i in cls.interfaces:
919 cls.pg0.generate_remote_hosts(3)
920 cls.pg0.configure_ipv4_neighbors()
922 cls.pg1.generate_remote_hosts(1)
923 cls.pg1.configure_ipv4_neighbors()
# NOTE(review): the nested list(list(...)) is redundant; one list() gives
# the same result.
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
927 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
# pg4 and pg6 get identical local/remote addresses but sit in different
# IP tables (10 and 20); pg5 shares table 10 with pg4.
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
931 cls.pg4.set_table_ip4(10)
932 cls.pg5._local_ip4 = "172.17.255.3"
933 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
934 cls.pg5.set_table_ip4(10)
935 cls.pg6._local_ip4 = "172.16.255.1"
936 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
937 cls.pg6.set_table_ip4(20)
938 for i in cls.overlapping_interfaces:
946 cls.pg9.generate_remote_hosts(2)
948 cls.vapi.sw_interface_add_del_address(
949 sw_if_index=cls.pg9.sw_if_index,
950 prefix="10.0.0.1/24")
953 cls.pg9.resolve_arp()
# Second pg9 host takes over the first host's original address, then the
# first host (and pg4._remote_ip4) is set to 10.0.0.2 before ARP is
# resolved again.
954 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
955 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
956 cls.pg9.resolve_arp()
# setUp: enable the NAT44 plugin with the configured session/user limits.
959 super(TestNAT44EI, self).setUp()
960 self.vapi.nat44_plugin_enable_disable(
961 sessions=self.max_translations,
962 users=self.max_users, enable=1)
# tearDown: unless VPP is flagged dead, issue the IPFIX enable/disable call
# (its final argument is not visible), restore the IPFIX defaults, disable
# the plugin and clear logs.
965 super(TestNAT44EI, self).tearDown()
966 if not self.vpp_dead:
967 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
968 src_port=self.ipfix_src_port,
970 self.ipfix_src_port = 4739
971 self.ipfix_domain_id = 1
973 self.vapi.nat44_plugin_enable_disable(enable=0)
974 self.vapi.cli("clear logging")
# Creates sessions by sending the inside stream pg0 -> pg1, checks the
# total-sessions counter is positive, runs "clear nat44 sessions" and
# checks the counter is back to zero.
# NOTE(review): the tail of the second feature call and pg_start() are not
# visible (numbering skips 986-987 and 994).
976 def test_clear_sessions(self):
977 """ NAT44EI session clearing test """
979 self.nat44_add_address(self.nat_addr)
# pg0 is configured with the NAT_IS_INSIDE flag.
980 flags = self.config_flags.NAT_IS_INSIDE
981 self.vapi.nat44_interface_add_del_feature(
982 sw_if_index=self.pg0.sw_if_index,
983 flags=flags, is_add=1)
984 self.vapi.nat44_interface_add_del_feature(
985 sw_if_index=self.pg1.sw_if_index,
# The plugin must report endpoint_dependent == 0.
988 nat_config = self.vapi.nat_show_config()
989 self.assertEqual(0, nat_config.endpoint_dependent)
991 pkts = self.create_stream_in(self.pg0, self.pg1)
992 self.pg0.add_stream(pkts)
993 self.pg_enable_capture(self.pg_interfaces)
995 capture = self.pg1.get_capture(len(pkts))
996 self.verify_capture_out(capture)
998 sessions = self.statistics.get_counter('/nat44/total-sessions')
999 self.assertTrue(sessions[0][0] > 0)
1000 self.logger.info("sessions before clearing: %s" % sessions[0][0])
1002 self.vapi.cli("clear nat44 sessions")
1004 sessions = self.statistics.get_counter('/nat44/total-sessions')
1005 self.assertEqual(sessions[0][0], 0)
1006 self.logger.info("sessions after clearing: %s" % sessions[0][0])
# Sends the inside stream pg0 -> pg1 and the outside stream pg1 -> pg0 and,
# for each direction, compares slowpath counters taken before and after:
# TCP must grow by 2, UDP by 1, ICMP by 1 and drops by 0. Finally expects
# one user and three sessions in the totals.
# NOTE(review): the tail of the second feature call and the pg_start()
# calls are not visible (numbering skips 1017-1019, 1028 and 1051).
1008 def test_dynamic(self):
1009 """ NAT44EI dynamic translation test """
1010 self.nat44_add_address(self.nat_addr)
1011 flags = self.config_flags.NAT_IS_INSIDE
1012 self.vapi.nat44_interface_add_del_feature(
1013 sw_if_index=self.pg0.sw_if_index,
1014 flags=flags, is_add=1)
1015 self.vapi.nat44_interface_add_del_feature(
1016 sw_if_index=self.pg1.sw_if_index,
# Baseline in2out counters, indexed later by interface index.
1020 tcpn = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0]
1021 udpn = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0]
1022 icmpn = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0]
1023 drops = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0]
1025 pkts = self.create_stream_in(self.pg0, self.pg1)
1026 self.pg0.add_stream(pkts)
1027 self.pg_enable_capture(self.pg_interfaces)
1029 capture = self.pg1.get_capture(len(pkts))
1030 self.verify_capture_out(capture)
# in2out deltas are read at pg0's interface index.
1032 if_idx = self.pg0.sw_if_index
1033 cnt = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0]
1034 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
1035 cnt = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0]
1036 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
1037 cnt = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0]
1038 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
1039 cnt = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0]
1040 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
# Baseline out2in counters.
1043 tcpn = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0]
1044 udpn = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0]
1045 icmpn = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0]
1046 drops = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0]
1048 pkts = self.create_stream_out(self.pg1)
1049 self.pg1.add_stream(pkts)
1050 self.pg_enable_capture(self.pg_interfaces)
1052 capture = self.pg0.get_capture(len(pkts))
1053 self.verify_capture_in(capture, self.pg0)
# out2in deltas are read at pg1's interface index.
1055 if_idx = self.pg1.sw_if_index
1056 cnt = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0]
1057 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
1058 cnt = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0]
1059 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
1060 cnt = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0]
1061 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
1062 cnt = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0]
1063 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
1065 users = self.statistics.get_counter('/nat44/total-users')
1066 self.assertEqual(users[0][0], 1)
1067 sessions = self.statistics.get_counter('/nat44/total-sessions')
1068 self.assertEqual(sessions[0][0], 3)
# Sends the inside stream with TTL=1 from pg0 and expects one ICMP error
# (default type 11) back on pg0 for every packet sent.
# NOTE(review): the tail of the second feature call and pg_start() are not
# visible (numbering skips 1080-1081 and 1086-1087).
1070 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1071 """ NAT44EI handling of client packets with TTL=1 """
1073 self.nat44_add_address(self.nat_addr)
1074 flags = self.config_flags.NAT_IS_INSIDE
1075 self.vapi.nat44_interface_add_del_feature(
1076 sw_if_index=self.pg0.sw_if_index,
1077 flags=flags, is_add=1)
1078 self.vapi.nat44_interface_add_del_feature(
1079 sw_if_index=self.pg1.sw_if_index,
1082 # Client side - generate traffic
1083 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1084 self.pg0.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1088 # Client side - verify ICMP type 11 packets
1089 capture = self.pg0.get_capture(len(pkts))
1090 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Creates sessions with the inside stream, then sends the outside stream
# with TTL=1 from pg1 and expects ICMP errors back on pg1 whose source is
# pg1's local address.
# NOTE(review): the tail of the second feature call and the pg_start()
# calls are not visible (numbering skips 1102-1103, 1108-1109, 1116-1117).
1092 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1093 """ NAT44EI handling of server packets with TTL=1 """
1095 self.nat44_add_address(self.nat_addr)
1096 flags = self.config_flags.NAT_IS_INSIDE
1097 self.vapi.nat44_interface_add_del_feature(
1098 sw_if_index=self.pg0.sw_if_index,
1099 flags=flags, is_add=1)
1100 self.vapi.nat44_interface_add_del_feature(
1101 sw_if_index=self.pg1.sw_if_index,
1104 # Client side - create sessions
1105 pkts = self.create_stream_in(self.pg0, self.pg1)
1106 self.pg0.add_stream(pkts)
1107 self.pg_enable_capture(self.pg_interfaces)
1110 # Server side - generate traffic
1111 capture = self.pg1.get_capture(len(pkts))
# verify_capture_out also records the translated ports used by
# create_stream_out below.
1112 self.verify_capture_out(capture)
1113 pkts = self.create_stream_out(self.pg1, ttl=1)
1114 self.pg1.add_stream(pkts)
1115 self.pg_enable_capture(self.pg_interfaces)
1118 # Server side - verify ICMP type 11 packets
1119 capture = self.pg1.get_capture(len(pkts))
1120 self.verify_capture_out_with_icmp_errors(capture,
1121 src_ip=self.pg1.local_ip4)
# Sends the inside stream with TTL=2, wraps the IP layer of every packet
# captured on pg1 in an ICMP type 11 message addressed to the NAT address,
# sends those from pg1 and verifies the ICMP errors arriving on pg0.
# NOTE(review): the docstring terminator, the tail of the second feature
# call and the pg_start() calls are not visible (numbering skips 1125-1126,
# 1134-1135, 1140-1141 and 1149-1150).
1123 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1124 """ NAT44EI handling of error responses to client packets with TTL=2
1127 self.nat44_add_address(self.nat_addr)
1128 flags = self.config_flags.NAT_IS_INSIDE
1129 self.vapi.nat44_interface_add_del_feature(
1130 sw_if_index=self.pg0.sw_if_index,
1131 flags=flags, is_add=1)
1132 self.vapi.nat44_interface_add_del_feature(
1133 sw_if_index=self.pg1.sw_if_index,
1136 # Client side - generate traffic
1137 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1138 self.pg0.add_stream(pkts)
1139 self.pg_enable_capture(self.pg_interfaces)
1142 # Server side - simulate ICMP type 11 response
1143 capture = self.pg1.get_capture(len(pkts))
1144 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1145 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1146 ICMP(type=11) / packet[IP] for packet in capture]
1147 self.pg1.add_stream(pkts)
1148 self.pg_enable_capture(self.pg_interfaces)
1151 # Client side - verify ICMP type 11 packets
1152 capture = self.pg0.get_capture(len(pkts))
1153 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1155 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1156 """ NAT44EI handling of error responses to server packets with TTL=2
# Creates sessions with the inside stream, sends the outside stream with
# TTL=2, wraps the IP layer of every packet captured on pg0 in an ICMP
# type 11 message addressed to pg1's remote host, sends those from pg0 and
# verifies the ICMP errors arriving on pg1.
# NOTE(review): the docstring terminator, the tail of the second feature
# call and the pg_start() calls are not visible (numbering skips 1157-1158,
# 1166-1167, 1172-1173, 1180-1181 and 1189-1190).
1159 self.nat44_add_address(self.nat_addr)
1160 flags = self.config_flags.NAT_IS_INSIDE
1161 self.vapi.nat44_interface_add_del_feature(
1162 sw_if_index=self.pg0.sw_if_index,
1163 flags=flags, is_add=1)
1164 self.vapi.nat44_interface_add_del_feature(
1165 sw_if_index=self.pg1.sw_if_index,
1168 # Client side - create sessions
1169 pkts = self.create_stream_in(self.pg0, self.pg1)
1170 self.pg0.add_stream(pkts)
1171 self.pg_enable_capture(self.pg_interfaces)
1174 # Server side - generate traffic
1175 capture = self.pg1.get_capture(len(pkts))
1176 self.verify_capture_out(capture)
1177 pkts = self.create_stream_out(self.pg1, ttl=2)
1178 self.pg1.add_stream(pkts)
1179 self.pg_enable_capture(self.pg_interfaces)
1182 # Client side - simulate ICMP type 11 response
1183 capture = self.pg0.get_capture(len(pkts))
1184 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1185 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1186 ICMP(type=11) / packet[IP] for packet in capture]
1187 self.pg0.add_stream(pkts)
1188 self.pg_enable_capture(self.pg_interfaces)
1191 # Server side - verify ICMP type 11 packets
# No src_ip is passed, so the helper's default source address applies.
1192 capture = self.pg1.get_capture(len(pkts))
1193 self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ NAT44EI ping out interface from outside network """
    # Scenario: an ICMP echo request is sent from the pg1 peer to pg1's
    # own local address; the echo reply must come back out of pg1.

    # One NAT pool address; pg0 carries the inside flag, pg1 does not.
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ NAT44EI ping internal host from outside network """
    # Scenario: the pg0 host is published under the NAT address by a
    # static mapping; an echo request sent to that address from pg1 must
    # reach pg0, and the echo reply must travel back out of pg1.

    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # out2in
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
    """ NAT44EI forwarding test """
    # Scenario: with forwarding enabled and one address-only static
    # mapping, traffic is checked in both directions for a host that
    # matches the mapping and for a host that does not.

    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    self.vapi.nat44_forwarding_enable_disable(enable=1)

    real_ip = self.pg0.remote_ip4
    alias_ip = self.nat_addr
    flags = self.config_flags.NAT_IS_ADDR_ONLY
    self.vapi.nat44_add_del_static_mapping(is_add=1,
                                           local_ip_address=real_ip,
                                           external_ip_address=alias_ip,
                                           external_sw_if_index=0xFFFFFFFF,
                                           flags=flags)

    try:
        # static mapping match

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # no static mapping match

        # Temporarily make the second remote host the primary pg0 host;
        # the original entry is put back in the inner finally.
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            self.pg0.remote_hosts[0] = host0

    finally:
        # Undo forwarding and the static mapping even when a check fails,
        # so the configuration does not leak out of this test.
        self.vapi.nat44_forwarding_enable_disable(enable=0)
        flags = self.config_flags.NAT_IS_ADDR_ONLY
        self.vapi.nat44_add_del_static_mapping(
            is_add=0,
            local_ip_address=real_ip,
            external_ip_address=alias_ip,
            external_sw_if_index=0xFFFFFFFF,
            flags=flags)
def test_static_in(self):
    """ NAT44EI 1:1 NAT initialized from inside network """
    # Scenario: a static mapping publishes the pg0 host under nat_ip;
    # the first traffic is sent from the inside, then answered from pg1.

    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    # The dump must show exactly one mapping with no tag, protocol or ports.
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    self.assertEqual(sm[0].tag, '')
    self.assertEqual(sm[0].protocol, 0)
    self.assertEqual(sm[0].local_port, 0)
    self.assertEqual(sm[0].external_port, 0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ NAT44EI 1:1 NAT initialized from outside network """
    # Scenario: a tagged static mapping publishes the pg0 host under
    # nat_ip; the first traffic arrives from pg1, then the inside replies.

    nat_ip = "10.0.0.20"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305
    # NOTE(review): any tag string satisfies the check below; confirm the
    # exact value against the upstream test.
    tag = "testTAG"

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    # The tag given at creation must be reported back by the dump.
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    self.assertEqual(sm[0].tag, tag)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ NAT44EI 1:1 NAPT initialized from inside network """
    # Scenario: per-protocol static mappings (TCP, UDP, ICMP) translate
    # the inside ports/id to fixed outside values; the first traffic is
    # sent from the inside.

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ NAT44EI 1:1 NAPT initialized from outside network """
    # Scenario: per-protocol static mappings (TCP, UDP, ICMP) translate
    # the inside ports/id to fixed outside values; the first traffic
    # arrives from pg1.

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_vrf_aware(self):
    """ NAT44EI 1:1 NAT VRF awareness """
    # Scenario: two static mappings are bound to a VRF; traffic from an
    # inside interface in that VRF is translated, traffic from an inside
    # interface in another VRF is dropped.

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # NOTE(review): vrf_id=10 is assumed to be the VRF of pg4 and not of
    # pg0; confirm against the interface setup of this test class.
    self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                  vrf_id=10)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                  vrf_id=10)
    flags = self.config_flags.NAT_IS_INSIDE
    # pg3 is added without the inside flag; pg0 and pg4 are inside.
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg3.sw_if_index,
        is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg4.sw_if_index,
        flags=flags, is_add=1)

    # inside interface VRF match NAT44EI static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match NAT44EI static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
    """ NAT44EI Switch from dynamic translation to 1:1NAT """
    # Scenario: traffic is first translated dynamically from the pool;
    # after a static mapping for the same host is added, no dynamic
    # session may remain and new traffic must use the static address.
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # dynamic
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # 1:1NAT
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    # The user's session dump must be empty once the mapping exists.
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
    self.assertEqual(len(sessions), 0)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
    """ NAT44EI Identity NAT """
    # Scenario: an address-only identity mapping for the pg0 host must let
    # a TCP packet from pg1 through with addresses and ports untouched and
    # without creating a session; a second identity mapping for the same
    # address in vrf 1 must then show up as a separate dump entry.
    flags = self.config_flags.NAT_IS_ADDR_ONLY
    self.vapi.nat44_add_del_identity_mapping(
        ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
        flags=flags, is_add=1)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12345, dport=56789))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    # From here on p is the received packet, not the one that was sent.
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, 56789)
        self.assertEqual(tcp.sport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
    self.assertEqual(len(sessions), 0)
    flags = self.config_flags.NAT_IS_ADDR_ONLY
    self.vapi.nat44_add_del_identity_mapping(
        ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
        flags=flags, vrf_id=1, is_add=1)
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(len(identity_mappings), 2)
def test_multiple_inside_interfaces(self):
    """ NAT44EI multiple non-overlapping address space inside interfaces
    """
    # Scenario: pg0 and pg1 are both inside interfaces, pg3 is added
    # without the inside flag and pg2 has no NAT feature at all. Traffic
    # between inside interfaces and towards pg2 must pass untranslated;
    # traffic between each inside interface and pg3 must be translated.

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg3.sw_if_index,
        is_add=1)

    # between two NAT44EI inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from inside to interface without translation
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ NAT44EI multiple inside interfaces with overlapping address space
    """
    # Scenario: pg4, pg5 and pg6 are inside interfaces and pg3 is added
    # without the inside flag. pg6's host is published through a static
    # mapping. The test covers untranslated traffic between pg4 and pg5,
    # hairpinning from pg4 to pg6, translated traffic between each inside
    # interface and pg3, and the resulting user/session dumps.

    static_nat_ip = "10.0.0.10"
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg3.sw_if_index,
        is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg4.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg5.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg6.sw_if_index,
        flags=flags, is_add=1)
    # NOTE(review): vrf_id=20 matches the VRF used for the pg6 session
    # dump further down; confirm against the interface setup.
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    # between NAT44EI inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between NAT44EI inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    # From here on p is the received packet, not the one that was sent.
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
        self.assertEqual(str(session.inside_ip_address),
                         self.pg5.remote_ip4)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # The three sessions are expected in TCP, UDP, ICMP order.
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        # NOTE(review): the second dump argument is assumed to be the
        # user's own vrf_id; confirm against the upstream test.
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertTrue(session.protocol in
                            [IP_PROTOS.tcp, IP_PROTOS.udp,
                             IP_PROTOS.icmp])
            self.assertFalse(session.flags &
                             self.config_flags.NAT_IS_EXT_HOST_VALID)

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
        self.assertEqual(str(session.inside_ip_address),
                         self.pg4.remote_ip4)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4, 20)
    self.assertGreaterEqual(len(sessions), 3)
    for session in sessions:
        self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
        self.assertEqual(str(session.inside_ip_address),
                         self.pg6.remote_ip4)
        self.assertEqual(str(session.outside_ip_address),
                         static_nat_ip)
        self.assertTrue(session.inside_port in
                        [self.tcp_port_in, self.udp_port_in,
                         self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44EI hairpinning - 1:1 NAPT """
    # Scenario: host and server both sit behind pg0. The host reaches the
    # server through the NAT address (static TCP port mapping), so both
    # the request and the reply enter and leave on pg0. The hairpinning
    # counter for pg0 must grow by one per packet.

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): any source port works for the host; confirm the exact
    # value against the upstream test.
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # Counter baseline taken before any hairpinned packet is sent.
    cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    # From here on p is the received packet, not the one that was sent.
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.assert_packet_checksums_valid(p)
        # Remember the translated source port for the reply below.
        host_out_port = tcp.sport
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    after = self.statistics.get_counter('/nat44/hairpinning')[0]
    if_idx = self.pg0.sw_if_index
    self.assertEqual(after[if_idx] - cnt[if_idx], 1)

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    after = self.statistics.get_counter('/nat44/hairpinning')[0]
    if_idx = self.pg0.sw_if_index
    self.assertEqual(after[if_idx] - cnt[if_idx], 2)
def test_hairpinning2(self):
    """ NAT44EI hairpinning - 1:1 NAT"""
    # Scenario: a host and two servers all sit behind pg0; each server is
    # published through its own 1:1 static mapping. Four rounds of TCP,
    # UDP and ICMP traffic are hairpinned through pg0: host to server1,
    # server1 to host, server2 to server1 and server1 to server2.

    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # add static mapping for servers
    self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
    self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            # The host has no static mapping, so its source is rewritten
            # to the pool address and a new port/id; remember the
            # translated values for the reply round.
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server2 to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            # server2 has a 1:1 mapping, so only its address changes and
            # the source port/id stay the same.
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, server2.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_hairpinning_avoid_inf_loop(self):
    """ NAT44 hairpinning - 1:1 NAPT avoid infinite loop """
    # Scenario: besides the server's port mapping, pg0's own address is
    # statically mapped to itself. A packet sent to that address must not
    # bring VPP down; afterwards the regular hairpinning exchange between
    # host and server must still work and be counted.

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): any source port works for the host; confirm the exact
    # value against the upstream test.
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # add another static mapping that maps pg0.local_ip4 address to itself
    self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)

    # send packet from host to VPP (the packet should get dropped)
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.pg0.local_ip4) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # Here VPP used to crash due to an infinite loop

    # Counter baseline taken before the regular hairpinning exchange.
    cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    # From here on p is the received packet, not the one that was sent.
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.assert_packet_checksums_valid(p)
        # Remember the translated source port for the reply below.
        host_out_port = tcp.sport
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    after = self.statistics.get_counter('/nat44/hairpinning')[0]
    if_idx = self.pg0.sw_if_index
    self.assertEqual(after[if_idx] - cnt[if_idx], 1)

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    after = self.statistics.get_counter('/nat44/hairpinning')[0]
    if_idx = self.pg0.sw_if_index
    self.assertEqual(after[if_idx] - cnt[if_idx], 2)
# Checks that the NAT address pool tracks the IPv4 address of pg7: the dump is
# empty while pg7 has no address, holds exactly pg7.local_ip4 after
# config_ip4(), and is empty again after unconfig_ip4().
# NOTE(review): only the sw_if_index argument of
# nat44_add_del_interface_addr is visible in this listing.
2165 def test_interface_addr(self):
2166 """ NAT44EI acquire addresses from interface """
2167 self.vapi.nat44_add_del_interface_addr(
2169 sw_if_index=self.pg7.sw_if_index)
2171 # no address in NAT pool
2172 addresses = self.vapi.nat44_address_dump()
2173 self.assertEqual(0, len(addresses))
2175 # configure interface address and check NAT address pool
2176 self.pg7.config_ip4()
2177 addresses = self.vapi.nat44_address_dump()
2178 self.assertEqual(1, len(addresses))
2179 self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2181 # remove interface address and check NAT address pool
2182 self.pg7.unconfig_ip4()
2183 addresses = self.vapi.nat44_address_dump()
2184 self.assertEqual(0, len(addresses))
# Static mapping bound to external interface pg7: the dump holds one entry
# (keyed by pg7.sw_if_index) while pg7 has no address and two entries once an
# address is configured; the extra entry has external_sw_if_index 0xFFFFFFFF
# and carries the same tag. The cycle is repeated, then the mapping is removed.
# NOTE(review): the assignments of `tag` and `resolved` and several call
# arguments fall into gaps of this listing and are not visible here.
2186 def test_interface_addr_static_mapping(self):
2187 """ NAT44EI Static mapping with addresses from interface """
2190 self.vapi.nat44_add_del_interface_addr(
2192 sw_if_index=self.pg7.sw_if_index)
2193 self.nat44_add_static_mapping(
2195 external_sw_if_index=self.pg7.sw_if_index,
2198 # static mappings with external interface
2199 static_mappings = self.vapi.nat44_static_mapping_dump()
2200 self.assertEqual(1, len(static_mappings))
2201 self.assertEqual(self.pg7.sw_if_index,
2202 static_mappings[0].external_sw_if_index)
2203 self.assertEqual(static_mappings[0].tag, tag)
2205 # configure interface address and check static mappings
2206 self.pg7.config_ip4()
2207 static_mappings = self.vapi.nat44_static_mapping_dump()
2208 self.assertEqual(2, len(static_mappings))
# Look for the entry with external_sw_if_index 0xFFFFFFFF and check it.
2210 for sm in static_mappings:
2211 if sm.external_sw_if_index == 0xFFFFFFFF:
2212 self.assertEqual(str(sm.external_ip_address),
2214 self.assertEqual(sm.tag, tag)
2216 self.assertTrue(resolved)
2218 # remove interface address and check static mappings
2219 self.pg7.unconfig_ip4()
2220 static_mappings = self.vapi.nat44_static_mapping_dump()
2221 self.assertEqual(1, len(static_mappings))
2222 self.assertEqual(self.pg7.sw_if_index,
2223 static_mappings[0].external_sw_if_index)
2224 self.assertEqual(static_mappings[0].tag, tag)
2226 # configure interface address again and check static mappings
2227 self.pg7.config_ip4()
2228 static_mappings = self.vapi.nat44_static_mapping_dump()
2229 self.assertEqual(2, len(static_mappings))
2231 for sm in static_mappings:
2232 if sm.external_sw_if_index == 0xFFFFFFFF:
2233 self.assertEqual(str(sm.external_ip_address),
2235 self.assertEqual(sm.tag, tag)
2237 self.assertTrue(resolved)
2239 # remove static mapping
2240 self.nat44_add_static_mapping(
2242 external_sw_if_index=self.pg7.sw_if_index,
2245 static_mappings = self.vapi.nat44_static_mapping_dump()
2246 self.assertEqual(0, len(static_mappings))
2248 def test_interface_addr_identity_nat(self):
2249 """ NAT44EI Identity NAT with addresses from interface """
2252 self.vapi.nat44_add_del_interface_addr(
2254 sw_if_index=self.pg7.sw_if_index)
2255 self.vapi.nat44_add_del_identity_mapping(
2257 sw_if_index=self.pg7.sw_if_index,
2259 protocol=IP_PROTOS.tcp,
2262 # identity mappings with external interface
2263 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2264 self.assertEqual(1, len(identity_mappings))
2265 self.assertEqual(self.pg7.sw_if_index,
2266 identity_mappings[0].sw_if_index)
2268 # configure interface address and check identity mappings
2269 self.pg7.config_ip4()
2270 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2272 self.assertEqual(2, len(identity_mappings))
2273 for sm in identity_mappings:
2274 if sm.sw_if_index == 0xFFFFFFFF:
2275 self.assertEqual(str(identity_mappings[0].ip_address),
2277 self.assertEqual(port, identity_mappings[0].port)
2278 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2280 self.assertTrue(resolved)
2282 # remove interface address and check identity mappings
2283 self.pg7.unconfig_ip4()
2284 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2285 self.assertEqual(1, len(identity_mappings))
2286 self.assertEqual(self.pg7.sw_if_index,
2287 identity_mappings[0].sw_if_index)
2289 def test_ipfix_nat44_sess(self):
2290 """ NAT44EI IPFIX logging NAT44EI session created/deleted """
2291 self.ipfix_domain_id = 10
2292 self.ipfix_src_port = 20202
2293 collector_port = 30303
2294 bind_layers(UDP, IPFIX, dport=30303)
2295 self.nat44_add_address(self.nat_addr)
2296 flags = self.config_flags.NAT_IS_INSIDE
2297 self.vapi.nat44_interface_add_del_feature(
2298 sw_if_index=self.pg0.sw_if_index,
2299 flags=flags, is_add=1)
2300 self.vapi.nat44_interface_add_del_feature(
2301 sw_if_index=self.pg1.sw_if_index,
2303 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2304 src_address=self.pg3.local_ip4,
2306 template_interval=10,
2307 collector_port=collector_port)
2308 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2309 src_port=self.ipfix_src_port,
2312 pkts = self.create_stream_in(self.pg0, self.pg1)
2313 self.pg0.add_stream(pkts)
2314 self.pg_enable_capture(self.pg_interfaces)
2316 capture = self.pg1.get_capture(len(pkts))
2317 self.verify_capture_out(capture)
2318 self.nat44_add_address(self.nat_addr, is_add=0)
2319 self.vapi.ipfix_flush()
2320 capture = self.pg3.get_capture(7)
2321 ipfix = IPFIXDecoder()
2322 # first load template
2324 self.assertTrue(p.haslayer(IPFIX))
2325 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2326 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2327 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2328 self.assertEqual(p[UDP].dport, collector_port)
2329 self.assertEqual(p[IPFIX].observationDomainID,
2330 self.ipfix_domain_id)
2331 if p.haslayer(Template):
2332 ipfix.add_template(p.getlayer(Template))
2333 # verify events in data set
2335 if p.haslayer(Data):
2336 data = ipfix.decode_data_set(p.getlayer(Set))
2337 self.verify_ipfix_nat44_ses(data)
# IPFIX logging of "NAT addresses exhausted": no NAT pool address is added in
# the visible code, a packet from pg0 towards pg1 must not appear on pg1, and
# the IPFIX export captured on pg3 is passed to verify_ipfix_addr_exhausted().
# NOTE(review): the L4 layer of the test packet, the loop headers over
# `capture` and some call arguments fall into gaps of this listing.
2339 def test_ipfix_addr_exhausted(self):
2340 """ NAT44EI IPFIX logging NAT addresses exhausted """
2341 flags = self.config_flags.NAT_IS_INSIDE
2342 self.vapi.nat44_interface_add_del_feature(
2343 sw_if_index=self.pg0.sw_if_index,
2344 flags=flags, is_add=1)
2345 self.vapi.nat44_interface_add_del_feature(
2346 sw_if_index=self.pg1.sw_if_index,
2348 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2349 src_address=self.pg3.local_ip4,
2351 template_interval=10)
2352 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2353 src_port=self.ipfix_src_port,
2356 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2357 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2359 self.pg0.add_stream(p)
2360 self.pg_enable_capture(self.pg_interfaces)
# The packet must not be forwarded to pg1.
2362 self.pg1.assert_nothing_captured()
2364 self.vapi.ipfix_flush()
2365 capture = self.pg3.get_capture(7)
2366 ipfix = IPFIXDecoder()
2367 # first load template
2369 self.assertTrue(p.haslayer(IPFIX))
2370 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2371 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2372 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# No collector_port is visible in the set_ipfix_exporter call above; the
# export is expected on UDP port 4739.
2373 self.assertEqual(p[UDP].dport, 4739)
2374 self.assertEqual(p[IPFIX].observationDomainID,
2375 self.ipfix_domain_id)
2376 if p.haslayer(Template):
2377 ipfix.add_template(p.getlayer(Template))
2378 # verify events in data set
2380 if p.haslayer(Data):
2381 data = ipfix.decode_data_set(p.getlayer(Set))
2382 self.verify_ipfix_addr_exhausted(data)
# IPFIX logging of "maximum session entries exceeded": sends one packet from
# each of max_sessions distinct 10.10.x.y sources (all must reach pg1), then
# enables IPFIX and sends one more packet, which must not reach pg1. The
# export on pg3 is passed to verify_ipfix_max_sessions().
# NOTE(review): the initialisation of `pkts`, the L4 layers of the packets,
# the loop headers over `capture` and some call arguments fall into gaps of
# this listing and are not visible here.
2384 def test_ipfix_max_sessions(self):
2385 """ NAT44EI IPFIX logging maximum session entries exceeded """
2386 self.nat44_add_address(self.nat_addr)
2387 flags = self.config_flags.NAT_IS_INSIDE
2388 self.vapi.nat44_interface_add_del_feature(
2389 sw_if_index=self.pg0.sw_if_index,
2390 flags=flags, is_add=1)
2391 self.vapi.nat44_interface_add_del_feature(
2392 sw_if_index=self.pg1.sw_if_index,
2395 max_sessions = self.max_translations
2398 for i in range(0, max_sessions):
# Spread i over the last two octets: bits 8-15 -> third octet, bits 0-7 ->
# fourth octet, giving a distinct source address per iteration.
2399 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2400 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2401 IP(src=src, dst=self.pg1.remote_ip4) /
2404 self.pg0.add_stream(pkts)
2405 self.pg_enable_capture(self.pg_interfaces)
2408 self.pg1.get_capture(max_sessions)
2409 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2410 src_address=self.pg3.local_ip4,
2412 template_interval=10)
2413 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2414 src_port=self.ipfix_src_port,
2417 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2418 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2420 self.pg0.add_stream(p)
2421 self.pg_enable_capture(self.pg_interfaces)
# This extra packet must not be forwarded to pg1.
2423 self.pg1.assert_nothing_captured()
2425 self.vapi.ipfix_flush()
2426 capture = self.pg3.get_capture(7)
2427 ipfix = IPFIXDecoder()
2428 # first load template
2430 self.assertTrue(p.haslayer(IPFIX))
2431 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2432 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2433 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2434 self.assertEqual(p[UDP].dport, 4739)
2435 self.assertEqual(p[IPFIX].observationDomainID,
2436 self.ipfix_domain_id)
2437 if p.haslayer(Template):
2438 ipfix.add_template(p.getlayer(Template))
2439 # verify events in data set
2441 if p.haslayer(Data):
2442 data = ipfix.decode_data_set(p.getlayer(Set))
2443 self.verify_ipfix_max_sessions(data, max_sessions)
# Syslog logging of address-and-port mapping events: one TCP packet from pg0
# to pg1 creates a mapping and the syslog message captured on pg3 is verified;
# removing nat_addr from the pool must then yield a second message, verified
# with the flag argument set to False.
# NOTE(review): some statements (line-number gaps) are not visible here.
2445 def test_syslog_apmap(self):
2446 """ NAT44EI syslog address and port mapping creation and deletion """
2447 self.vapi.syslog_set_filter(
2448 self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2449 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2450 self.nat44_add_address(self.nat_addr)
2451 flags = self.config_flags.NAT_IS_INSIDE
2452 self.vapi.nat44_interface_add_del_feature(
2453 sw_if_index=self.pg0.sw_if_index,
2454 flags=flags, is_add=1)
2455 self.vapi.nat44_interface_add_del_feature(
2456 sw_if_index=self.pg1.sw_if_index,
2459 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2460 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2461 TCP(sport=self.tcp_port_in, dport=20))
2462 self.pg0.add_stream(p)
2463 self.pg_enable_capture(self.pg_interfaces)
2465 capture = self.pg1.get_capture(1)
# Record the translated source port on self before the syslog check.
2466 self.tcp_port_out = capture[0][TCP].sport
2467 capture = self.pg3.get_capture(1)
2468 self.verify_syslog_apmap(capture[0][Raw].load)
2470 self.pg_enable_capture(self.pg_interfaces)
2472 self.nat44_add_address(self.nat_addr, is_add=0)
2473 capture = self.pg3.get_capture(1)
2474 self.verify_syslog_apmap(capture[0][Raw].load, False)
2476 def test_pool_addr_fib(self):
2477 """ NAT44EI add pool addresses to FIB """
2478 static_addr = '10.0.0.10'
2479 self.nat44_add_address(self.nat_addr)
2480 flags = self.config_flags.NAT_IS_INSIDE
2481 self.vapi.nat44_interface_add_del_feature(
2482 sw_if_index=self.pg0.sw_if_index,
2483 flags=flags, is_add=1)
2484 self.vapi.nat44_interface_add_del_feature(
2485 sw_if_index=self.pg1.sw_if_index,
2487 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2490 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2491 ARP(op=ARP.who_has, pdst=self.nat_addr,
2492 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2493 self.pg1.add_stream(p)
2494 self.pg_enable_capture(self.pg_interfaces)
2496 capture = self.pg1.get_capture(1)
2497 self.assertTrue(capture[0].haslayer(ARP))
2498 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2501 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2502 ARP(op=ARP.who_has, pdst=static_addr,
2503 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2504 self.pg1.add_stream(p)
2505 self.pg_enable_capture(self.pg_interfaces)
2507 capture = self.pg1.get_capture(1)
2508 self.assertTrue(capture[0].haslayer(ARP))
2509 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2511 # send ARP to non-NAT44EI interface
2512 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2513 ARP(op=ARP.who_has, pdst=self.nat_addr,
2514 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2515 self.pg2.add_stream(p)
2516 self.pg_enable_capture(self.pg_interfaces)
2518 self.pg1.assert_nothing_captured()
2520 # remove addresses and verify
2521 self.nat44_add_address(self.nat_addr, is_add=0)
2522 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2525 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2526 ARP(op=ARP.who_has, pdst=self.nat_addr,
2527 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2528 self.pg1.add_stream(p)
2529 self.pg_enable_capture(self.pg_interfaces)
2531 self.pg1.assert_nothing_captured()
2533 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2534 ARP(op=ARP.who_has, pdst=static_addr,
2535 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2536 self.pg1.add_stream(p)
2537 self.pg_enable_capture(self.pg_interfaces)
2539 self.pg1.assert_nothing_captured()
# Tenant-VRF-aware pool selection: pg0 and pg1 are moved into two separate IP
# tables, each table gets its own NAT address, and traffic from pg0 / pg1
# towards pg2 must be translated to nat_ip1 / nat_ip2 respectively. Both
# interfaces are then moved back to table 0 and the tables deleted.
# NOTE(review): the assignments of vrf_id1 / vrf_id2 fall into gaps of this
# listing. No try/finally is visible around the checks, so a failing
# assertion would presumably skip the table cleanup at the end - confirm
# against the full source.
2541 def test_vrf_mode(self):
2542 """ NAT44EI tenant VRF aware address pool mode """
2546 nat_ip1 = "10.0.0.10"
2547 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and configured again after.
2549 self.pg0.unconfig_ip4()
2550 self.pg1.unconfig_ip4()
2551 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
2552 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
2553 self.pg0.set_table_ip4(vrf_id1)
2554 self.pg1.set_table_ip4(vrf_id2)
2555 self.pg0.config_ip4()
2556 self.pg1.config_ip4()
2557 self.pg0.resolve_arp()
2558 self.pg1.resolve_arp()
2560 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2561 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2562 flags = self.config_flags.NAT_IS_INSIDE
2563 self.vapi.nat44_interface_add_del_feature(
2564 sw_if_index=self.pg0.sw_if_index,
2565 flags=flags, is_add=1)
2566 self.vapi.nat44_interface_add_del_feature(
2567 sw_if_index=self.pg1.sw_if_index,
2568 flags=flags, is_add=1)
2569 self.vapi.nat44_interface_add_del_feature(
2570 sw_if_index=self.pg2.sw_if_index,
# Traffic from pg0 (first table) must use nat_ip1.
2575 pkts = self.create_stream_in(self.pg0, self.pg2)
2576 self.pg0.add_stream(pkts)
2577 self.pg_enable_capture(self.pg_interfaces)
2579 capture = self.pg2.get_capture(len(pkts))
2580 self.verify_capture_out(capture, nat_ip1)
# Traffic from pg1 (second table) must use nat_ip2.
2583 pkts = self.create_stream_in(self.pg1, self.pg2)
2584 self.pg1.add_stream(pkts)
2585 self.pg_enable_capture(self.pg_interfaces)
2587 capture = self.pg2.get_capture(len(pkts))
2588 self.verify_capture_out(capture, nat_ip2)
# Restore: move both interfaces back to table 0 and delete the test tables.
2591 self.pg0.unconfig_ip4()
2592 self.pg1.unconfig_ip4()
2593 self.pg0.set_table_ip4(0)
2594 self.pg1.set_table_ip4(0)
2595 self.pg0.config_ip4()
2596 self.pg1.config_ip4()
2597 self.pg0.resolve_arp()
2598 self.pg1.resolve_arp()
2599 self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
2600 self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
# VRF-independent pool: nat_ip1 is added without a vrf_id and nat_ip2 with
# vrf_id=99; traffic from both pg0 and pg1 towards pg2 is expected to be
# translated to nat_ip1.
# NOTE(review): some statements (line-number gaps) are not visible here.
2602 def test_vrf_feature_independent(self):
2603 """ NAT44EI tenant VRF independent address pool mode """
2605 nat_ip1 = "10.0.0.10"
2606 nat_ip2 = "10.0.0.11"
2608 self.nat44_add_address(nat_ip1)
2609 self.nat44_add_address(nat_ip2, vrf_id=99)
2610 flags = self.config_flags.NAT_IS_INSIDE
2611 self.vapi.nat44_interface_add_del_feature(
2612 sw_if_index=self.pg0.sw_if_index,
2613 flags=flags, is_add=1)
2614 self.vapi.nat44_interface_add_del_feature(
2615 sw_if_index=self.pg1.sw_if_index,
2616 flags=flags, is_add=1)
2617 self.vapi.nat44_interface_add_del_feature(
2618 sw_if_index=self.pg2.sw_if_index,
2622 pkts = self.create_stream_in(self.pg0, self.pg2)
2623 self.pg0.add_stream(pkts)
2624 self.pg_enable_capture(self.pg_interfaces)
2626 capture = self.pg2.get_capture(len(pkts))
2627 self.verify_capture_out(capture, nat_ip1)
# Same expectation for pg1: nat_ip1 again, not nat_ip2.
2630 pkts = self.create_stream_in(self.pg1, self.pg2)
2631 self.pg1.add_stream(pkts)
2632 self.pg_enable_capture(self.pg_interfaces)
2634 capture = self.pg2.get_capture(len(pkts))
2635 self.verify_capture_out(capture, nat_ip1)
# Dynamic NAT between pg7 and pg8 after create_routes_and_neigbors(): checks
# translation pg7 -> pg8 and the return direction pg8 -> pg7 (sent to
# nat_addr).
# NOTE(review): some statements (line-number gaps) are not visible here.
2637 def test_dynamic_ipless_interfaces(self):
2638 """ NAT44EI interfaces without configured IP address """
2639 self.create_routes_and_neigbors()
2640 self.nat44_add_address(self.nat_addr)
2641 flags = self.config_flags.NAT_IS_INSIDE
2642 self.vapi.nat44_interface_add_del_feature(
2643 sw_if_index=self.pg7.sw_if_index,
2644 flags=flags, is_add=1)
2645 self.vapi.nat44_interface_add_del_feature(
2646 sw_if_index=self.pg8.sw_if_index,
2650 pkts = self.create_stream_in(self.pg7, self.pg8)
2651 self.pg7.add_stream(pkts)
2652 self.pg_enable_capture(self.pg_interfaces)
2654 capture = self.pg8.get_capture(len(pkts))
2655 self.verify_capture_out(capture)
2658 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2659 self.pg8.add_stream(pkts)
2660 self.pg_enable_capture(self.pg_interfaces)
2662 capture = self.pg7.get_capture(len(pkts))
2663 self.verify_capture_in(capture, self.pg7)
# 1:1 static NAT (pg7.remote_ip4 <-> nat_addr) between pg7 and pg8 after
# create_routes_and_neigbors(): the pg8 -> pg7 direction is checked first,
# then pg7 -> pg8.
# NOTE(review): some statements (line-number gaps) are not visible here.
2665 def test_static_ipless_interfaces(self):
2666 """ NAT44EI interfaces without configured IP address - 1:1 NAT """
2668 self.create_routes_and_neigbors()
2669 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2670 flags = self.config_flags.NAT_IS_INSIDE
2671 self.vapi.nat44_interface_add_del_feature(
2672 sw_if_index=self.pg7.sw_if_index,
2673 flags=flags, is_add=1)
2674 self.vapi.nat44_interface_add_del_feature(
2675 sw_if_index=self.pg8.sw_if_index,
2679 pkts = self.create_stream_out(self.pg8)
2680 self.pg8.add_stream(pkts)
2681 self.pg_enable_capture(self.pg_interfaces)
2683 capture = self.pg7.get_capture(len(pkts))
2684 self.verify_capture_in(capture, self.pg7)
2687 pkts = self.create_stream_in(self.pg7, self.pg8)
2688 self.pg7.add_stream(pkts)
2689 self.pg_enable_capture(self.pg_interfaces)
2691 capture = self.pg8.get_capture(len(pkts))
2692 self.verify_capture_out(capture, self.nat_addr, True)
# 1:1 NAPT between pg7 and pg8 after create_routes_and_neigbors(): static
# mappings with fixed outside ports/ids for TCP, UDP and ICMP are installed,
# then pg8 -> pg7 and pg7 -> pg8 traffic is verified.
# NOTE(review): some statements (line-number gaps) are not visible here.
2694 def test_static_with_port_ipless_interfaces(self):
2695 """ NAT44EI interfaces without configured IP address - 1:1 NAPT """
# Fixed outside port / ICMP id values, stored on self and used by the static
# mappings below.
2697 self.tcp_port_out = 30606
2698 self.udp_port_out = 30607
2699 self.icmp_id_out = 30608
2701 self.create_routes_and_neigbors()
2702 self.nat44_add_address(self.nat_addr)
2703 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2704 self.tcp_port_in, self.tcp_port_out,
2705 proto=IP_PROTOS.tcp)
2706 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2707 self.udp_port_in, self.udp_port_out,
2708 proto=IP_PROTOS.udp)
2709 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2710 self.icmp_id_in, self.icmp_id_out,
2711 proto=IP_PROTOS.icmp)
2712 flags = self.config_flags.NAT_IS_INSIDE
2713 self.vapi.nat44_interface_add_del_feature(
2714 sw_if_index=self.pg7.sw_if_index,
2715 flags=flags, is_add=1)
2716 self.vapi.nat44_interface_add_del_feature(
2717 sw_if_index=self.pg8.sw_if_index,
2721 pkts = self.create_stream_out(self.pg8)
2722 self.pg8.add_stream(pkts)
2723 self.pg_enable_capture(self.pg_interfaces)
2725 capture = self.pg7.get_capture(len(pkts))
2726 self.verify_capture_in(capture, self.pg7)
2729 pkts = self.create_stream_in(self.pg7, self.pg8)
2730 self.pg7.add_stream(pkts)
2731 self.pg_enable_capture(self.pg_interfaces)
2733 capture = self.pg8.get_capture(len(pkts))
2734 self.verify_capture_out(capture)
# 1:1 static NAT (pg0.remote_ip4 <-> 10.0.0.10) for a packet that carries a
# GRE layer: pg0 -> pg1 must leave with the NAT address as source, and
# pg1 -> pg0 sent to the NAT address must arrive at pg0.remote_ip4; the GRE
# layer must still be present in both directions.
# NOTE(review): the encapsulation layer between the two IP headers, the
# binding of `packet` and the try/except scaffolding fall into gaps of this
# listing and are not visible here.
2736 def test_static_unknown_proto(self):
2737 """ NAT44EI 1:1 translate packet with unknown protocol """
2738 nat_ip = "10.0.0.10"
2739 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2740 flags = self.config_flags.NAT_IS_INSIDE
2741 self.vapi.nat44_interface_add_del_feature(
2742 sw_if_index=self.pg0.sw_if_index,
2743 flags=flags, is_add=1)
2744 self.vapi.nat44_interface_add_del_feature(
2745 sw_if_index=self.pg1.sw_if_index,
2749 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2750 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2752 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2753 TCP(sport=1234, dport=1234))
2754 self.pg0.add_stream(p)
2755 self.pg_enable_capture(self.pg_interfaces)
2757 p = self.pg1.get_capture(1)
2760 self.assertEqual(packet[IP].src, nat_ip)
2761 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2762 self.assertEqual(packet.haslayer(GRE), 1)
2763 self.assert_packet_checksums_valid(packet)
2765 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2769 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2770 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2772 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2773 TCP(sport=1234, dport=1234))
2774 self.pg1.add_stream(p)
2775 self.pg_enable_capture(self.pg_interfaces)
2777 p = self.pg0.get_capture(1)
2780 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2781 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2782 self.assertEqual(packet.haslayer(GRE), 1)
2783 self.assert_packet_checksums_valid(packet)
2785 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the 1:1 unknown-protocol test: host and server both
# sit behind pg0 and each has its own static mapping (10.0.0.10 / 10.0.0.11).
# A packet from the host to the server's NAT address must come back out of
# pg0 with the host's NAT address as source and the server's real address as
# destination; the reverse direction is checked symmetrically. The GRE layer
# must be present in both captured packets.
# NOTE(review): the end of the docstring, the encapsulation layer between the
# two IP headers, the binding of `packet` and the try/except scaffolding fall
# into gaps of this listing and are not visible here.
2788 def test_hairpinning_static_unknown_proto(self):
2789 """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning
2792 host = self.pg0.remote_hosts[0]
2793 server = self.pg0.remote_hosts[1]
2795 host_nat_ip = "10.0.0.10"
2796 server_nat_ip = "10.0.0.11"
2798 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2799 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2800 flags = self.config_flags.NAT_IS_INSIDE
2801 self.vapi.nat44_interface_add_del_feature(
2802 sw_if_index=self.pg0.sw_if_index,
2803 flags=flags, is_add=1)
2804 self.vapi.nat44_interface_add_del_feature(
2805 sw_if_index=self.pg1.sw_if_index,
2809 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2810 IP(src=host.ip4, dst=server_nat_ip) /
2812 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2813 TCP(sport=1234, dport=1234))
2814 self.pg0.add_stream(p)
2815 self.pg_enable_capture(self.pg_interfaces)
2817 p = self.pg0.get_capture(1)
2820 self.assertEqual(packet[IP].src, host_nat_ip)
2821 self.assertEqual(packet[IP].dst, server.ip4)
2822 self.assertEqual(packet.haslayer(GRE), 1)
2823 self.assert_packet_checksums_valid(packet)
2825 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2829 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2830 IP(src=server.ip4, dst=host_nat_ip) /
2832 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2833 TCP(sport=1234, dport=1234))
2834 self.pg0.add_stream(p)
2835 self.pg_enable_capture(self.pg_interfaces)
2837 p = self.pg0.get_capture(1)
2840 self.assertEqual(packet[IP].src, server_nat_ip)
2841 self.assertEqual(packet[IP].dst, host.ip4)
2842 self.assertEqual(packet.haslayer(GRE), 1)
2843 self.assert_packet_checksums_valid(packet)
2845 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT44EI as an output feature: the feature is enabled on pg0, pg1 and pg3,
# then three flows are checked - pg0 -> pg3 (translated), pg3 -> pg0 (return
# direction) and pg2 -> pg0, which must pass without translation.
# NOTE(review): the flags argument of the pg3 call and other statements fall
# into gaps of this listing and are not visible here.
2848 def test_output_feature(self):
2849 """ NAT44EI output feature (in2out postrouting) """
2850 self.nat44_add_address(self.nat_addr)
2851 flags = self.config_flags.NAT_IS_INSIDE
2852 self.vapi.nat44_interface_add_del_output_feature(
2853 is_add=1, flags=flags,
2854 sw_if_index=self.pg0.sw_if_index)
2855 self.vapi.nat44_interface_add_del_output_feature(
2856 is_add=1, flags=flags,
2857 sw_if_index=self.pg1.sw_if_index)
2858 self.vapi.nat44_interface_add_del_output_feature(
2860 sw_if_index=self.pg3.sw_if_index)
2863 pkts = self.create_stream_in(self.pg0, self.pg3)
2864 self.pg0.add_stream(pkts)
2865 self.pg_enable_capture(self.pg_interfaces)
2867 capture = self.pg3.get_capture(len(pkts))
2868 self.verify_capture_out(capture)
2871 pkts = self.create_stream_out(self.pg3)
2872 self.pg3.add_stream(pkts)
2873 self.pg_enable_capture(self.pg_interfaces)
2875 capture = self.pg0.get_capture(len(pkts))
2876 self.verify_capture_in(capture, self.pg0)
2878 # from non-NAT interface to NAT inside interface
2879 pkts = self.create_stream_in(self.pg2, self.pg0)
2880 self.pg2.add_stream(pkts)
2881 self.pg_enable_capture(self.pg_interfaces)
2883 capture = self.pg0.get_capture(len(pkts))
2884 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
# VRF-aware output feature: one NAT address is added per VRF (vrf_id 10 and
# 20); traffic pg4 -> pg3 must be translated to the VRF-10 address and
# pg6 -> pg3 to the VRF-20 address, and return traffic sent to each NAT
# address must arrive on pg4 / pg6 respectively.
# NOTE(review): the trailing arguments of the two VppIpRoute constructors and
# whatever is done with r1 / r2 afterwards fall into gaps of this listing and
# are not visible here.
2886 def test_output_feature_vrf_aware(self):
2887 """ NAT44EI output feature VRF aware (in2out postrouting) """
2888 nat_ip_vrf10 = "10.0.0.10"
2889 nat_ip_vrf20 = "10.0.0.20"
2891 r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
2892 [VppRoutePath(self.pg3.remote_ip4,
2893 self.pg3.sw_if_index)],
2895 r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
2896 [VppRoutePath(self.pg3.remote_ip4,
2897 self.pg3.sw_if_index)],
2902 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2903 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2904 flags = self.config_flags.NAT_IS_INSIDE
2905 self.vapi.nat44_interface_add_del_output_feature(
2906 is_add=1, flags=flags,
2907 sw_if_index=self.pg4.sw_if_index)
2908 self.vapi.nat44_interface_add_del_output_feature(
2909 is_add=1, flags=flags,
2910 sw_if_index=self.pg6.sw_if_index)
2911 self.vapi.nat44_interface_add_del_output_feature(
2913 sw_if_index=self.pg3.sw_if_index)
# pg4 <-> pg3 must use the VRF-10 address.
2916 pkts = self.create_stream_in(self.pg4, self.pg3)
2917 self.pg4.add_stream(pkts)
2918 self.pg_enable_capture(self.pg_interfaces)
2920 capture = self.pg3.get_capture(len(pkts))
2921 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2924 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2925 self.pg3.add_stream(pkts)
2926 self.pg_enable_capture(self.pg_interfaces)
2928 capture = self.pg4.get_capture(len(pkts))
2929 self.verify_capture_in(capture, self.pg4)
# pg6 <-> pg3 must use the VRF-20 address.
2932 pkts = self.create_stream_in(self.pg6, self.pg3)
2933 self.pg6.add_stream(pkts)
2934 self.pg_enable_capture(self.pg_interfaces)
2936 capture = self.pg3.get_capture(len(pkts))
2937 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2940 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2941 self.pg3.add_stream(pkts)
2942 self.pg_enable_capture(self.pg_interfaces)
2944 capture = self.pg6.get_capture(len(pkts))
2945 self.verify_capture_in(capture, self.pg6)
# Hairpinning with the output feature: host and server are both behind pg0;
# the host reaches the server through nat_addr:server_out_port and the
# server's reply is sent to the port that was allocated for the host.
# NOTE(review): the assignment of host_in_port, the statements that bind
# ip/tcp from the captured packet and the try/except scaffolding fall into
# gaps of this listing and are not visible here.
2947 def test_output_feature_hairpinning(self):
2948 """ NAT44EI output feature hairpinning (in2out postrouting) """
2949 host = self.pg0.remote_hosts[0]
2950 server = self.pg0.remote_hosts[1]
2953 server_in_port = 5678
2954 server_out_port = 8765
2956 self.nat44_add_address(self.nat_addr)
2957 flags = self.config_flags.NAT_IS_INSIDE
2958 self.vapi.nat44_interface_add_del_output_feature(
2959 is_add=1, flags=flags,
2960 sw_if_index=self.pg0.sw_if_index)
2961 self.vapi.nat44_interface_add_del_output_feature(
2963 sw_if_index=self.pg1.sw_if_index)
2965 # add static mapping for server
2966 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2967 server_in_port, server_out_port,
2968 proto=IP_PROTOS.tcp)
2970 # send packet from host to server
2971 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2972 IP(src=host.ip4, dst=self.nat_addr) /
2973 TCP(sport=host_in_port, dport=server_out_port))
2974 self.pg0.add_stream(p)
2975 self.pg_enable_capture(self.pg_interfaces)
2977 capture = self.pg0.get_capture(1)
2982 self.assertEqual(ip.src, self.nat_addr)
2983 self.assertEqual(ip.dst, server.ip4)
2984 self.assertNotEqual(tcp.sport, host_in_port)
2985 self.assertEqual(tcp.dport, server_in_port)
2986 self.assert_packet_checksums_valid(p)
# Remember the allocated outside port; the server's reply is sent to it.
2987 host_out_port = tcp.sport
2989 self.logger.error(ppp("Unexpected or invalid packet:", p))
2992 # send reply from server to host
2993 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2994 IP(src=server.ip4, dst=self.nat_addr) /
2995 TCP(sport=server_in_port, dport=host_out_port))
2996 self.pg0.add_stream(p)
2997 self.pg_enable_capture(self.pg_interfaces)
2999 capture = self.pg0.get_capture(1)
3004 self.assertEqual(ip.src, self.nat_addr)
3005 self.assertEqual(ip.dst, host.ip4)
3006 self.assertEqual(tcp.sport, server_out_port)
3007 self.assertEqual(tcp.dport, host_in_port)
3008 self.assert_packet_checksums_valid(p)
3010 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One-armed NAT: both nat44_interface_add_del_feature calls target pg9. A TCP
# packet from local_host to remote_host must leave pg9 with nat_addr as
# source and a translated port; the reply sent to nat_addr:external_port must
# come back addressed to local_host:12345. Finally the nat44-classify
# "next in2out" and "next out2in" error counters are each expected to be 1.
# NOTE(review): the arguments of the first feature call, the statements that
# bind ip/tcp and the try/except scaffolding fall into gaps of this listing.
3013 def test_one_armed_nat44(self):
3014 """ NAT44EI One armed NAT """
3015 remote_host = self.pg9.remote_hosts[0]
3016 local_host = self.pg9.remote_hosts[1]
3019 self.nat44_add_address(self.nat_addr)
3020 flags = self.config_flags.NAT_IS_INSIDE
3021 self.vapi.nat44_interface_add_del_feature(
3022 sw_if_index=self.pg9.sw_if_index,
3024 self.vapi.nat44_interface_add_del_feature(
3025 sw_if_index=self.pg9.sw_if_index,
3026 flags=flags, is_add=1)
3029 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3030 IP(src=local_host.ip4, dst=remote_host.ip4) /
3031 TCP(sport=12345, dport=80))
3032 self.pg9.add_stream(p)
3033 self.pg_enable_capture(self.pg_interfaces)
3035 capture = self.pg9.get_capture(1)
3040 self.assertEqual(ip.src, self.nat_addr)
3041 self.assertEqual(ip.dst, remote_host.ip4)
3042 self.assertNotEqual(tcp.sport, 12345)
# Remember the translated source port; the reply below is sent to it.
3043 external_port = tcp.sport
3044 self.assertEqual(tcp.dport, 80)
3045 self.assert_packet_checksums_valid(p)
3047 self.logger.error(ppp("Unexpected or invalid packet:", p))
3051 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3052 IP(src=remote_host.ip4, dst=self.nat_addr) /
3053 TCP(sport=80, dport=external_port))
3054 self.pg9.add_stream(p)
3055 self.pg_enable_capture(self.pg_interfaces)
3057 capture = self.pg9.get_capture(1)
3062 self.assertEqual(ip.src, remote_host.ip4)
3063 self.assertEqual(ip.dst, local_host.ip4)
3064 self.assertEqual(tcp.sport, 80)
3065 self.assertEqual(tcp.dport, 12345)
3066 self.assert_packet_checksums_valid(p)
3068 self.logger.error(ppp("Unexpected or invalid packet:", p))
3071 err = self.statistics.get_err_counter(
3072 '/err/nat44-classify/next in2out')
3073 self.assertEqual(err, 1)
3074 err = self.statistics.get_err_counter(
3075 '/err/nat44-classify/next out2in')
3076 self.assertEqual(err, 1)
# Session deletion through the API: after traffic pg0 -> pg1 creates
# sessions, one is deleted by its inside address/port (NAT_IS_INSIDE flag)
# and another by its outside address/port (no flags); the dump must shrink by
# two. One more session is then deleted by inside key and
# verify_no_nat44_user() is called.
# NOTE(review): indexing sessions[0] and sessions[1] assumes the stream
# created at least two sessions (presumably three, given the final check) -
# confirm against create_stream_in.
3078 def test_del_session(self):
3079 """ NAT44EI delete session """
3080 self.nat44_add_address(self.nat_addr)
3081 flags = self.config_flags.NAT_IS_INSIDE
3082 self.vapi.nat44_interface_add_del_feature(
3083 sw_if_index=self.pg0.sw_if_index,
3084 flags=flags, is_add=1)
3085 self.vapi.nat44_interface_add_del_feature(
3086 sw_if_index=self.pg1.sw_if_index,
3089 pkts = self.create_stream_in(self.pg0, self.pg1)
3090 self.pg0.add_stream(pkts)
3091 self.pg_enable_capture(self.pg_interfaces)
3093 self.pg1.get_capture(len(pkts))
3095 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3096 nsessions = len(sessions)
# Delete by inside key ...
3098 self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3099 port=sessions[0].inside_port,
3100 protocol=sessions[0].protocol,
3101 flags=self.config_flags.NAT_IS_INSIDE)
# ... and by outside key (no flags passed).
3102 self.vapi.nat44_del_session(address=sessions[1].outside_ip_address,
3103 port=sessions[1].outside_port,
3104 protocol=sessions[1].protocol)
3106 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3107 self.assertEqual(nsessions - len(sessions), 2)
3109 self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3110 port=sessions[0].inside_port,
3111 protocol=sessions[0].protocol,
3112 flags=self.config_flags.NAT_IS_INSIDE)
3114 self.verify_no_nat44_user()
# Configures NAT (pool address, features on pg0 and pg1) and runs the shared
# frag_in_order() helper once per protocol: TCP, UDP and ICMP.
# NOTE(review): the remaining arguments of the pg1 feature call fall into a
# gap of this listing and are not visible here.
3116 def test_frag_in_order(self):
3117 """ NAT44EI translate fragments arriving in order """
3119 self.nat44_add_address(self.nat_addr)
3120 flags = self.config_flags.NAT_IS_INSIDE
3121 self.vapi.nat44_interface_add_del_feature(
3122 sw_if_index=self.pg0.sw_if_index,
3123 flags=flags, is_add=1)
3124 self.vapi.nat44_interface_add_del_feature(
3125 sw_if_index=self.pg1.sw_if_index,
3128 self.frag_in_order(proto=IP_PROTOS.tcp)
3129 self.frag_in_order(proto=IP_PROTOS.udp)
3130 self.frag_in_order(proto=IP_PROTOS.icmp)
# With NAT forwarding enabled, a fragmented UDP datagram sent in on pg1
# must come out on pg0 with its addresses, ports and payload unchanged.
# NOTE(review): listing numbers jump (3134 -> 3136, 3142 -> 3144,
# 3148 -> 3152, 3154 -> 3156), so several arguments/statements are missing
# from this view.
3132 def test_frag_forwarding(self):
3133 """ NAT44EI forwarding fragment test """
# Interface-address based NAT pool taken from pg1.
3134 self.vapi.nat44_add_del_interface_addr(
3136 sw_if_index=self.pg1.sw_if_index)
3137 flags = self.config_flags.NAT_IS_INSIDE
3138 self.vapi.nat44_interface_add_del_feature(
3139 sw_if_index=self.pg0.sw_if_index,
3140 flags=flags, is_add=1)
3141 self.vapi.nat44_interface_add_del_feature(
3142 sw_if_index=self.pg1.sw_if_index,
3144 self.vapi.nat44_forwarding_enable_disable(enable=1)
# 35-byte payload (16 x 'A', 16 x 'B', 3 x 'C') to be split into fragments.
3146 data = b"A" * 16 + b"B" * 16 + b"C" * 3
# Fragment stream entering on pg1 and addressed to the pg0 host; the
# remaining create_stream_frag() arguments are not visible here.
3147 pkts = self.create_stream_frag(self.pg1,
3148 self.pg0.remote_ip4,
3152 proto=IP_PROTOS.udp)
3153 self.pg1.add_stream(pkts)
3154 self.pg_enable_capture(self.pg_interfaces)
# The same number of fragments is expected on pg0.
3156 frags = self.pg0.get_capture(len(pkts))
# Reassemble with the original (untranslated) source and destination.
3157 p = self.reass_frags_and_verify(frags,
3158 self.pg1.remote_ip4,
3159 self.pg0.remote_ip4)
# Both UDP ports must still be 4789 and the payload intact.
3160 self.assertEqual(p[UDP].sport, 4789)
3161 self.assertEqual(p[UDP].dport, 4789)
3162 self.assertEqual(data, p[Raw].load)
# Runs the shared reass_hairpinning() scenario for TCP, UDP and ICMP against
# a server that sits on the inside interface (pg0) behind static mappings.
# NOTE(review): listing numbers jump (3178 -> 3180, 3181 -> 3184,
# 3185 -> 3188), so the port arguments of the static mappings and the end of
# the pg1 feature call are not visible in this view.
3164 def test_reass_hairpinning(self):
3165 """ NAT44EI fragments hairpinning """
# The server is the second remote host generated on pg0.
3167 server_addr = self.pg0.remote_hosts[1].ip4
# random.randint is inclusive at both ends: ports fall within 1025..65535.
3168 host_in_port = random.randint(1025, 65535)
3169 server_in_port = random.randint(1025, 65535)
3170 server_out_port = random.randint(1025, 65535)
3172 self.nat44_add_address(self.nat_addr)
3173 flags = self.config_flags.NAT_IS_INSIDE
3174 self.vapi.nat44_interface_add_del_feature(
3175 sw_if_index=self.pg0.sw_if_index,
3176 flags=flags, is_add=1)
3177 self.vapi.nat44_interface_add_del_feature(
3178 sw_if_index=self.pg1.sw_if_index,
# One mapping for TCP, one for UDP, and a third call that passes only the
# two addresses (no ports, no protocol).
3180 # add static mapping for server
3181 self.nat44_add_static_mapping(server_addr, self.nat_addr,
3184 proto=IP_PROTOS.tcp)
3185 self.nat44_add_static_mapping(server_addr, self.nat_addr,
3188 proto=IP_PROTOS.udp)
3189 self.nat44_add_static_mapping(server_addr, self.nat_addr)
# Same helper, same ports, one run per protocol.
3191 self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3192 host_in_port, proto=IP_PROTOS.tcp)
3193 self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3194 host_in_port, proto=IP_PROTOS.udp)
3195 self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3196 host_in_port, proto=IP_PROTOS.icmp)
# Runs the shared frag_out_of_order() scenario once per protocol (TCP, UDP,
# ICMP) after a basic NAT setup with pg0 flagged as the inside interface.
# NOTE(review): listing numbers jump 3207 -> 3210, so the end of the pg1
# feature call is not visible in this view.
3198 def test_frag_out_of_order(self):
3199 """ NAT44EI translate fragments arriving out of order """
3201 self.nat44_add_address(self.nat_addr)
3202 flags = self.config_flags.NAT_IS_INSIDE
3203 self.vapi.nat44_interface_add_del_feature(
3204 sw_if_index=self.pg0.sw_if_index,
3205 flags=flags, is_add=1)
3206 self.vapi.nat44_interface_add_del_feature(
3207 sw_if_index=self.pg1.sw_if_index,
# frag_out_of_order() is defined outside this view; only the protocol varies.
3210 self.frag_out_of_order(proto=IP_PROTOS.tcp)
3211 self.frag_out_of_order(proto=IP_PROTOS.udp)
3212 self.frag_out_of_order(proto=IP_PROTOS.icmp)
# Selects address/port allocation algorithm 1 and checks that the source
# port picked for a translated TCP packet carries the expected bit pattern.
# NOTE(review): listing numbers jump (3222 -> 3224 -> 3229, 3235 -> 3240,
# 3245 -> 3247), so the algorithm's parameters, the binding of `ip`/`tcp` and
# the surrounding error handling are not visible in this view.
3214 def test_port_restricted(self):
3215 """ NAT44EI Port restricted NAT44EI (MAP-E CE) """
3216 self.nat44_add_address(self.nat_addr)
3217 flags = self.config_flags.NAT_IS_INSIDE
3218 self.vapi.nat44_interface_add_del_feature(
3219 sw_if_index=self.pg0.sw_if_index,
3220 flags=flags, is_add=1)
3221 self.vapi.nat44_interface_add_del_feature(
3222 sw_if_index=self.pg1.sw_if_index,
# Remaining arguments of this call are not shown in this view.
3224 self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
# Single in2out TCP packet: source port 4567, destination port 22.
3229 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3230 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3231 TCP(sport=4567, dport=22))
3232 self.pg0.add_stream(p)
3233 self.pg_enable_capture(self.pg_interfaces)
3235 capture = self.pg1.get_capture(1)
# Destination is untouched, source address becomes the NAT pool address and
# the source port must have been rewritten.
3240 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3241 self.assertEqual(ip.src, self.nat_addr)
3242 self.assertEqual(tcp.dport, 22)
3243 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the translated source port must equal 10 -- presumably the
# PSID configured by the allocation call above; TODO confirm.
3244 self.assertEqual((tcp.sport >> 6) & 63, 10)
3245 self.assert_packet_checksums_valid(p)
# Diagnostic dump of the offending packet (presumably in an except branch).
3247 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Selects address/port allocation algorithm 2 and checks that translated
# source ports stay within 1025..1027.
# NOTE(review): listing numbers jump (3258 -> 3260 -> 3265, 3268 -> 3270,
# 3273 -> 3276), so the range parameters, the creation/filling of `pkts` and
# the loop that binds `tcp` are not visible in this view.
3250 def test_port_range(self):
3251 """ NAT44EI External address port range """
3252 self.nat44_add_address(self.nat_addr)
3253 flags = self.config_flags.NAT_IS_INSIDE
3254 self.vapi.nat44_interface_add_del_feature(
3255 sw_if_index=self.pg0.sw_if_index,
3256 flags=flags, is_add=1)
3257 self.vapi.nat44_interface_add_del_feature(
3258 sw_if_index=self.pg1.sw_if_index,
# Remaining arguments of this call are not shown in this view.
3260 self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
# Five TCP packets with distinct inside source ports 1125..1129.
3265 for port in range(0, 5):
3266 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3267 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3268 TCP(sport=1125 + port))
3270 self.pg0.add_stream(pkts)
3271 self.pg_enable_capture(self.pg_interfaces)
# Only three packets are expected out although five were built --
# presumably because the configured range holds three ports; TODO confirm.
3273 capture = self.pg1.get_capture(3)
# Inclusive bounds on the translated source port.
3276 self.assertGreaterEqual(tcp.sport, 1025)
3277 self.assertLessEqual(tcp.sport, 1027)
# Puts pg1 and pg2 into two separate IP tables and checks that traffic from
# the inside interface pg0 is translated in both directions via each of them.
# NOTE(review): listing numbers jump (3280 -> 3284, 3301 -> 3303,
# 3304 -> 3309, 3325 -> 3328, 3340 -> 3343), so the definitions of
# vrf_id1/vrf_id2, the ends of some calls and any try/finally structure are
# not visible in this view.
3279 def test_multiple_outside_vrf(self):
3280 """ NAT44EI Multiple outside VRF """
# Re-home pg1/pg2: drop their addresses, create both tables, bind each
# interface to its table, then re-apply addresses and resolve ARP.
3284 self.pg1.unconfig_ip4()
3285 self.pg2.unconfig_ip4()
3286 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
3287 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
3288 self.pg1.set_table_ip4(vrf_id1)
3289 self.pg2.set_table_ip4(vrf_id2)
3290 self.pg1.config_ip4()
3291 self.pg2.config_ip4()
3292 self.pg1.resolve_arp()
3293 self.pg2.resolve_arp()
# NAT setup: pool address, pg0 inside, NAT feature also on pg1 and pg2.
3295 self.nat44_add_address(self.nat_addr)
3296 flags = self.config_flags.NAT_IS_INSIDE
3297 self.vapi.nat44_interface_add_del_feature(
3298 sw_if_index=self.pg0.sw_if_index,
3299 flags=flags, is_add=1)
3300 self.vapi.nat44_interface_add_del_feature(
3301 sw_if_index=self.pg1.sw_if_index,
3303 self.vapi.nat44_interface_add_del_feature(
3304 sw_if_index=self.pg2.sw_if_index,
# First pass: pg0 -> pg1, verified against the NAT pool address ...
3309 pkts = self.create_stream_in(self.pg0, self.pg1)
3310 self.pg0.add_stream(pkts)
3311 self.pg_enable_capture(self.pg_interfaces)
3313 capture = self.pg1.get_capture(len(pkts))
3314 self.verify_capture_out(capture, self.nat_addr)
# ... and the return direction pg1 -> pg0.
3316 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3317 self.pg1.add_stream(pkts)
3318 self.pg_enable_capture(self.pg_interfaces)
3320 capture = self.pg0.get_capture(len(pkts))
3321 self.verify_capture_in(capture, self.pg0)
# Use different inside ports / ICMP id for the second pass.
3323 self.tcp_port_in = 60303
3324 self.udp_port_in = 60304
3325 self.icmp_id_in = 60305
# Second pass: the same exchange through pg2.
3328 pkts = self.create_stream_in(self.pg0, self.pg2)
3329 self.pg0.add_stream(pkts)
3330 self.pg_enable_capture(self.pg_interfaces)
3332 capture = self.pg2.get_capture(len(pkts))
3333 self.verify_capture_out(capture, self.nat_addr)
3335 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3336 self.pg2.add_stream(pkts)
3337 self.pg_enable_capture(self.pg_interfaces)
3339 capture = self.pg0.get_capture(len(pkts))
3340 self.verify_capture_in(capture, self.pg0)
# Cleanup: remove the pool address and move pg1/pg2 back to table 0.
3343 self.nat44_add_address(self.nat_addr, is_add=0)
3344 self.pg1.unconfig_ip4()
3345 self.pg2.unconfig_ip4()
3346 self.pg1.set_table_ip4(0)
3347 self.pg2.set_table_ip4(0)
3348 self.pg1.config_ip4()
3349 self.pg2.config_ip4()
3350 self.pg1.resolve_arp()
3351 self.pg2.resolve_arp()
# Sends the same TCP SYN (MSS option 1400) three times under different
# clamping settings and checks the MSS seen on the outside interface.
# NOTE(review): listing numbers jump (3361 -> 3364, 3371 -> 3373, ...), so
# the end of the pg1 feature call and some statements between enabling the
# capture and reading it are not visible in this view.
3353 def test_mss_clamping(self):
3354 """ NAT44EI TCP MSS clamping """
3355 self.nat44_add_address(self.nat_addr)
3356 flags = self.config_flags.NAT_IS_INSIDE
3357 self.vapi.nat44_interface_add_del_feature(
3358 sw_if_index=self.pg0.sw_if_index,
3359 flags=flags, is_add=1)
3360 self.vapi.nat44_interface_add_del_feature(
3361 sw_if_index=self.pg1.sw_if_index,
# SYN from the inside host advertising MSS 1400; reused for all three runs.
3364 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3365 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3366 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3367 flags="S", options=[('MSS', 1400)]))
# Run 1: clamping enabled at 1000 (below the advertised 1400).
3369 self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
3370 self.pg0.add_stream(p)
3371 self.pg_enable_capture(self.pg_interfaces)
3373 capture = self.pg1.get_capture(1)
3374 # Negotiated MSS value greater than configured - changed
3375 self.verify_mss_value(capture[0], 1000)
# Run 2: clamping disabled (the mss_value passed alongside is 1500).
3377 self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
3378 self.pg0.add_stream(p)
3379 self.pg_enable_capture(self.pg_interfaces)
3381 capture = self.pg1.get_capture(1)
3382 # MSS clamping disabled - negotiated MSS unchanged
3383 self.verify_mss_value(capture[0], 1400)
# Run 3: clamping enabled at 1500 (above the advertised 1400).
3385 self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
3386 self.pg0.add_stream(p)
3387 self.pg_enable_capture(self.pg_interfaces)
3389 capture = self.pg1.get_capture(1)
3390 # Negotiated MSS value smaller than configured - unchanged
3391 self.verify_mss_value(capture[0], 1400)
# Active side of NAT HA: create sessions, flush HA events towards the
# failover peer on pg3 and check add / del / retry / refresh handling plus
# ACK accounting.
# NOTE(review): listing numbers jump in many places (e.g. 3400 -> 3402,
# 3404 -> 3407, 3422 -> 3424, 3452 -> 3454, 3477 -> 3479), so call endings,
# the binding of `p`/`ip`/`udp`, error handling and any waits between steps
# are not visible in this view.
3393 def test_ha_send(self):
3394 """ NAT44EI Send HA session synchronization events (active) """
3395 flags = self.config_flags.NAT_IS_INSIDE
3396 self.vapi.nat44_interface_add_del_feature(
3397 sw_if_index=self.pg0.sw_if_index,
3398 flags=flags, is_add=1)
3399 self.vapi.nat44_interface_add_del_feature(
3400 sw_if_index=self.pg1.sw_if_index,
3402 self.nat44_add_address(self.nat_addr)
# HA listener on pg3's local address (remaining arguments not shown) and
# failover peer at pg3's remote address, UDP port 12346.
3404 self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
3407 self.vapi.nat_ha_set_failover(ip_address=self.pg3.remote_ip4,
3408 port=12346, session_refresh_interval=10)
# Have scapy dissect UDP payloads with source port 12345 as HANATStateSync.
# bind_layers registers this globally in scapy, not only for this test.
3409 bind_layers(UDP, HANATStateSync, sport=12345)
# In2out traffic creates the sessions whose events are to be synchronised.
3412 pkts = self.create_stream_in(self.pg0, self.pg1)
3413 self.pg0.add_stream(pkts)
3414 self.pg_enable_capture(self.pg_interfaces)
3416 capture = self.pg1.get_capture(len(pkts))
3417 self.verify_capture_out(capture)
3418 # active send HA events
# Flushing must produce one HA packet on pg3 and an add-event counter of 3.
3419 self.vapi.nat_ha_flush()
3420 stats = self.statistics.get_counter('/nat44/ha/add-event-send')
3421 self.assertEqual(stats[0][0], 3)
3422 capture = self.pg3.get_capture(1)
3424 self.assert_packet_checksums_valid(p)
3428 hanat = p[HANATStateSync]
3430 self.logger.error(ppp("Invalid packet:", p))
# Outer headers: from the local listener (port 12345) to the failover peer
# (port 12346).
3433 self.assertEqual(ip.src, self.pg3.local_ip4)
3434 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3435 self.assertEqual(udp.sport, 12345)
3436 self.assertEqual(udp.dport, 12346)
3437 self.assertEqual(hanat.version, 1)
3438 self.assertEqual(hanat.thread_index, 0)
3439 self.assertEqual(hanat.count, 3)
# Sequence number is kept so that the ACK below can echo it.
3440 seq = hanat.sequence_number
# event_type 1 is "add" in the Event field definition at the top of the file.
3441 for event in hanat.events:
3442 self.assertEqual(event.event_type, 1)
3443 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3444 self.assertEqual(event.out_addr, self.nat_addr)
3445 self.assertEqual(event.fib_index, 0)
3447 # ACK received events
# ACK travels in the opposite direction and carries the same sequence number.
3448 ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3449 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3450 UDP(sport=12346, dport=12345) /
3451 HANATStateSync(sequence_number=seq, flags='ACK'))
3452 self.pg3.add_stream(ack)
3454 stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3455 self.assertEqual(stats[0][0], 1)
3457 # delete one session
# Deleting the TCP session by its inside tuple must emit one del event.
3458 self.pg_enable_capture(self.pg_interfaces)
3459 self.vapi.nat44_del_session(address=self.pg0.remote_ip4,
3460 port=self.tcp_port_in,
3461 protocol=IP_PROTOS.tcp,
3462 flags=self.config_flags.NAT_IS_INSIDE)
3463 self.vapi.nat_ha_flush()
3464 stats = self.statistics.get_counter('/nat44/ha/del-event-send')
3465 self.assertEqual(stats[0][0], 1)
3466 capture = self.pg3.get_capture(1)
3469 hanat = p[HANATStateSync]
3471 self.logger.error(ppp("Invalid packet:", p))
# The new message must carry a higher sequence number than the add batch.
3474 self.assertGreater(hanat.sequence_number, seq)
3476 # do not send ACK, active retry send HA event again
# Without an ACK the counters must show 3 retries and 1 missed message, and
# the 3 captured retransmissions must equal `p` (presumably the del message
# captured just above -- TODO confirm, its binding is not visible here).
3477 self.pg_enable_capture(self.pg_interfaces)
3479 stats = self.statistics.get_counter('/nat44/ha/retry-count')
3480 self.assertEqual(stats[0][0], 3)
3481 stats = self.statistics.get_counter('/nat44/ha/missed-count')
3482 self.assertEqual(stats[0][0], 1)
3483 capture = self.pg3.get_capture(3)
3484 for packet in capture:
3485 self.assertEqual(packet, p)
3487 # session counters refresh
# Out2in traffic (2 packets expected on pg0) followed by a flush must
# produce a message with 2 refresh events.
3488 pkts = self.create_stream_out(self.pg1)
3489 self.pg1.add_stream(pkts)
3490 self.pg_enable_capture(self.pg_interfaces)
3492 self.pg0.get_capture(2)
3493 self.vapi.nat_ha_flush()
3494 stats = self.statistics.get_counter('/nat44/ha/refresh-event-send')
3495 self.assertEqual(stats[0][0], 2)
3496 capture = self.pg3.get_capture(1)
3498 self.assert_packet_checksums_valid(p)
3502 hanat = p[HANATStateSync]
3504 self.logger.error(ppp("Invalid packet:", p))
3507 self.assertEqual(ip.src, self.pg3.local_ip4)
3508 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3509 self.assertEqual(udp.sport, 12345)
3510 self.assertEqual(udp.dport, 12346)
3511 self.assertEqual(hanat.version, 1)
3512 self.assertEqual(hanat.count, 2)
3513 seq = hanat.sequence_number
# event_type 3 is "refresh"; each event must report 2 packets and a
# non-zero byte count.
3514 for event in hanat.events:
3515 self.assertEqual(event.event_type, 3)
3516 self.assertEqual(event.out_addr, self.nat_addr)
3517 self.assertEqual(event.fib_index, 0)
3518 self.assertEqual(event.total_pkts, 2)
3519 self.assertGreater(event.total_bytes, 0)
# ACK the refresh message; the ack-recv counter must then read 2.
3521 ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3522 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3523 UDP(sport=12346, dport=12345) /
3524 HANATStateSync(sequence_number=seq, flags='ACK'))
3525 self.pg3.add_stream(ack)
3527 stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3528 self.assertEqual(stats[0][0], 2)
# Passive side of NAT HA: feed add / del / refresh events in through pg3,
# check each one is ACKed and reflected in the session table, then send real
# traffic through a session that only exists because of HA.
# NOTE(review): listing numbers jump in many places (e.g. 3538 -> 3540,
# 3540 -> 3543, 3569 -> 3572, 3572 -> 3575, 3597 -> 3599, 3704 -> 3710), so
# call endings, the re-binding of `p`/`ip`/`tcp` from captures and the error
# handling are not visible in this view.
3530 def test_ha_recv(self):
3531 """ NAT44EI Receive HA session synchronization events (passive) """
3532 self.nat44_add_address(self.nat_addr)
3533 flags = self.config_flags.NAT_IS_INSIDE
3534 self.vapi.nat44_interface_add_del_feature(
3535 sw_if_index=self.pg0.sw_if_index,
3536 flags=flags, is_add=1)
3537 self.vapi.nat44_interface_add_del_feature(
3538 sw_if_index=self.pg1.sw_if_index,
# HA listener on pg3's local address (remaining arguments not shown).
3540 self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
# Have scapy dissect UDP payloads with source port 12345 as HANATStateSync.
3543 bind_layers(UDP, HANATStateSync, sport=12345)
# Outside ports advertised in the events; randint bounds are inclusive.
3545 self.tcp_port_out = random.randint(1025, 65535)
3546 self.udp_port_out = random.randint(1025, 65535)
3548 # send HA session add events to failover/passive
# Message 1: two "add" events (one TCP, one UDP session) for the pg0 host.
3549 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3550 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3551 UDP(sport=12346, dport=12345) /
3552 HANATStateSync(sequence_number=1, events=[
3553 Event(event_type='add', protocol='tcp',
3554 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3555 in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3556 eh_addr=self.pg1.remote_ip4,
3557 ehn_addr=self.pg1.remote_ip4,
3558 eh_port=self.tcp_external_port,
3559 ehn_port=self.tcp_external_port, fib_index=0),
3560 Event(event_type='add', protocol='udp',
3561 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3562 in_port=self.udp_port_in, out_port=self.udp_port_out,
3563 eh_addr=self.pg1.remote_ip4,
3564 ehn_addr=self.pg1.remote_ip4,
3565 eh_port=self.udp_external_port,
3566 ehn_port=self.udp_external_port, fib_index=0)]))
3568 self.pg3.add_stream(p)
3569 self.pg_enable_capture(self.pg_interfaces)
# Exactly one reply is expected on pg3: the ACK for sequence number 1.
3572 capture = self.pg3.get_capture(1)
3575 hanat = p[HANATStateSync]
3577 self.logger.error(ppp("Invalid packet:", p))
3580 self.assertEqual(hanat.sequence_number, 1)
3581 self.assertEqual(hanat.flags, 'ACK')
3582 self.assertEqual(hanat.version, 1)
3583 self.assertEqual(hanat.thread_index, 0)
# Counters: one ACK sent, two add events received, one user, two sessions.
3584 stats = self.statistics.get_counter('/nat44/ha/ack-send')
3585 self.assertEqual(stats[0][0], 1)
3586 stats = self.statistics.get_counter('/nat44/ha/add-event-recv')
3587 self.assertEqual(stats[0][0], 2)
3588 users = self.statistics.get_counter('/nat44/total-users')
3589 self.assertEqual(users[0][0], 1)
3590 sessions = self.statistics.get_counter('/nat44/total-sessions')
3591 self.assertEqual(sessions[0][0], 2)
# The API view must agree: a single user whose address is the pg0 host.
3592 users = self.vapi.nat44_user_dump()
3593 self.assertEqual(len(users), 1)
3594 self.assertEqual(str(users[0].ip_address),
3595 self.pg0.remote_ip4)
3596 # there should be 2 sessions created by HA
3597 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3599 self.assertEqual(len(sessions), 2)
# Each session must use the addresses and ports announced in the events.
3600 for session in sessions:
3601 self.assertEqual(str(session.inside_ip_address),
3602 self.pg0.remote_ip4)
3603 self.assertEqual(str(session.outside_ip_address),
3605 self.assertIn(session.inside_port,
3606 [self.tcp_port_in, self.udp_port_in])
3607 self.assertIn(session.outside_port,
3608 [self.tcp_port_out, self.udp_port_out])
3609 self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3611 # send HA session delete event to failover/passive
# Message 2: "del" event for the UDP session.
3612 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3613 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3614 UDP(sport=12346, dport=12345) /
3615 HANATStateSync(sequence_number=2, events=[
3616 Event(event_type='del', protocol='udp',
3617 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3618 in_port=self.udp_port_in, out_port=self.udp_port_out,
3619 eh_addr=self.pg1.remote_ip4,
3620 ehn_addr=self.pg1.remote_ip4,
3621 eh_port=self.udp_external_port,
3622 ehn_port=self.udp_external_port, fib_index=0)]))
3624 self.pg3.add_stream(p)
3625 self.pg_enable_capture(self.pg_interfaces)
# ACK for sequence number 2.
3628 capture = self.pg3.get_capture(1)
3631 hanat = p[HANATStateSync]
3633 self.logger.error(ppp("Invalid packet:", p))
3636 self.assertEqual(hanat.sequence_number, 2)
3637 self.assertEqual(hanat.flags, 'ACK')
3638 self.assertEqual(hanat.version, 1)
3639 users = self.vapi.nat44_user_dump()
3640 self.assertEqual(len(users), 1)
3641 self.assertEqual(str(users[0].ip_address),
3642 self.pg0.remote_ip4)
3643 # now we should have only 1 session, 1 deleted by HA
3644 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3646 self.assertEqual(len(sessions), 1)
3647 stats = self.statistics.get_counter('/nat44/ha/del-event-recv')
3648 self.assertEqual(stats[0][0], 1)
# Two HA packets processed so far (add batch + del).
3650 stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3651 self.assertEqual(stats, 2)
3653 # send HA session refresh event to failover/passive
# Message 3: "refresh" event for the TCP session carrying new counters
# (1024 bytes, 2 packets).
3654 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3655 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3656 UDP(sport=12346, dport=12345) /
3657 HANATStateSync(sequence_number=3, events=[
3658 Event(event_type='refresh', protocol='tcp',
3659 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3660 in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3661 eh_addr=self.pg1.remote_ip4,
3662 ehn_addr=self.pg1.remote_ip4,
3663 eh_port=self.tcp_external_port,
3664 ehn_port=self.tcp_external_port, fib_index=0,
3665 total_bytes=1024, total_pkts=2)]))
3666 self.pg3.add_stream(p)
3667 self.pg_enable_capture(self.pg_interfaces)
# ACK for sequence number 3.
3670 capture = self.pg3.get_capture(1)
3673 hanat = p[HANATStateSync]
3675 self.logger.error(ppp("Invalid packet:", p))
3678 self.assertEqual(hanat.sequence_number, 3)
3679 self.assertEqual(hanat.flags, 'ACK')
3680 self.assertEqual(hanat.version, 1)
3681 users = self.vapi.nat44_user_dump()
3682 self.assertEqual(len(users), 1)
3683 self.assertEqual(str(users[0].ip_address),
3684 self.pg0.remote_ip4)
3685 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3687 self.assertEqual(len(sessions), 1)
# The remaining session must now report the counters from the refresh event.
3688 session = sessions[0]
3689 self.assertEqual(session.total_bytes, 1024)
3690 self.assertEqual(session.total_pkts, 2)
3691 stats = self.statistics.get_counter('/nat44/ha/refresh-event-recv')
3692 self.assertEqual(stats[0][0], 1)
# Three HA packets processed in total.
3694 stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3695 self.assertEqual(stats, 3)
3697 # send packet to test session created by HA
# Out2in TCP packet aimed at the NAT address and the HA-announced outside
# port; it must reach the inside host on its inside port.
3698 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3699 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3700 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
3701 self.pg1.add_stream(p)
3702 self.pg_enable_capture(self.pg_interfaces)
3704 capture = self.pg0.get_capture(1)
3710 self.logger.error(ppp("Invalid packet:", p))
3713 self.assertEqual(ip.src, self.pg1.remote_ip4)
3714 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3715 self.assertEqual(tcp.sport, self.tcp_external_port)
3716 self.assertEqual(tcp.dport, self.tcp_port_in)
# Logs the output of a series of NAT "show" CLI commands at INFO level
# (going by the method name, invoked by the test framework during teardown).
# NOTE(review): listing numbers jump 3725 -> 3727, so the opening of the
# statement that wraps the "addr-port-assignment-alg" command is not visible
# in this view.
3718 def show_commands_at_teardown(self):
3719 self.logger.info(self.vapi.cli("show nat44 addresses"))
3720 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3721 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3722 self.logger.info(self.vapi.cli("show nat44 interface address"))
3723 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3724 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3725 self.logger.info(self.vapi.cli("show nat timeouts"))
3727 self.vapi.cli("show nat addr-port-assignment-alg"))
3728 self.logger.info(self.vapi.cli("show nat ha"))
# Test case class for NAT44EI with the out2in-DPO plugin flag: an IPv4
# inside interface (pg0), an IPv6 outside interface (pg1) and a MAP domain
# providing the 4<->6 translation (464XLAT CE). Both tests are currently
# skipped via unittest.skip.
# NOTE(review): the listing numbers carried on these lines are not
# contiguous (e.g. 3732 -> 3735, 3760 -> 3765, 3767 -> 3770, 3777 -> 3779),
# so decorators, some `def` headers and several call endings are absent from
# this view; consult the complete file before changing any statement.
3731 class TestNAT44Out2InDPO(MethodHolder):
3732 """ NAT44EI Test Cases using out2in DPO """
# Class-level fixture: enable NAT debug logging, fix the port/id/address
# constants and create two packet-generator interfaces.
3735 def setUpClass(cls):
3736 super(TestNAT44Out2InDPO, cls).setUpClass()
3737 cls.vapi.cli("set log class nat level debug")
# Inside and outside values are identical for every protocol.
3739 cls.tcp_port_in = 6303
3740 cls.tcp_port_out = 6303
3741 cls.udp_port_in = 6304
3742 cls.udp_port_out = 6304
3743 cls.icmp_id_in = 6305
3744 cls.icmp_id_out = 6305
3745 cls.nat_addr = '10.0.0.3'
3746 cls.dst_ip4 = '192.168.70.1'
3748 cls.create_pg_interfaces(range(2))
# pg0 is configured for IPv4 ...
3751 cls.pg0.config_ip4()
3752 cls.pg0.resolve_arp()
# ... and pg1 for IPv6.
3755 cls.pg1.config_ip6()
3756 cls.pg1.resolve_ndp()
# IPv6 default route ("::"/0) via pg1's remote address; the remaining
# arguments and what is done with r1 are not visible in this view.
3758 r1 = VppIpRoute(cls, "::", 0,
3759 [VppRoutePath(cls.pg1.remote_ip6,
3760 cls.pg1.sw_if_index)],
# Per-test setup (its `def` line is not visible here): enable the NAT44
# plugin with the OUT2IN_DPO flag.
3765 super(TestNAT44Out2InDPO, self).setUp()
3766 flags = self.nat44_config_flags.NAT44_API_IS_OUT2IN_DPO
3767 self.vapi.nat44_plugin_enable_disable(enable=1, flags=flags)
# Per-test teardown (its `def` line is not visible here): disable the
# plugin and clear the log, but only while VPP is still alive.
3770 super(TestNAT44Out2InDPO, self).tearDown()
3771 if not self.vpp_dead:
3772 self.vapi.nat44_plugin_enable_disable(enable=0)
3773 self.vapi.cli("clear logging")
# Creates the MAP domain used by both tests from two /96 IPv6 prefixes and
# stores the prefixes (text and packed form) on the instance.
3775 def configure_xlat(self):
3776 self.dst_ip6_pfx = '1:2:3::'
3777 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3779 self.dst_ip6_pfx_len = 96
3780 self.src_ip6_pfx = '4:5:6::'
3781 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3783 self.src_ip6_pfx_len = 96
# NOTE(review): '\x00\x00\x00\x00' is a text string, not bytes, whereas the
# two prefixes come from socket.inet_pton (bytes on Python 3) -- confirm
# what map_add_domain expects before re-enabling the tests.
3784 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3785 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3786 '\x00\x00\x00\x00', 0)
# 464XLAT with NAT: IPv4 traffic from pg0 is NATed and must appear on pg1 as
# IPv6; the reverse IPv6 stream must arrive on pg0 as IPv4.
3788 @unittest.skip('Temporary disabled')
3789 def test_464xlat_ce(self):
3790 """ Test 464XLAT CE with NAT44EI """
# The running NAT configuration must report out2in DPO as enabled.
3792 nat_config = self.vapi.nat_show_config()
3793 self.assertEqual(1, nat_config.out2in_dpo)
3795 self.configure_xlat()
3797 flags = self.config_flags.NAT_IS_INSIDE
3798 self.vapi.nat44_interface_add_del_feature(
3799 sw_if_index=self.pg0.sw_if_index,
3800 flags=flags, is_add=1)
# NOTE(review): self.nat_addr_n is not assigned anywhere in the visible part
# of this class (setUpClass only sets nat_addr) -- verify it is defined,
# e.g. by a base class, before re-enabling this test.
3801 self.vapi.nat44_add_del_address_range(first_ip_address=self.nat_addr_n,
3802 last_ip_address=self.nat_addr_n,
3803 vrf_id=0xFFFFFFFF, is_add=1)
# IPv6 forms of the IPv4 destination and of the NAT address, built with the
# compose_ip6() helper from the two configured prefixes.
3805 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3806 self.dst_ip6_pfx_len)
3807 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3808 self.src_ip6_pfx_len)
# in -> out: IPv4 stream from pg0, verified as IPv6 on pg1.
3811 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3812 self.pg0.add_stream(pkts)
3813 self.pg_enable_capture(self.pg_interfaces)
3815 capture = self.pg1.get_capture(len(pkts))
3816 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# out -> in: IPv6 stream from pg1, verified as IPv4 on pg0.
3819 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3821 self.pg1.add_stream(pkts)
3822 self.pg_enable_capture(self.pg_interfaces)
3824 capture = self.pg0.get_capture(len(pkts))
3825 self.verify_capture_in(capture, self.pg0)
# Further feature / address-range calls; their remaining arguments are not
# visible here (presumably the removal of the configuration added above).
3827 self.vapi.nat44_interface_add_del_feature(
3828 sw_if_index=self.pg0.sw_if_index,
3830 self.vapi.nat44_add_del_address_range(
3831 first_ip_address=self.nat_addr_n,
3832 last_ip_address=self.nat_addr_n,
# 464XLAT without NAT: no NAT feature or address is configured, so the
# expected IPv6 address is derived from the inside host's own IPv4 address
# and ports are expected to stay the same (same_port=True).
3835 @unittest.skip('Temporary disabled')
3836 def test_464xlat_ce_no_nat(self):
3837 """ Test 464XLAT CE without NAT44EI """
3839 self.configure_xlat()
3841 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3842 self.dst_ip6_pfx_len)
3843 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3844 self.src_ip6_pfx_len)
# in -> out
3846 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3847 self.pg0.add_stream(pkts)
3848 self.pg_enable_capture(self.pg_interfaces)
3850 capture = self.pg1.get_capture(len(pkts))
3851 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3852 nat_ip=out_dst_ip6, same_port=True)
# out -> in
3854 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3855 self.pg1.add_stream(pkts)
3856 self.pg_enable_capture(self.pg_interfaces)
3858 capture = self.pg0.get_capture(len(pkts))
3859 self.verify_capture_in(capture, self.pg0)
# NAT44EI test cases run with two worker threads configured.
# NOTE(review): listing numbers jump 3867 -> 3871, so further class
# attributes may exist that are not visible in this view (setUp reads
# self.max_users, which is not assigned in the visible lines).
3862 class TestNAT44EIMW(MethodHolder):
3863 """ NAT44EI Test Cases (multiple workers) """
# Evaluates to the string "workers 2".
3865 worker_config = "workers %d" % 2
# Passed as `sessions` to nat44_plugin_enable_disable in setUp.
3867 max_translations = 10240
# Class-level fixture: fixes the port/id/address constants, creates ten
# packet-generator interfaces and configures three groups of them
# (pg0-pg3, the overlapping pg4-pg6, and pg9).
# NOTE(review): listing numbers jump (3888 -> 3890 -> 3895, 3914 -> 3922,
# 3926 -> 3929), so the decorator, both `for` loop bodies and some statements
# around pg9 are not visible in this view.
3871 def setUpClass(cls):
3872 super(TestNAT44EIMW, cls).setUpClass()
3873 cls.vapi.cli("set log class nat level debug")
# Inside and outside values start out identical for every protocol.
3875 cls.tcp_port_in = 6303
3876 cls.tcp_port_out = 6303
3877 cls.udp_port_in = 6304
3878 cls.udp_port_out = 6304
3879 cls.icmp_id_in = 6305
3880 cls.icmp_id_out = 6305
3881 cls.nat_addr = '10.0.0.3'
3882 cls.ipfix_src_port = 4739
3883 cls.ipfix_domain_id = 1
3884 cls.tcp_external_port = 80
3885 cls.udp_external_port = 69
3887 cls.create_pg_interfaces(range(10))
# pg0..pg3 form the first group; the loop body is not visible here.
3888 cls.interfaces = list(cls.pg_interfaces[0:4])
3890 for i in cls.interfaces:
# Three remote hosts on pg0 (used as host/server1/server2 by the
# hairpinning tests) and one on pg1.
3895 cls.pg0.generate_remote_hosts(3)
3896 cls.pg0.configure_ipv4_neighbors()
3898 cls.pg1.generate_remote_hosts(1)
3899 cls.pg1.configure_ipv4_neighbors()
# pg4..pg6 form the second group, spread over IP tables 10 and 20.
# NOTE(review): the nested list(list(...)) is redundant -- one list() over
# the slice yields the same value.
3901 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
3902 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
3903 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
# pg4 (table 10) and pg6 (table 20) are given the same local/remote
# addresses; pg5 shares table 10 with pg4 but uses a different subnet.
3905 cls.pg4._local_ip4 = "172.16.255.1"
3906 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
3907 cls.pg4.set_table_ip4(10)
3908 cls.pg5._local_ip4 = "172.17.255.3"
3909 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
3910 cls.pg5.set_table_ip4(10)
3911 cls.pg6._local_ip4 = "172.16.255.1"
3912 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
3913 cls.pg6.set_table_ip4(20)
# Loop body is not visible in this view.
3914 for i in cls.overlapping_interfaces:
# pg9 gets two remote hosts and an additional address 10.0.0.1/24.
3922 cls.pg9.generate_remote_hosts(2)
3923 cls.pg9.config_ip4()
3924 cls.vapi.sw_interface_add_del_address(
3925 sw_if_index=cls.pg9.sw_if_index,
3926 prefix="10.0.0.1/24")
3929 cls.pg9.resolve_arp()
# Remote host 1 takes over host 0's original address; host 0 (and pg4's
# remote address) are then re-pointed to 10.0.0.2 before ARP is resolved
# again.
3930 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
3931 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
3932 cls.pg9.resolve_arp()
# Body of the per-test setUp (its `def` line, listing number 3934, is not
# visible in this view): run the base setUp, then enable the NAT44 plugin
# with the class-level session and user limits.
3935 super(TestNAT44EIMW, self).setUp()
3936 self.vapi.nat44_plugin_enable_disable(
3937 sessions=self.max_translations,
3938 users=self.max_users, enable=1)
# Body of the per-test tearDown (its `def` line, listing number 3940, is not
# visible in this view). Cleanup is skipped entirely once VPP is dead.
# NOTE(review): listing numbers jump 3944 -> 3946, so the last argument of
# the IPFIX call is not shown (presumably enable=0 -- confirm).
3941 super(TestNAT44EIMW, self).tearDown()
3942 if not self.vpp_dead:
3943 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
3944 src_port=self.ipfix_src_port,
# Restore the IPFIX values to those assigned in setUpClass (4739 / 1).
3946 self.ipfix_src_port = 4739
3947 self.ipfix_domain_id = 1
# Disable the plugin enabled in setUp and clear the log.
3949 self.vapi.nat44_plugin_enable_disable(enable=0)
3950 self.vapi.cli("clear logging")
# Hairpinning through a static 1:1 NAPT mapping with multiple workers: a
# host and a server both sit on pg0 and talk via the NAT address; the
# per-worker '/nat44/hairpinning' counter is checked after each direction.
# NOTE(review): listing numbers jump (3956 -> 3959, 3960 -> 3964,
# 3970 -> 3973, 3986 -> 3991, 4001 -> 4003), so the definitions of
# host_in_port, worker_1 and worker_2, the binding of `ip`/`tcp` from the
# captures and the error handling are not visible in this view.
3952 def test_hairpinning(self):
3953 """ NAT44EI hairpinning - 1:1 NAPT """
# Both endpoints are remote hosts on the inside interface pg0.
3955 host = self.pg0.remote_hosts[0]
3956 server = self.pg0.remote_hosts[1]
3959 server_in_port = 5678
3960 server_out_port = 8765
3964 self.nat44_add_address(self.nat_addr)
3965 flags = self.config_flags.NAT_IS_INSIDE
3966 self.vapi.nat44_interface_add_del_feature(
3967 sw_if_index=self.pg0.sw_if_index,
3968 flags=flags, is_add=1)
3969 self.vapi.nat44_interface_add_del_feature(
3970 sw_if_index=self.pg1.sw_if_index,
# TCP mapping server.ip4:5678 <-> nat_addr:8765.
3973 # add static mapping for server
3974 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3975 server_in_port, server_out_port,
3976 proto=IP_PROTOS.tcp)
# Baseline of the hairpinning counter before any traffic.
3978 cnt = self.statistics.get_counter('/nat44/hairpinning')
3979 # send packet from host to server
3980 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3981 IP(src=host.ip4, dst=self.nat_addr) /
3982 TCP(sport=host_in_port, dport=server_out_port))
3983 self.pg0.add_stream(p)
3984 self.pg_enable_capture(self.pg_interfaces)
# The packet is expected back out of pg0 (hairpinned), not pg1.
3986 capture = self.pg0.get_capture(1)
# Source becomes the NAT address with a new port; destination becomes the
# server's real address and inside port.
3991 self.assertEqual(ip.src, self.nat_addr)
3992 self.assertEqual(ip.dst, server.ip4)
3993 self.assertNotEqual(tcp.sport, host_in_port)
3994 self.assertEqual(tcp.dport, server_in_port)
3995 self.assert_packet_checksums_valid(p)
# Remember the port allocated for the host; the reply is sent to it.
3996 host_out_port = tcp.sport
3998 self.logger.error(ppp("Unexpected or invalid packet:", p))
4001 after = self.statistics.get_counter('/nat44/hairpinning')
4003 if_idx = self.pg0.sw_if_index
# NOTE(review): this subtracts worker_1's baseline from worker_2's counter,
# whereas the final checks below pair each worker with its own baseline --
# confirm whether cnt[worker_2] was intended.
4004 self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4006 # send reply from server to host
4007 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
4008 IP(src=server.ip4, dst=self.nat_addr) /
4009 TCP(sport=server_in_port, dport=host_out_port))
4010 self.pg0.add_stream(p)
4011 self.pg_enable_capture(self.pg_interfaces)
4013 capture = self.pg0.get_capture(1)
# Reply as seen by the host: from nat_addr:server_out_port to the host's
# original address and port.
4018 self.assertEqual(ip.src, self.nat_addr)
4019 self.assertEqual(ip.dst, host.ip4)
4020 self.assertEqual(tcp.sport, server_out_port)
4021 self.assertEqual(tcp.dport, host_in_port)
4022 self.assert_packet_checksums_valid(p)
4024 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Final per-worker deltas on pg0: +1 for worker_1 and +2 for worker_2.
4027 after = self.statistics.get_counter('/nat44/hairpinning')
4028 if_idx = self.pg0.sw_if_index
4029 self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4030 self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
4032 def test_hairpinning2(self):
4033 """ NAT44EI hairpinning - 1:1 NAT"""
4035 server1_nat_ip = "10.0.0.10"
4036 server2_nat_ip = "10.0.0.11"
4037 host = self.pg0.remote_hosts[0]
4038 server1 = self.pg0.remote_hosts[1]
4039 server2 = self.pg0.remote_hosts[2]
4040 server_tcp_port = 22
4041 server_udp_port = 20
4043 self.nat44_add_address(self.nat_addr)
4044 flags = self.config_flags.NAT_IS_INSIDE
4045 self.vapi.nat44_interface_add_del_feature(
4046 sw_if_index=self.pg0.sw_if_index,
4047 flags=flags, is_add=1)
4048 self.vapi.nat44_interface_add_del_feature(
4049 sw_if_index=self.pg1.sw_if_index,
4052 # add static mapping for servers
4053 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
4054 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
4058 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4059 IP(src=host.ip4, dst=server1_nat_ip) /
4060 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
4062 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4063 IP(src=host.ip4, dst=server1_nat_ip) /
4064 UDP(sport=self.udp_port_in, dport=server_udp_port))
4066 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4067 IP(src=host.ip4, dst=server1_nat_ip) /
4068 ICMP(id=self.icmp_id_in, type='echo-request'))
4070 self.pg0.add_stream(pkts)
4071 self.pg_enable_capture(self.pg_interfaces)
4073 capture = self.pg0.get_capture(len(pkts))
4074 for packet in capture:
4076 self.assertEqual(packet[IP].src, self.nat_addr)
4077 self.assertEqual(packet[IP].dst, server1.ip4)
4078 if packet.haslayer(TCP):
4079 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
4080 self.assertEqual(packet[TCP].dport, server_tcp_port)
4081 self.tcp_port_out = packet[TCP].sport
4082 self.assert_packet_checksums_valid(packet)
4083 elif packet.haslayer(UDP):
4084 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
4085 self.assertEqual(packet[UDP].dport, server_udp_port)
4086 self.udp_port_out = packet[UDP].sport
4088 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
4089 self.icmp_id_out = packet[ICMP].id
4091 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4096 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4097 IP(src=server1.ip4, dst=self.nat_addr) /
4098 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
4100 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4101 IP(src=server1.ip4, dst=self.nat_addr) /
4102 UDP(sport=server_udp_port, dport=self.udp_port_out))
4104 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4105 IP(src=server1.ip4, dst=self.nat_addr) /
4106 ICMP(id=self.icmp_id_out, type='echo-reply'))
4108 self.pg0.add_stream(pkts)
4109 self.pg_enable_capture(self.pg_interfaces)
4111 capture = self.pg0.get_capture(len(pkts))
4112 for packet in capture:
4114 self.assertEqual(packet[IP].src, server1_nat_ip)
4115 self.assertEqual(packet[IP].dst, host.ip4)
4116 if packet.haslayer(TCP):
4117 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4118 self.assertEqual(packet[TCP].sport, server_tcp_port)
4119 self.assert_packet_checksums_valid(packet)
4120 elif packet.haslayer(UDP):
4121 self.assertEqual(packet[UDP].dport, self.udp_port_in)
4122 self.assertEqual(packet[UDP].sport, server_udp_port)
4124 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4126 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4129 # server2 to server1
4131 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4132 IP(src=server2.ip4, dst=server1_nat_ip) /
4133 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
4135 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4136 IP(src=server2.ip4, dst=server1_nat_ip) /
4137 UDP(sport=self.udp_port_in, dport=server_udp_port))
4139 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4140 IP(src=server2.ip4, dst=server1_nat_ip) /
4141 ICMP(id=self.icmp_id_in, type='echo-request'))
4143 self.pg0.add_stream(pkts)
4144 self.pg_enable_capture(self.pg_interfaces)
4146 capture = self.pg0.get_capture(len(pkts))
4147 for packet in capture:
4149 self.assertEqual(packet[IP].src, server2_nat_ip)
4150 self.assertEqual(packet[IP].dst, server1.ip4)
4151 if packet.haslayer(TCP):
4152 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
4153 self.assertEqual(packet[TCP].dport, server_tcp_port)
4154 self.tcp_port_out = packet[TCP].sport
4155 self.assert_packet_checksums_valid(packet)
4156 elif packet.haslayer(UDP):
4157 self.assertEqual(packet[UDP].sport, self.udp_port_in)
4158 self.assertEqual(packet[UDP].dport, server_udp_port)
4159 self.udp_port_out = packet[UDP].sport
4161 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4162 self.icmp_id_out = packet[ICMP].id
4164 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4167 # server1 to server2
4169 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4170 IP(src=server1.ip4, dst=server2_nat_ip) /
4171 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
4173 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4174 IP(src=server1.ip4, dst=server2_nat_ip) /
4175 UDP(sport=server_udp_port, dport=self.udp_port_out))
4177 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4178 IP(src=server1.ip4, dst=server2_nat_ip) /
4179 ICMP(id=self.icmp_id_out, type='echo-reply'))
4181 self.pg0.add_stream(pkts)
4182 self.pg_enable_capture(self.pg_interfaces)
4184 capture = self.pg0.get_capture(len(pkts))
4185 for packet in capture:
4187 self.assertEqual(packet[IP].src, server1_nat_ip)
4188 self.assertEqual(packet[IP].dst, server2.ip4)
4189 if packet.haslayer(TCP):
4190 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4191 self.assertEqual(packet[TCP].sport, server_tcp_port)
4192 self.assert_packet_checksums_valid(packet)
4193 elif packet.haslayer(UDP):
4194 self.assertEqual(packet[UDP].dport, self.udp_port_in)
4195 self.assertEqual(packet[UDP].sport, server_udp_port)
4197 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4199 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Script entry point: when this module is executed directly (rather than
# imported), run its tests through unittest using the VppTestRunner class
# imported from the framework module.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)