12 from framework import VppTestCase, VppTestRunner
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
17 from scapy.data import IP_PROTOS
18 from scapy.layers.inet import IP, TCP, UDP, ICMP
19 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
21 from scapy.layers.l2 import Ether, ARP, GRE
22 from scapy.packet import Raw
23 from syslog_rfc5424_parser import SyslogMessage, ParseError
24 from syslog_rfc5424_parser.constants import SyslogSeverity
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_neighbor import VppNeighbor
28 from vpp_papi import VppEnum
31 # NAT HA protocol event data
34 fields_desc = [ByteEnumField("event_type", None,
35 {1: "add", 2: "del", 3: "refresh"}),
36 ByteEnumField("protocol", None,
37 {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
38 ShortField("flags", 0),
39 IPField("in_addr", None),
40 IPField("out_addr", None),
41 ShortField("in_port", None),
42 ShortField("out_port", None),
43 IPField("eh_addr", None),
44 IPField("ehn_addr", None),
45 ShortField("eh_port", None),
46 ShortField("ehn_port", None),
47 IntField("fib_index", None),
48 IntField("total_pkts", 0),
49 LongField("total_bytes", 0)]
51 def extract_padding(self, s):
55 # NAT HA protocol header
class HANATStateSync(Packet):
    """NAT HA protocol header.

    Scapy layer made of a fixed header (version, flags, event count,
    sequence number, thread index) followed by a list of Event records.
    """
    name = "HA NAT state sync"
    fields_desc = [
        XByteField("version", 1),
        FlagsField("flags", 0, 8, ['ACK']),
        # Left as None so it is derived from the number of "events" entries.
        FieldLenField("count", None, count_of="events"),
        IntField("sequence_number", 1),
        IntField("thread_index", 0),
        # The number of Event records to dissect is taken from "count".
        PacketListField("events", [], Event,
                        count_from=lambda hdr: hdr.count),
    ]
67 class MethodHolder(VppTestCase):
68 """ NAT create capture and verify method holder """
def config_flags(self):
    """Return the VPP API enum of NAT configuration flags."""
    nat_flags_enum = VppEnum.vl_api_nat_config_flags_t
    return nat_flags_enum
def nat44_config_flags(self):
    """Return the VPP API enum of NAT44 configuration flags."""
    nat44_flags_enum = VppEnum.vl_api_nat44_config_flags_t
    return nat44_flags_enum
def SYSLOG_SEVERITY(self):
    """Return the VPP API enum of syslog severity levels."""
    severity_enum = VppEnum.vl_api_syslog_severity_t
    return severity_enum
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                             local_port=0, external_port=0, vrf_id=0,
                             is_add=1, external_sw_if_index=0xFFFFFFFF,
                             proto=0, tag="", flags=0):
    """
    Add/delete NAT44EI static mapping

    :param local_ip: Local IP address
    :param external_ip: External IP address
    :param local_port: Local port number (Optional)
    :param external_port: External port number (Optional)
    :param vrf_id: VRF ID (Default 0)
    :param is_add: 1 if add, 0 if delete (Default add)
    :param external_sw_if_index: External interface instead of IP address
    :param proto: IP protocol (Mandatory if port specified)
    :param tag: Opaque string tag
    :param flags: NAT configuration flags
    """
    # Unless both ports are given (non-zero) the mapping is flagged as
    # address-only.
    if not (local_port and external_port):
        flags |= self.config_flags.NAT_IS_ADDR_ONLY

    # Forward every caller-supplied option; is_add selects add vs. delete.
    self.vapi.nat44_add_del_static_mapping(
        is_add=is_add,
        local_ip_address=local_ip,
        external_ip_address=external_ip,
        external_sw_if_index=external_sw_if_index,
        local_port=local_port,
        external_port=external_port,
        vrf_id=vrf_id, protocol=proto,
        flags=flags,
        tag=tag)
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44EI address

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed to the API (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
    # A single address is expressed as a range whose first and last
    # address are the same.
    self.vapi.nat44_add_del_address_range(first_ip_address=ip,
                                          last_ip_address=ip,
                                          vrf_id=vrf_id,
                                          is_add=is_add,
                                          flags=flags)
130 def create_routes_and_neigbors(self):
131 r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
132 [VppRoutePath(self.pg7.remote_ip4,
133 self.pg7.sw_if_index)])
134 r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
135 [VppRoutePath(self.pg8.remote_ip4,
136 self.pg8.sw_if_index)])
140 n1 = VppNeighbor(self,
141 self.pg7.sw_if_index,
145 n2 = VppNeighbor(self,
146 self.pg8.sw_if_index,
153 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
155 Create packet stream for inside network
157 :param in_if: Inside interface
158 :param out_if: Outside interface
159 :param dst_ip: Destination address
160 :param ttl: TTL of generated packets
163 dst_ip = out_if.remote_ip4
167 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
168 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
169 TCP(sport=self.tcp_port_in, dport=20))
173 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
174 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
175 UDP(sport=self.udp_port_in, dport=20))
179 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
180 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
181 ICMP(id=self.icmp_id_in, type='echo-request'))
186 def compose_ip6(self, ip4, pref, plen):
188 Compose IPv4-embedded IPv6 addresses
190 :param ip4: IPv4 address
191 :param pref: IPv6 prefix
192 :param plen: IPv6 prefix length
193 :returns: IPv4-embedded IPv6 addresses
195 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
196 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
211 pref_n[10] = ip4_n[3]
215 pref_n[10] = ip4_n[2]
216 pref_n[11] = ip4_n[3]
219 pref_n[10] = ip4_n[1]
220 pref_n[11] = ip4_n[2]
221 pref_n[12] = ip4_n[3]
223 pref_n[12] = ip4_n[0]
224 pref_n[13] = ip4_n[1]
225 pref_n[14] = ip4_n[2]
226 pref_n[15] = ip4_n[3]
227 packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
228 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
230 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
231 use_inside_ports=False):
233 Create packet stream for outside network
235 :param out_if: Outside interface
236 :param dst_ip: Destination IP address (Default use global NAT address)
237 :param ttl: TTL of generated packets
238 :param use_inside_ports: Use inside NAT ports as destination ports
239 instead of outside ports
242 dst_ip = self.nat_addr
243 if not use_inside_ports:
244 tcp_port = self.tcp_port_out
245 udp_port = self.udp_port_out
246 icmp_id = self.icmp_id_out
248 tcp_port = self.tcp_port_in
249 udp_port = self.udp_port_in
250 icmp_id = self.icmp_id_in
253 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
254 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
255 TCP(dport=tcp_port, sport=20))
259 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
260 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
261 UDP(dport=udp_port, sport=20))
265 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
266 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
267 ICMP(id=icmp_id, type='echo-reply'))
272 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
274 Create packet stream for outside network
276 :param out_if: Outside interface
277 :param dst_ip: Destination IP address (Default use global NAT address)
278 :param hl: HL of generated packets
282 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
283 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
284 TCP(dport=self.tcp_port_out, sport=20))
288 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
289 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
290 UDP(dport=self.udp_port_out, sport=20))
294 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
295 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
296 ICMPv6EchoReply(id=self.icmp_id_out))
301 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
302 dst_ip=None, is_ip6=False, ignore_port=False):
304 Verify captured packets on outside network
306 :param capture: Captured packets
307 :param nat_ip: Translated IP address (Default use global NAT address)
308 :param same_port: Source port number is not translated (Default False)
309 :param dst_ip: Destination IP address (Default do not verify)
310 :param is_ip6: If L3 protocol is IPv6 (Default False)
314 ICMP46 = ICMPv6EchoRequest
319 nat_ip = self.nat_addr
320 for packet in capture:
323 self.assert_packet_checksums_valid(packet)
324 self.assertEqual(packet[IP46].src, nat_ip)
325 if dst_ip is not None:
326 self.assertEqual(packet[IP46].dst, dst_ip)
327 if packet.haslayer(TCP):
331 packet[TCP].sport, self.tcp_port_in)
334 packet[TCP].sport, self.tcp_port_in)
335 self.tcp_port_out = packet[TCP].sport
336 self.assert_packet_checksums_valid(packet)
337 elif packet.haslayer(UDP):
341 packet[UDP].sport, self.udp_port_in)
344 packet[UDP].sport, self.udp_port_in)
345 self.udp_port_out = packet[UDP].sport
350 packet[ICMP46].id, self.icmp_id_in)
353 packet[ICMP46].id, self.icmp_id_in)
354 self.icmp_id_out = packet[ICMP46].id
355 self.assert_packet_checksums_valid(packet)
357 self.logger.error(ppp("Unexpected or invalid packet "
358 "(outside network):", packet))
361 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
364 Verify captured packets on outside network
366 :param capture: Captured packets
367 :param nat_ip: Translated IP address
368 :param same_port: Source port number is not translated (Default False)
369 :param dst_ip: Destination IP address (Default do not verify)
371 return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
def verify_capture_in(self, capture, in_if):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    """
    for packet in capture:
        try:
            self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                # Neither TCP nor UDP: checked as ICMP by identifier.
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            # Addresses and source port/identifier must be unchanged.
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                # Neither TCP nor UDP: checked as ICMP by identifier.
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
419 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
422 Verify captured packets with ICMP errors on outside network
424 :param capture: Captured packets
425 :param src_ip: Translated IP address or IP address of VPP
426 (Default use global NAT address)
427 :param icmp_type: Type of error ICMP packet
428 we are expecting (Default 11)
431 src_ip = self.nat_addr
432 for packet in capture:
434 self.assertEqual(packet[IP].src, src_ip)
435 self.assertEqual(packet.haslayer(ICMP), 1)
437 self.assertEqual(icmp.type, icmp_type)
438 self.assertTrue(icmp.haslayer(IPerror))
439 inner_ip = icmp[IPerror]
440 if inner_ip.haslayer(TCPerror):
441 self.assertEqual(inner_ip[TCPerror].dport,
443 elif inner_ip.haslayer(UDPerror):
444 self.assertEqual(inner_ip[UDPerror].dport,
447 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
449 self.logger.error(ppp("Unexpected or invalid packet "
450 "(outside network):", packet))
453 def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
455 Verify captured packets with ICMP errors on inside network
457 :param capture: Captured packets
458 :param in_if: Inside interface
459 :param icmp_type: Type of error ICMP packet
460 we are expecting (Default 11)
462 for packet in capture:
464 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
465 self.assertEqual(packet.haslayer(ICMP), 1)
467 self.assertEqual(icmp.type, icmp_type)
468 self.assertTrue(icmp.haslayer(IPerror))
469 inner_ip = icmp[IPerror]
470 if inner_ip.haslayer(TCPerror):
471 self.assertEqual(inner_ip[TCPerror].sport,
473 elif inner_ip.haslayer(UDPerror):
474 self.assertEqual(inner_ip[UDPerror].sport,
477 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
479 self.logger.error(ppp("Unexpected or invalid packet "
480 "(inside network):", packet))
483 def create_stream_frag(self, src_if, dst, sport, dport, data,
484 proto=IP_PROTOS.tcp, echo_reply=False):
486 Create fragmented packet stream
488 :param src_if: Source interface
489 :param dst: Destination IPv4 address
490 :param sport: Source port
491 :param dport: Destination port
492 :param data: Payload data
493 :param proto: protocol (TCP, UDP, ICMP)
494 :param echo_reply: use echo_reply if protocol is ICMP
497 if proto == IP_PROTOS.tcp:
498 p = (IP(src=src_if.remote_ip4, dst=dst) /
499 TCP(sport=sport, dport=dport) /
501 p = p.__class__(scapy.compat.raw(p))
502 chksum = p[TCP].chksum
503 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
504 elif proto == IP_PROTOS.udp:
505 proto_header = UDP(sport=sport, dport=dport)
506 elif proto == IP_PROTOS.icmp:
508 proto_header = ICMP(id=sport, type='echo-request')
510 proto_header = ICMP(id=sport, type='echo-reply')
512 raise Exception("Unsupported protocol")
513 id = random.randint(0, 65535)
515 if proto == IP_PROTOS.tcp:
518 raw = Raw(data[0:16])
519 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
520 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
524 if proto == IP_PROTOS.tcp:
525 raw = Raw(data[4:20])
527 raw = Raw(data[16:32])
528 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
529 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
533 if proto == IP_PROTOS.tcp:
537 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
538 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
544 def reass_frags_and_verify(self, frags, src, dst):
546 Reassemble and verify fragmented packet
548 :param frags: Captured fragments
549 :param src: Source IPv4 address to verify
550 :param dst: Destination IPv4 address to verify
552 :returns: Reassembled IPv4 packet
556 self.assertEqual(p[IP].src, src)
557 self.assertEqual(p[IP].dst, dst)
558 self.assert_ip_checksum_valid(p)
559 buffer.seek(p[IP].frag * 8)
560 buffer.write(bytes(p[IP].payload))
561 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
562 proto=frags[0][IP].proto)
563 if ip.proto == IP_PROTOS.tcp:
564 p = (ip / TCP(buffer.getvalue()))
565 self.logger.debug(ppp("Reassembled:", p))
566 self.assert_tcp_checksum_valid(p)
567 elif ip.proto == IP_PROTOS.udp:
568 p = (ip / UDP(buffer.getvalue()[:8]) /
569 Raw(buffer.getvalue()[8:]))
570 elif ip.proto == IP_PROTOS.icmp:
571 p = (ip / ICMP(buffer.getvalue()))
574 def verify_ipfix_nat44_ses(self, data):
576 Verify IPFIX NAT44EI session create/delete event
578 :param data: Decoded IPFIX data records
580 nat44_ses_create_num = 0
581 nat44_ses_delete_num = 0
582 self.assertEqual(6, len(data))
585 self.assertIn(scapy.compat.orb(record[230]), [4, 5])
586 if scapy.compat.orb(record[230]) == 4:
587 nat44_ses_create_num += 1
589 nat44_ses_delete_num += 1
591 self.assertEqual(self.pg0.remote_ip4,
592 str(ipaddress.IPv4Address(record[8])))
593 # postNATSourceIPv4Address
594 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
597 self.assertEqual(struct.pack("!I", 0), record[234])
598 # protocolIdentifier/sourceTransportPort
599 # /postNAPTSourceTransportPort
600 if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
601 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
602 self.assertEqual(struct.pack("!H", self.icmp_id_out),
604 elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
605 self.assertEqual(struct.pack("!H", self.tcp_port_in),
607 self.assertEqual(struct.pack("!H", self.tcp_port_out),
609 elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
610 self.assertEqual(struct.pack("!H", self.udp_port_in),
612 self.assertEqual(struct.pack("!H", self.udp_port_out),
615 self.fail("Invalid protocol")
616 self.assertEqual(3, nat44_ses_create_num)
617 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses exhausted event

    :param data: Decoded IPFIX data records (exactly one is expected)
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(scapy.compat.orb(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    :param data: Decoded IPFIX data records (exactly one is expected)
    :param limit: Expected value of the session limit field
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(scapy.compat.orb(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, whereas
    # verify_ipfix_addr_exhausted uses "!I" -- confirm this matches what
    # the exporter actually emits before changing it.
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_no_nat44_user(self):
    """ Verify that there is no NAT44EI user """
    dumped_users = self.vapi.nat44_user_dump()
    self.assertEqual(len(dumped_users), 0)
    # Both the user and the session counters must read zero.
    for stat_path in ('/nat44/total-users', '/nat44/total-sessions'):
        counter = self.statistics.get_counter(stat_path)
        self.assertEqual(counter[0][0], 0)
def verify_syslog_apmap(self, data, is_add=True):
    """
    Verify syslog address and port mapping creation/deletion message

    :param data: Raw syslog message (bytes, decoded as UTF-8)
    :param is_add: Expect msgid APMADD if True, APMDEL otherwise
    """
    message = data.decode('utf-8')
    try:
        message = SyslogMessage.parse(message)
    except ParseError as e:
        # Record why parsing failed, then fail the test.
        self.logger.error(e)
        raise
    else:
        self.assertEqual(message.severity, SyslogSeverity.info)
        self.assertEqual(message.appname, 'NAT')
        self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
        sd_params = message.sd.get('napmap')
        self.assertIsNotNone(sd_params)
        self.assertEqual(sd_params.get('IATYP'), 'IPv4')
        self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
        self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
        self.assertEqual(sd_params.get('XATYP'), 'IPv4')
        self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
        self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
        self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
        self.assertIsNotNone(sd_params.get('SSUBIX'))
        self.assertEqual(sd_params.get('SVLAN'), '0')
def verify_mss_value(self, pkt, mss):
    """
    Verify TCP MSS value

    :param pkt: Packet to inspect; must contain IP and TCP layers
    :param mss: Expected value of every 'MSS' TCP option found
    :raises TypeError: if the packet is not TCP over IP
    """
    is_tcp_over_ip = pkt.haslayer(IP) and pkt.haslayer(TCP)
    if not is_tcp_over_ip:
        raise TypeError("Not a TCP/IP packet")

    for tcp_opt in pkt[TCP].options:
        if tcp_opt[0] != 'MSS':
            continue
        self.assertEqual(tcp_opt[1], mss)
    self.assert_tcp_checksum_valid(pkt)
679 def proto2layer(proto):
680 if proto == IP_PROTOS.tcp:
682 elif proto == IP_PROTOS.udp:
684 elif proto == IP_PROTOS.icmp:
687 raise Exception("Unsupported protocol")
689 def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
691 layer = self.proto2layer(proto)
693 if proto == IP_PROTOS.tcp:
694 data = b"A" * 4 + b"B" * 16 + b"C" * 3
696 data = b"A" * 16 + b"B" * 16 + b"C" * 3
697 self.port_in = random.randint(1025, 65535)
700 pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
701 self.port_in, 20, data, proto)
702 self.pg0.add_stream(pkts)
703 self.pg_enable_capture(self.pg_interfaces)
705 frags = self.pg1.get_capture(len(pkts))
706 if not dont_translate:
707 p = self.reass_frags_and_verify(frags,
711 p = self.reass_frags_and_verify(frags,
714 if proto != IP_PROTOS.icmp:
715 if not dont_translate:
716 self.assertEqual(p[layer].dport, 20)
718 self.assertNotEqual(p[layer].sport, self.port_in)
720 self.assertEqual(p[layer].sport, self.port_in)
723 if not dont_translate:
724 self.assertNotEqual(p[layer].id, self.port_in)
726 self.assertEqual(p[layer].id, self.port_in)
727 self.assertEqual(data, p[Raw].load)
730 if not dont_translate:
731 dst_addr = self.nat_addr
733 dst_addr = self.pg0.remote_ip4
734 if proto != IP_PROTOS.icmp:
736 dport = p[layer].sport
740 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data,
741 proto, echo_reply=True)
742 self.pg1.add_stream(pkts)
743 self.pg_enable_capture(self.pg_interfaces)
745 frags = self.pg0.get_capture(len(pkts))
746 p = self.reass_frags_and_verify(frags,
749 if proto != IP_PROTOS.icmp:
750 self.assertEqual(p[layer].sport, 20)
751 self.assertEqual(p[layer].dport, self.port_in)
753 self.assertEqual(p[layer].id, self.port_in)
754 self.assertEqual(data, p[Raw].load)
756 def reass_hairpinning(self, server_addr, server_in_port, server_out_port,
757 host_in_port, proto=IP_PROTOS.tcp,
760 layer = self.proto2layer(proto)
762 if proto == IP_PROTOS.tcp:
763 data = b"A" * 4 + b"B" * 16 + b"C" * 3
765 data = b"A" * 16 + b"B" * 16 + b"C" * 3
767 # send packet from host to server
768 pkts = self.create_stream_frag(self.pg0,
774 self.pg0.add_stream(pkts)
775 self.pg_enable_capture(self.pg_interfaces)
777 frags = self.pg0.get_capture(len(pkts))
778 p = self.reass_frags_and_verify(frags,
781 if proto != IP_PROTOS.icmp:
783 self.assertNotEqual(p[layer].sport, host_in_port)
784 self.assertEqual(p[layer].dport, server_in_port)
787 self.assertNotEqual(p[layer].id, host_in_port)
788 self.assertEqual(data, p[Raw].load)
790 def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
792 layer = self.proto2layer(proto)
794 if proto == IP_PROTOS.tcp:
795 data = b"A" * 4 + b"B" * 16 + b"C" * 3
797 data = b"A" * 16 + b"B" * 16 + b"C" * 3
798 self.port_in = random.randint(1025, 65535)
802 pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
803 self.port_in, 20, data, proto)
805 self.pg0.add_stream(pkts)
806 self.pg_enable_capture(self.pg_interfaces)
808 frags = self.pg1.get_capture(len(pkts))
809 if not dont_translate:
810 p = self.reass_frags_and_verify(frags,
814 p = self.reass_frags_and_verify(frags,
817 if proto != IP_PROTOS.icmp:
818 if not dont_translate:
819 self.assertEqual(p[layer].dport, 20)
821 self.assertNotEqual(p[layer].sport, self.port_in)
823 self.assertEqual(p[layer].sport, self.port_in)
826 if not dont_translate:
827 self.assertNotEqual(p[layer].id, self.port_in)
829 self.assertEqual(p[layer].id, self.port_in)
830 self.assertEqual(data, p[Raw].load)
833 if not dont_translate:
834 dst_addr = self.nat_addr
836 dst_addr = self.pg0.remote_ip4
837 if proto != IP_PROTOS.icmp:
839 dport = p[layer].sport
843 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport,
844 data, proto, echo_reply=True)
846 self.pg1.add_stream(pkts)
847 self.pg_enable_capture(self.pg_interfaces)
849 frags = self.pg0.get_capture(len(pkts))
850 p = self.reass_frags_and_verify(frags,
853 if proto != IP_PROTOS.icmp:
854 self.assertEqual(p[layer].sport, 20)
855 self.assertEqual(p[layer].dport, self.port_in)
857 self.assertEqual(p[layer].id, self.port_in)
858 self.assertEqual(data, p[Raw].load)
861 class TestNAT44EI(MethodHolder):
862 """ NAT44EI Test Cases """
864 max_translations = 10240
869 super(TestNAT44EI, cls).setUpClass()
870 cls.vapi.cli("set log class nat level debug")
872 cls.tcp_port_in = 6303
873 cls.tcp_port_out = 6303
874 cls.udp_port_in = 6304
875 cls.udp_port_out = 6304
876 cls.icmp_id_in = 6305
877 cls.icmp_id_out = 6305
878 cls.nat_addr = '10.0.0.3'
879 cls.ipfix_src_port = 4739
880 cls.ipfix_domain_id = 1
881 cls.tcp_external_port = 80
882 cls.udp_external_port = 69
884 cls.create_pg_interfaces(range(10))
885 cls.interfaces = list(cls.pg_interfaces[0:4])
887 for i in cls.interfaces:
892 cls.pg0.generate_remote_hosts(3)
893 cls.pg0.configure_ipv4_neighbors()
895 cls.pg1.generate_remote_hosts(1)
896 cls.pg1.configure_ipv4_neighbors()
898 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
899 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
900 cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
902 cls.pg4._local_ip4 = "172.16.255.1"
903 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
904 cls.pg4.set_table_ip4(10)
905 cls.pg5._local_ip4 = "172.17.255.3"
906 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
907 cls.pg5.set_table_ip4(10)
908 cls.pg6._local_ip4 = "172.16.255.1"
909 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
910 cls.pg6.set_table_ip4(20)
911 for i in cls.overlapping_interfaces:
919 cls.pg9.generate_remote_hosts(2)
921 cls.vapi.sw_interface_add_del_address(
922 sw_if_index=cls.pg9.sw_if_index,
923 prefix="10.0.0.1/24")
926 cls.pg9.resolve_arp()
927 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
928 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
929 cls.pg9.resolve_arp()
932 super(TestNAT44EI, self).setUp()
933 self.vapi.nat44_plugin_enable_disable(
934 sessions=self.max_translations,
935 users=self.max_users, enable=1)
938 super(TestNAT44EI, self).tearDown()
939 if not self.vpp_dead:
940 self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
941 src_port=self.ipfix_src_port,
943 self.ipfix_src_port = 4739
944 self.ipfix_domain_id = 1
946 self.vapi.nat44_plugin_enable_disable(enable=0)
947 self.vapi.cli("clear logging")
949 def test_clear_sessions(self):
950 """ NAT44EI session clearing test """
952 self.nat44_add_address(self.nat_addr)
953 flags = self.config_flags.NAT_IS_INSIDE
954 self.vapi.nat44_interface_add_del_feature(
955 sw_if_index=self.pg0.sw_if_index,
956 flags=flags, is_add=1)
957 self.vapi.nat44_interface_add_del_feature(
958 sw_if_index=self.pg1.sw_if_index,
961 nat_config = self.vapi.nat_show_config()
962 self.assertEqual(0, nat_config.endpoint_dependent)
964 pkts = self.create_stream_in(self.pg0, self.pg1)
965 self.pg0.add_stream(pkts)
966 self.pg_enable_capture(self.pg_interfaces)
968 capture = self.pg1.get_capture(len(pkts))
969 self.verify_capture_out(capture)
971 sessions = self.statistics.get_counter('/nat44/total-sessions')
972 self.assertTrue(sessions[0][0] > 0)
973 self.logger.info("sessions before clearing: %s" % sessions[0][0])
975 self.vapi.cli("clear nat44 sessions")
977 sessions = self.statistics.get_counter('/nat44/total-sessions')
978 self.assertEqual(sessions[0][0], 0)
979 self.logger.info("sessions after clearing: %s" % sessions[0][0])
981 def test_dynamic(self):
982 """ NAT44EI dynamic translation test """
983 self.nat44_add_address(self.nat_addr)
984 flags = self.config_flags.NAT_IS_INSIDE
985 self.vapi.nat44_interface_add_del_feature(
986 sw_if_index=self.pg0.sw_if_index,
987 flags=flags, is_add=1)
988 self.vapi.nat44_interface_add_del_feature(
989 sw_if_index=self.pg1.sw_if_index,
993 tcpn = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0]
994 udpn = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0]
995 icmpn = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0]
996 drops = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0]
998 pkts = self.create_stream_in(self.pg0, self.pg1)
999 self.pg0.add_stream(pkts)
1000 self.pg_enable_capture(self.pg_interfaces)
1002 capture = self.pg1.get_capture(len(pkts))
1003 self.verify_capture_out(capture)
1005 if_idx = self.pg0.sw_if_index
1006 cnt = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0]
1007 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
1008 cnt = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0]
1009 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
1010 cnt = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0]
1011 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
1012 cnt = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0]
1013 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
1016 tcpn = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0]
1017 udpn = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0]
1018 icmpn = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0]
1019 drops = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0]
1021 pkts = self.create_stream_out(self.pg1)
1022 self.pg1.add_stream(pkts)
1023 self.pg_enable_capture(self.pg_interfaces)
1025 capture = self.pg0.get_capture(len(pkts))
1026 self.verify_capture_in(capture, self.pg0)
1028 if_idx = self.pg1.sw_if_index
1029 cnt = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0]
1030 self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
1031 cnt = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0]
1032 self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
1033 cnt = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0]
1034 self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
1035 cnt = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0]
1036 self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
1038 users = self.statistics.get_counter('/nat44/total-users')
1039 self.assertEqual(users[0][0], 1)
1040 sessions = self.statistics.get_counter('/nat44/total-sessions')
1041 self.assertEqual(sessions[0][0], 3)
1043 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1044 """ NAT44EI handling of client packets with TTL=1 """
1046 self.nat44_add_address(self.nat_addr)
1047 flags = self.config_flags.NAT_IS_INSIDE
1048 self.vapi.nat44_interface_add_del_feature(
1049 sw_if_index=self.pg0.sw_if_index,
1050 flags=flags, is_add=1)
1051 self.vapi.nat44_interface_add_del_feature(
1052 sw_if_index=self.pg1.sw_if_index,
1055 # Client side - generate traffic
1056 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1057 self.pg0.add_stream(pkts)
1058 self.pg_enable_capture(self.pg_interfaces)
1061 # Client side - verify ICMP type 11 packets
1062 capture = self.pg0.get_capture(len(pkts))
1063 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1065 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1066 """ NAT44EI handling of server packets with TTL=1 """
1068 self.nat44_add_address(self.nat_addr)
1069 flags = self.config_flags.NAT_IS_INSIDE
1070 self.vapi.nat44_interface_add_del_feature(
1071 sw_if_index=self.pg0.sw_if_index,
1072 flags=flags, is_add=1)
1073 self.vapi.nat44_interface_add_del_feature(
1074 sw_if_index=self.pg1.sw_if_index,
1077 # Client side - create sessions
1078 pkts = self.create_stream_in(self.pg0, self.pg1)
1079 self.pg0.add_stream(pkts)
1080 self.pg_enable_capture(self.pg_interfaces)
1083 # Server side - generate traffic
1084 capture = self.pg1.get_capture(len(pkts))
1085 self.verify_capture_out(capture)
1086 pkts = self.create_stream_out(self.pg1, ttl=1)
1087 self.pg1.add_stream(pkts)
1088 self.pg_enable_capture(self.pg_interfaces)
1091 # Server side - verify ICMP type 11 packets
1092 capture = self.pg1.get_capture(len(pkts))
1093 self.verify_capture_out_with_icmp_errors(capture,
1094 src_ip=self.pg1.local_ip4)
1096 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1097 """ NAT44EI handling of error responses to client packets with TTL=2
1100 self.nat44_add_address(self.nat_addr)
1101 flags = self.config_flags.NAT_IS_INSIDE
1102 self.vapi.nat44_interface_add_del_feature(
1103 sw_if_index=self.pg0.sw_if_index,
1104 flags=flags, is_add=1)
1105 self.vapi.nat44_interface_add_del_feature(
1106 sw_if_index=self.pg1.sw_if_index,
1109 # Client side - generate traffic
1110 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1111 self.pg0.add_stream(pkts)
1112 self.pg_enable_capture(self.pg_interfaces)
1115 # Server side - simulate ICMP type 11 response
1116 capture = self.pg1.get_capture(len(pkts))
1117 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1118 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1119 ICMP(type=11) / packet[IP] for packet in capture]
1120 self.pg1.add_stream(pkts)
1121 self.pg_enable_capture(self.pg_interfaces)
1124 # Client side - verify ICMP type 11 packets
1125 capture = self.pg0.get_capture(len(pkts))
1126 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Out2in ICMP error test: sessions are first opened from pg0 (the interface
# enabled with NAT_IS_INSIDE), pg1 then sends a stream built with ttl=2, and
# ICMP type 11 (time exceeded) replies wrapping the packets captured on pg0
# are injected on pg0. The packets finally captured on pg1 are checked with
# verify_capture_out_with_icmp_errors().
# NOTE(review): the embedded line numbers skip values (e.g. 1130-1131,
# 1139-1140), so some lines of this method are not visible in this listing.
1128 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1129 """ NAT44EI handling of error responses to server packets with TTL=2
1132 self.nat44_add_address(self.nat_addr)
1133 flags = self.config_flags.NAT_IS_INSIDE
1134 self.vapi.nat44_interface_add_del_feature(
1135 sw_if_index=self.pg0.sw_if_index,
1136 flags=flags, is_add=1)
1137 self.vapi.nat44_interface_add_del_feature(
1138 sw_if_index=self.pg1.sw_if_index,
1141 # Client side - create sessions
1142 pkts = self.create_stream_in(self.pg0, self.pg1)
1143 self.pg0.add_stream(pkts)
1144 self.pg_enable_capture(self.pg_interfaces)
1147 # Server side - generate traffic
1148 capture = self.pg1.get_capture(len(pkts))
1149 self.verify_capture_out(capture)
1150 pkts = self.create_stream_out(self.pg1, ttl=2)
1151 self.pg1.add_stream(pkts)
1152 self.pg_enable_capture(self.pg_interfaces)
1155 # Client side - simulate ICMP type 11 response
1156 capture = self.pg0.get_capture(len(pkts))
# Each error packet embeds the IP layer of one packet captured on pg0 and is
# addressed from the pg0 peer to the pg1 peer.
1157 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1158 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1159 ICMP(type=11) / packet[IP] for packet in capture]
1160 self.pg0.add_stream(pkts)
1161 self.pg_enable_capture(self.pg_interfaces)
1164 # Server side - verify ICMP type 11 packets
1165 capture = self.pg1.get_capture(len(pkts))
1166 self.verify_capture_out_with_icmp_errors(capture)
# Sends an ICMP echo-request from the pg1 peer to pg1's own local address and
# checks the packet captured back on pg1: source and destination addresses
# are swapped and the ICMP type is 0 (echo reply). The request is built with
# id=self.icmp_id_out while the reply is asserted against self.icmp_id_in.
# NOTE(review): `pkts` and `packet` are used below but never assigned in the
# visible lines; presumably they are set on embedded lines 1183 and 1188,
# which are missing from this listing - confirm against the full file.
1168 def test_ping_out_interface_from_outside(self):
1169 """ NAT44EI ping out interface from outside network """
1171 self.nat44_add_address(self.nat_addr)
1172 flags = self.config_flags.NAT_IS_INSIDE
1173 self.vapi.nat44_interface_add_del_feature(
1174 sw_if_index=self.pg0.sw_if_index,
1175 flags=flags, is_add=1)
1176 self.vapi.nat44_interface_add_del_feature(
1177 sw_if_index=self.pg1.sw_if_index,
1180 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1181 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1182 ICMP(id=self.icmp_id_out, type='echo-request'))
1184 self.pg1.add_stream(pkts)
1185 self.pg_enable_capture(self.pg_interfaces)
1187 capture = self.pg1.get_capture(len(pkts))
1190 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1191 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1192 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1193 self.assertEqual(packet[ICMP].type, 0) # echo reply
1195 self.logger.error(ppp("Unexpected or invalid packet "
1196 "(outside network):", packet))
# Pings an inside host through a static mapping (pg0.remote_ip4 <-> nat_addr):
# an echo-request sent on pg1 to nat_addr must be captured on pg0, and the
# echo-reply sent back on pg0 must be captured on pg1. Both captures are
# checked with the verify_capture_* helpers and must carry protocol ICMP.
# NOTE(review): embedded line numbers skip values (e.g. 1209-1211, 1217), so
# some lines of this method are not visible in this listing.
1199 def test_ping_internal_host_from_outside(self):
1200 """ NAT44EI ping internal host from outside network """
1202 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1203 flags = self.config_flags.NAT_IS_INSIDE
1204 self.vapi.nat44_interface_add_del_feature(
1205 sw_if_index=self.pg0.sw_if_index,
1206 flags=flags, is_add=1)
1207 self.vapi.nat44_interface_add_del_feature(
1208 sw_if_index=self.pg1.sw_if_index,
1212 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1213 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1214 ICMP(id=self.icmp_id_out, type='echo-request'))
1215 self.pg1.add_stream(pkt)
1216 self.pg_enable_capture(self.pg_interfaces)
1218 capture = self.pg0.get_capture(1)
1219 self.verify_capture_in(capture, self.pg0)
1220 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1223 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1224 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1225 ICMP(id=self.icmp_id_in, type='echo-reply'))
1226 self.pg0.add_stream(pkt)
1227 self.pg_enable_capture(self.pg_interfaces)
1229 capture = self.pg1.get_capture(1)
1230 self.verify_capture_out(capture, same_port=True)
1231 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Forwarding test: enables nat44 forwarding, installs an address-only static
# mapping (real_ip = pg0.remote_ip4, alias_ip = nat_addr) and runs traffic in
# both directions twice - once for the mapped host ("static mapping match")
# and once after remote_hosts[0] has been swapped for remote_hosts[1]
# ("no static mapping match"). The swapped host is restored, forwarding is
# disabled and nat44_add_del_static_mapping is called again at the end.
# NOTE(review): several argument lines are missing from this listing (embedded
# lines 1242, 1252-1254, 1291-1292, 1299, 1303-1304), e.g. the is_add value of
# the final static-mapping call is not visible - confirm against the full file.
1233 def test_forwarding(self):
1234 """ NAT44EI forwarding test """
1236 flags = self.config_flags.NAT_IS_INSIDE
1237 self.vapi.nat44_interface_add_del_feature(
1238 sw_if_index=self.pg0.sw_if_index,
1239 flags=flags, is_add=1)
1240 self.vapi.nat44_interface_add_del_feature(
1241 sw_if_index=self.pg1.sw_if_index,
1243 self.vapi.nat44_forwarding_enable_disable(enable=1)
1245 real_ip = self.pg0.remote_ip4
1246 alias_ip = self.nat_addr
1247 flags = self.config_flags.NAT_IS_ADDR_ONLY
1248 self.vapi.nat44_add_del_static_mapping(is_add=1,
1249 local_ip_address=real_ip,
1250 external_ip_address=alias_ip,
1251 external_sw_if_index=0xFFFFFFFF,
1255 # static mapping match
1257 pkts = self.create_stream_out(self.pg1)
1258 self.pg1.add_stream(pkts)
1259 self.pg_enable_capture(self.pg_interfaces)
1261 capture = self.pg0.get_capture(len(pkts))
1262 self.verify_capture_in(capture, self.pg0)
1264 pkts = self.create_stream_in(self.pg0, self.pg1)
1265 self.pg0.add_stream(pkts)
1266 self.pg_enable_capture(self.pg_interfaces)
1268 capture = self.pg1.get_capture(len(pkts))
1269 self.verify_capture_out(capture, same_port=True)
1271 # no static mapping match
# Remember the first remote host so it can be put back afterwards; presumably
# pg0.remote_ip4 follows remote_hosts[0], so the swap makes the helpers use a
# host that has no static mapping - TODO confirm.
1273 host0 = self.pg0.remote_hosts[0]
1274 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1276 pkts = self.create_stream_out(self.pg1,
1277 dst_ip=self.pg0.remote_ip4,
1278 use_inside_ports=True)
1279 self.pg1.add_stream(pkts)
1280 self.pg_enable_capture(self.pg_interfaces)
1282 capture = self.pg0.get_capture(len(pkts))
1283 self.verify_capture_in(capture, self.pg0)
1285 pkts = self.create_stream_in(self.pg0, self.pg1)
1286 self.pg0.add_stream(pkts)
1287 self.pg_enable_capture(self.pg_interfaces)
1289 capture = self.pg1.get_capture(len(pkts))
1290 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1293 self.pg0.remote_hosts[0] = host0
1296 self.vapi.nat44_forwarding_enable_disable(enable=0)
1297 flags = self.config_flags.NAT_IS_ADDR_ONLY
1298 self.vapi.nat44_add_del_static_mapping(
1300 local_ip_address=real_ip,
1301 external_ip_address=alias_ip,
1302 external_sw_if_index=0xFFFFFFFF,
# 1:1 static NAT with the first packets coming from the inside: maps
# pg0.remote_ip4 to 10.0.0.10, checks that the static-mapping dump holds one
# entry with an empty tag and protocol/ports equal to 0, then runs in2out
# traffic (pg0 -> pg1) followed by out2in traffic (pg1 -> nat_ip).
# The tcp/udp/icmp "out" identifiers on self are overwritten before traffic is
# generated.
# NOTE(review): embedded line numbers skip values (e.g. 1320, 1327-1328), so
# some lines of this method are not visible in this listing.
1305 def test_static_in(self):
1306 """ NAT44EI 1:1 NAT initialized from inside network """
1308 nat_ip = "10.0.0.10"
1309 self.tcp_port_out = 6303
1310 self.udp_port_out = 6304
1311 self.icmp_id_out = 6305
1313 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1314 flags = self.config_flags.NAT_IS_INSIDE
1315 self.vapi.nat44_interface_add_del_feature(
1316 sw_if_index=self.pg0.sw_if_index,
1317 flags=flags, is_add=1)
1318 self.vapi.nat44_interface_add_del_feature(
1319 sw_if_index=self.pg1.sw_if_index,
1321 sm = self.vapi.nat44_static_mapping_dump()
1322 self.assertEqual(len(sm), 1)
1323 self.assertEqual(sm[0].tag, '')
1324 self.assertEqual(sm[0].protocol, 0)
1325 self.assertEqual(sm[0].local_port, 0)
1326 self.assertEqual(sm[0].external_port, 0)
1329 pkts = self.create_stream_in(self.pg0, self.pg1)
1330 self.pg0.add_stream(pkts)
1331 self.pg_enable_capture(self.pg_interfaces)
1333 capture = self.pg1.get_capture(len(pkts))
1334 self.verify_capture_out(capture, nat_ip, True)
1337 pkts = self.create_stream_out(self.pg1, nat_ip)
1338 self.pg1.add_stream(pkts)
1339 self.pg_enable_capture(self.pg_interfaces)
1341 capture = self.pg0.get_capture(len(pkts))
1342 self.verify_capture_in(capture, self.pg0)
# 1:1 static NAT with the first packets coming from the outside: maps
# pg0.remote_ip4 to 10.0.0.20 with a tag, checks that the static-mapping dump
# holds exactly one entry carrying that tag, then runs out2in traffic
# (pg1 -> nat_ip) followed by in2out traffic (pg0 -> pg1).
# NOTE(review): `tag` is used below but not assigned in the visible lines;
# presumably it is set on embedded line 1351 or 1352, which are missing from
# this listing - confirm against the full file.
1344 def test_static_out(self):
1345 """ NAT44EI 1:1 NAT initialized from outside network """
1347 nat_ip = "10.0.0.20"
1348 self.tcp_port_out = 6303
1349 self.udp_port_out = 6304
1350 self.icmp_id_out = 6305
1353 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1354 flags = self.config_flags.NAT_IS_INSIDE
1355 self.vapi.nat44_interface_add_del_feature(
1356 sw_if_index=self.pg0.sw_if_index,
1357 flags=flags, is_add=1)
1358 self.vapi.nat44_interface_add_del_feature(
1359 sw_if_index=self.pg1.sw_if_index,
1361 sm = self.vapi.nat44_static_mapping_dump()
1362 self.assertEqual(len(sm), 1)
1363 self.assertEqual(sm[0].tag, tag)
1366 pkts = self.create_stream_out(self.pg1, nat_ip)
1367 self.pg1.add_stream(pkts)
1368 self.pg_enable_capture(self.pg_interfaces)
1370 capture = self.pg0.get_capture(len(pkts))
1371 self.verify_capture_in(capture, self.pg0)
1374 pkts = self.create_stream_in(self.pg0, self.pg1)
1375 self.pg0.add_stream(pkts)
1376 self.pg_enable_capture(self.pg_interfaces)
1378 capture = self.pg1.get_capture(len(pkts))
1379 self.verify_capture_out(capture, nat_ip, True)
# 1:1 NAPT (static mapping with ports) with the first packets coming from the
# inside: after adding nat_addr to the pool, one static mapping per protocol
# (TCP, UDP, ICMP) ties the inside port/id of pg0.remote_ip4 to the outside
# port/id 3606/3607/3608 on nat_addr. In2out traffic is verified first, then
# out2in traffic.
# NOTE(review): embedded line numbers skip values (e.g. 1404-1406), so some
# lines of this method are not visible in this listing.
1381 def test_static_with_port_in(self):
1382 """ NAT44EI 1:1 NAPT initialized from inside network """
1384 self.tcp_port_out = 3606
1385 self.udp_port_out = 3607
1386 self.icmp_id_out = 3608
1388 self.nat44_add_address(self.nat_addr)
1389 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1390 self.tcp_port_in, self.tcp_port_out,
1391 proto=IP_PROTOS.tcp)
1392 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1393 self.udp_port_in, self.udp_port_out,
1394 proto=IP_PROTOS.udp)
1395 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1396 self.icmp_id_in, self.icmp_id_out,
1397 proto=IP_PROTOS.icmp)
1398 flags = self.config_flags.NAT_IS_INSIDE
1399 self.vapi.nat44_interface_add_del_feature(
1400 sw_if_index=self.pg0.sw_if_index,
1401 flags=flags, is_add=1)
1402 self.vapi.nat44_interface_add_del_feature(
1403 sw_if_index=self.pg1.sw_if_index,
1407 pkts = self.create_stream_in(self.pg0, self.pg1)
1408 self.pg0.add_stream(pkts)
1409 self.pg_enable_capture(self.pg_interfaces)
1411 capture = self.pg1.get_capture(len(pkts))
1412 self.verify_capture_out(capture)
1415 pkts = self.create_stream_out(self.pg1)
1416 self.pg1.add_stream(pkts)
1417 self.pg_enable_capture(self.pg_interfaces)
1419 capture = self.pg0.get_capture(len(pkts))
1420 self.verify_capture_in(capture, self.pg0)
# 1:1 NAPT (static mapping with ports) with the first packets coming from the
# outside: after adding nat_addr to the pool, one static mapping per protocol
# (TCP, UDP, ICMP) ties the inside port/id of pg0.remote_ip4 to the outside
# port/id 30606/30607/30608 on nat_addr. Out2in traffic is verified first,
# then in2out traffic.
# NOTE(review): embedded line numbers skip values (e.g. 1445-1447), so some
# lines of this method are not visible in this listing.
1422 def test_static_with_port_out(self):
1423 """ NAT44EI 1:1 NAPT initialized from outside network """
1425 self.tcp_port_out = 30606
1426 self.udp_port_out = 30607
1427 self.icmp_id_out = 30608
1429 self.nat44_add_address(self.nat_addr)
1430 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1431 self.tcp_port_in, self.tcp_port_out,
1432 proto=IP_PROTOS.tcp)
1433 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434 self.udp_port_in, self.udp_port_out,
1435 proto=IP_PROTOS.udp)
1436 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437 self.icmp_id_in, self.icmp_id_out,
1438 proto=IP_PROTOS.icmp)
1439 flags = self.config_flags.NAT_IS_INSIDE
1440 self.vapi.nat44_interface_add_del_feature(
1441 sw_if_index=self.pg0.sw_if_index,
1442 flags=flags, is_add=1)
1443 self.vapi.nat44_interface_add_del_feature(
1444 sw_if_index=self.pg1.sw_if_index,
1448 pkts = self.create_stream_out(self.pg1)
1449 self.pg1.add_stream(pkts)
1450 self.pg_enable_capture(self.pg_interfaces)
1452 capture = self.pg0.get_capture(len(pkts))
1453 self.verify_capture_in(capture, self.pg0)
1456 pkts = self.create_stream_in(self.pg0, self.pg1)
1457 self.pg0.add_stream(pkts)
1458 self.pg_enable_capture(self.pg_interfaces)
1460 capture = self.pg1.get_capture(len(pkts))
1461 self.verify_capture_out(capture)
# VRF awareness of 1:1 static mappings: two mappings are installed
# (pg4.remote_ip4 -> 10.0.0.30 and pg0.remote_ip4 -> 10.0.0.40), the NAT
# feature is enabled on pg3, pg0 and pg4, and traffic is sent towards pg3.
# Packets from pg4 must come out translated to nat_ip1, while packets from pg0
# must not reach pg3 at all (assert_nothing_captured).
# NOTE(review): the trailing arguments of both nat44_add_static_mapping calls
# and of the pg3 feature call sit on embedded lines 1473, 1475 and 1479, which
# are missing from this listing; presumably they carry the VRF ids / flags the
# existing comments refer to - confirm against the full file.
1463 def test_static_vrf_aware(self):
1464 """ NAT44EI 1:1 NAT VRF awareness """
1466 nat_ip1 = "10.0.0.30"
1467 nat_ip2 = "10.0.0.40"
1468 self.tcp_port_out = 6303
1469 self.udp_port_out = 6304
1470 self.icmp_id_out = 6305
1472 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1474 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1476 flags = self.config_flags.NAT_IS_INSIDE
1477 self.vapi.nat44_interface_add_del_feature(
1478 sw_if_index=self.pg3.sw_if_index,
1480 self.vapi.nat44_interface_add_del_feature(
1481 sw_if_index=self.pg0.sw_if_index,
1482 flags=flags, is_add=1)
1483 self.vapi.nat44_interface_add_del_feature(
1484 sw_if_index=self.pg4.sw_if_index,
1485 flags=flags, is_add=1)
1487 # inside interface VRF match NAT44EI static mapping VRF
1488 pkts = self.create_stream_in(self.pg4, self.pg3)
1489 self.pg4.add_stream(pkts)
1490 self.pg_enable_capture(self.pg_interfaces)
1492 capture = self.pg3.get_capture(len(pkts))
1493 self.verify_capture_out(capture, nat_ip1, True)
1495 # inside interface VRF don't match NAT44EI static mapping VRF (packets
1497 pkts = self.create_stream_in(self.pg0, self.pg3)
1498 self.pg0.add_stream(pkts)
1499 self.pg_enable_capture(self.pg_interfaces)
1501 self.pg3.assert_nothing_captured()
# Switch from dynamic translation to 1:1 NAT: in2out traffic is first verified
# against the dynamic pool address (nat_addr); a static mapping
# pg0.remote_ip4 -> 10.0.0.10 is then added, the user's session dump is
# expected to be empty right after that, and the same traffic must now come
# out translated to nat_ip.
# NOTE(review): embedded line numbers skip values (e.g. 1517-1519, 1526-1527),
# so some lines of this method are not visible in this listing.
1503 def test_dynamic_to_static(self):
1504 """ NAT44EI Switch from dynamic translation to 1:1NAT """
1505 nat_ip = "10.0.0.10"
1506 self.tcp_port_out = 6303
1507 self.udp_port_out = 6304
1508 self.icmp_id_out = 6305
1510 self.nat44_add_address(self.nat_addr)
1511 flags = self.config_flags.NAT_IS_INSIDE
1512 self.vapi.nat44_interface_add_del_feature(
1513 sw_if_index=self.pg0.sw_if_index,
1514 flags=flags, is_add=1)
1515 self.vapi.nat44_interface_add_del_feature(
1516 sw_if_index=self.pg1.sw_if_index,
1520 pkts = self.create_stream_in(self.pg0, self.pg1)
1521 self.pg0.add_stream(pkts)
1522 self.pg_enable_capture(self.pg_interfaces)
1524 capture = self.pg1.get_capture(len(pkts))
1525 self.verify_capture_out(capture)
1528 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
# The sessions created by the dynamic phase above must be gone once the static
# mapping exists.
1529 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
1530 self.assertEqual(len(sessions), 0)
1531 pkts = self.create_stream_in(self.pg0, self.pg1)
1532 self.pg0.add_stream(pkts)
1533 self.pg_enable_capture(self.pg_interfaces)
1535 capture = self.pg1.get_capture(len(pkts))
1536 self.verify_capture_out(capture, nat_ip, True)
# Identity NAT: installs an address-only identity mapping for pg0.remote_ip4,
# sends one TCP packet (12345 -> 56789) from the pg1 peer to that address and
# checks that the packet captured on pg0 keeps the same addresses and ports
# and has valid checksums. No session may exist for the user afterwards. A
# second identity mapping for the same address with vrf_id=1 is then added and
# the identity-mapping dump must contain two entries.
# NOTE(review): `ip` and `tcp` are used below but not assigned in the visible
# lines, and `p` is presumably rebound to the captured packet before the
# assertions; embedded lines 1559-1562 are missing from this listing -
# confirm against the full file.
1538 def test_identity_nat(self):
1539 """ NAT44EI Identity NAT """
1540 flags = self.config_flags.NAT_IS_ADDR_ONLY
1541 self.vapi.nat44_add_del_identity_mapping(
1542 ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
1543 flags=flags, is_add=1)
1544 flags = self.config_flags.NAT_IS_INSIDE
1545 self.vapi.nat44_interface_add_del_feature(
1546 sw_if_index=self.pg0.sw_if_index,
1547 flags=flags, is_add=1)
1548 self.vapi.nat44_interface_add_del_feature(
1549 sw_if_index=self.pg1.sw_if_index,
1552 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1553 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1554 TCP(sport=12345, dport=56789))
1555 self.pg1.add_stream(p)
1556 self.pg_enable_capture(self.pg_interfaces)
1558 capture = self.pg0.get_capture(1)
1563 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1564 self.assertEqual(ip.src, self.pg1.remote_ip4)
1565 self.assertEqual(tcp.dport, 56789)
1566 self.assertEqual(tcp.sport, 12345)
1567 self.assert_packet_checksums_valid(p)
1569 self.logger.error(ppp("Unexpected or invalid packet:", p))
1572 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
1573 self.assertEqual(len(sessions), 0)
1574 flags = self.config_flags.NAT_IS_ADDR_ONLY
1575 self.vapi.nat44_add_del_identity_mapping(
1576 ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
1577 flags=flags, vrf_id=1, is_add=1)
1578 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1579 self.assertEqual(len(identity_mappings), 2)
# Two inside interfaces (pg0 and pg1, both enabled with NAT_IS_INSIDE) share
# one NAT pool address, with the feature also enabled on pg3. The test checks:
#   * pg0 -> pg1 and pg0 -> pg2 traffic is delivered without translation;
#   * pg0 <-> pg3 and pg1 <-> pg3 traffic passes the in2out / out2in checks of
#     verify_capture_out / verify_capture_in.
# NOTE(review): the trailing arguments of the pg3 feature call are on embedded
# lines 1595-1596, which are missing from this listing, so the role configured
# for pg3 is not visible here - confirm against the full file.
1581 def test_multiple_inside_interfaces(self):
1582 """ NAT44EI multiple non-overlapping address space inside interfaces
1585 self.nat44_add_address(self.nat_addr)
1586 flags = self.config_flags.NAT_IS_INSIDE
1587 self.vapi.nat44_interface_add_del_feature(
1588 sw_if_index=self.pg0.sw_if_index,
1589 flags=flags, is_add=1)
1590 self.vapi.nat44_interface_add_del_feature(
1591 sw_if_index=self.pg1.sw_if_index,
1592 flags=flags, is_add=1)
1593 self.vapi.nat44_interface_add_del_feature(
1594 sw_if_index=self.pg3.sw_if_index,
1597 # between two NAT44EI inside interfaces (no translation)
1598 pkts = self.create_stream_in(self.pg0, self.pg1)
1599 self.pg0.add_stream(pkts)
1600 self.pg_enable_capture(self.pg_interfaces)
1602 capture = self.pg1.get_capture(len(pkts))
1603 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1605 # from inside to interface without translation
1606 pkts = self.create_stream_in(self.pg0, self.pg2)
1607 self.pg0.add_stream(pkts)
1608 self.pg_enable_capture(self.pg_interfaces)
1610 capture = self.pg2.get_capture(len(pkts))
1611 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1613 # in2out 1st interface
1614 pkts = self.create_stream_in(self.pg0, self.pg3)
1615 self.pg0.add_stream(pkts)
1616 self.pg_enable_capture(self.pg_interfaces)
1618 capture = self.pg3.get_capture(len(pkts))
1619 self.verify_capture_out(capture)
1621 # out2in 1st interface
1622 pkts = self.create_stream_out(self.pg3)
1623 self.pg3.add_stream(pkts)
1624 self.pg_enable_capture(self.pg_interfaces)
1626 capture = self.pg0.get_capture(len(pkts))
1627 self.verify_capture_in(capture, self.pg0)
1629 # in2out 2nd interface
1630 pkts = self.create_stream_in(self.pg1, self.pg3)
1631 self.pg1.add_stream(pkts)
1632 self.pg_enable_capture(self.pg_interfaces)
1634 capture = self.pg3.get_capture(len(pkts))
1635 self.verify_capture_out(capture)
1637 # out2in 2nd interface
1638 pkts = self.create_stream_out(self.pg3)
1639 self.pg3.add_stream(pkts)
1640 self.pg_enable_capture(self.pg_interfaces)
1642 capture = self.pg1.get_capture(len(pkts))
1643 self.verify_capture_in(capture, self.pg1)
# Inside interfaces with overlapping address space. pg4, pg5 and pg6 are
# enabled with NAT_IS_INSIDE, the feature is also enabled on pg3, and
# pg6.remote_ip4 gets a static mapping to 10.0.0.10. The test then checks, in
# order:
#   * pg4 -> pg5 traffic is not translated;
#   * a TCP packet from pg4 to static_nat_ip is hairpinned to pg6 with source
#     nat_addr and a source port different from 1234;
#   * in2out / out2in traffic between pg4 <-> pg3 and pg5 <-> pg3;
#   * the session dump for the pg5 user holds exactly three non-static
#     sessions (TCP, UDP, ICMP, in that order) using the single pool address;
#   * in2out / out2in traffic between pg6 <-> pg3 through the static mapping;
#   * general user / session dump checks and per-user checks for pg4 and pg6.
# NOTE(review): the second argument passed to nat44_user_session_dump (10 or
# 20 here) is presumably a VRF id - TODO confirm against the API definition.
# NOTE(review): several lines are missing from this listing (e.g. embedded
# lines 1654, 1665-1666, 1683-1686, 1769, 1771, 1777, 1799, 1802-1803);
# `ip`, `tcp` and `user` are used below without a visible assignment.
1645 def test_inside_overlapping_interfaces(self):
1646 """ NAT44EI multiple inside interfaces with overlapping address space
1649 static_nat_ip = "10.0.0.10"
1650 self.nat44_add_address(self.nat_addr)
1651 flags = self.config_flags.NAT_IS_INSIDE
1652 self.vapi.nat44_interface_add_del_feature(
1653 sw_if_index=self.pg3.sw_if_index,
1655 self.vapi.nat44_interface_add_del_feature(
1656 sw_if_index=self.pg4.sw_if_index,
1657 flags=flags, is_add=1)
1658 self.vapi.nat44_interface_add_del_feature(
1659 sw_if_index=self.pg5.sw_if_index,
1660 flags=flags, is_add=1)
1661 self.vapi.nat44_interface_add_del_feature(
1662 sw_if_index=self.pg6.sw_if_index,
1663 flags=flags, is_add=1)
1664 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1667 # between NAT44EI inside interfaces with same VRF (no translation)
1668 pkts = self.create_stream_in(self.pg4, self.pg5)
1669 self.pg4.add_stream(pkts)
1670 self.pg_enable_capture(self.pg_interfaces)
1672 capture = self.pg5.get_capture(len(pkts))
1673 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1675 # between NAT44EI inside interfaces with different VRF (hairpinning)
1676 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1677 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1678 TCP(sport=1234, dport=5678))
1679 self.pg4.add_stream(p)
1680 self.pg_enable_capture(self.pg_interfaces)
1682 capture = self.pg6.get_capture(1)
1687 self.assertEqual(ip.src, self.nat_addr)
1688 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1689 self.assertNotEqual(tcp.sport, 1234)
1690 self.assertEqual(tcp.dport, 5678)
1692 self.logger.error(ppp("Unexpected or invalid packet:", p))
1695 # in2out 1st interface
1696 pkts = self.create_stream_in(self.pg4, self.pg3)
1697 self.pg4.add_stream(pkts)
1698 self.pg_enable_capture(self.pg_interfaces)
1700 capture = self.pg3.get_capture(len(pkts))
1701 self.verify_capture_out(capture)
1703 # out2in 1st interface
1704 pkts = self.create_stream_out(self.pg3)
1705 self.pg3.add_stream(pkts)
1706 self.pg_enable_capture(self.pg_interfaces)
1708 capture = self.pg4.get_capture(len(pkts))
1709 self.verify_capture_in(capture, self.pg4)
1711 # in2out 2nd interface
1712 pkts = self.create_stream_in(self.pg5, self.pg3)
1713 self.pg5.add_stream(pkts)
1714 self.pg_enable_capture(self.pg_interfaces)
1716 capture = self.pg3.get_capture(len(pkts))
1717 self.verify_capture_out(capture)
1719 # out2in 2nd interface
1720 pkts = self.create_stream_out(self.pg3)
1721 self.pg3.add_stream(pkts)
1722 self.pg_enable_capture(self.pg_interfaces)
1724 capture = self.pg5.get_capture(len(pkts))
1725 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 user: exactly one session per protocol is expected,
# and the positional checks below rely on the dump order TCP, UDP, ICMP.
1728 addresses = self.vapi.nat44_address_dump()
1729 self.assertEqual(len(addresses), 1)
1730 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4, 10)
1731 self.assertEqual(len(sessions), 3)
1732 for session in sessions:
1733 self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
1734 self.assertEqual(str(session.inside_ip_address),
1735 self.pg5.remote_ip4)
1736 self.assertEqual(session.outside_ip_address,
1737 addresses[0].ip_address)
1738 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1739 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1740 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1741 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1742 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1743 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1744 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1745 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1746 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1748 # in2out 3rd interface
1749 pkts = self.create_stream_in(self.pg6, self.pg3)
1750 self.pg6.add_stream(pkts)
1751 self.pg_enable_capture(self.pg_interfaces)
1753 capture = self.pg3.get_capture(len(pkts))
1754 self.verify_capture_out(capture, static_nat_ip, True)
1756 # out2in 3rd interface
1757 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1758 self.pg3.add_stream(pkts)
1759 self.pg_enable_capture(self.pg_interfaces)
1761 capture = self.pg6.get_capture(len(pkts))
1762 self.verify_capture_in(capture, self.pg6)
1764 # general user and session dump verifications
1765 users = self.vapi.nat44_user_dump()
1766 self.assertGreaterEqual(len(users), 3)
1767 addresses = self.vapi.nat44_address_dump()
1768 self.assertEqual(len(addresses), 1)
1770 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1772 for session in sessions:
1773 self.assertEqual(user.ip_address, session.inside_ip_address)
1774 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1775 self.assertTrue(session.protocol in
1776 [IP_PROTOS.tcp, IP_PROTOS.udp,
1778 self.assertFalse(session.flags &
1779 self.config_flags.NAT_IS_EXT_HOST_VALID)
1782 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4, 10)
1783 self.assertGreaterEqual(len(sessions), 4)
1784 for session in sessions:
1785 self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
1786 self.assertEqual(str(session.inside_ip_address),
1787 self.pg4.remote_ip4)
1788 self.assertEqual(session.outside_ip_address,
1789 addresses[0].ip_address)
1792 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4, 20)
1793 self.assertGreaterEqual(len(sessions), 3)
1794 for session in sessions:
1795 self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
1796 self.assertEqual(str(session.inside_ip_address),
1797 self.pg6.remote_ip4)
1798 self.assertEqual(str(session.outside_ip_address),
1800 self.assertTrue(session.inside_port in
1801 [self.tcp_port_in, self.udp_port_in,
# Hairpinning with 1:1 NAPT: host (pg0.remote_hosts[0]) and server
# (pg0.remote_hosts[1]) both sit behind pg0, and the server is published as
# nat_addr:8765 -> server.ip4:5678 (TCP).
#   1. The host sends TCP to nat_addr:server_out_port; the packet captured on
#      pg0 must have source nat_addr (with a new source port), destination
#      server.ip4:server_in_port and valid checksums.
#   2. The server replies to nat_addr:host_out_port; the packet captured on
#      pg0 must have source nat_addr:server_out_port and destination
#      host.ip4:host_in_port.
# After each step the '/nat44/hairpinning' counter for pg0's sw_if_index is
# compared with the value sampled before step 1 (difference 1, then 2).
# NOTE(review): `host_in_port`, `ip` and `tcp` are used below without a visible
# assignment, and `p` is presumably rebound to the captured packet; embedded
# lines 1809-1810, 1837-1840 and 1863-1866 are missing from this listing.
1804 def test_hairpinning(self):
1805 """ NAT44EI hairpinning - 1:1 NAPT """
1807 host = self.pg0.remote_hosts[0]
1808 server = self.pg0.remote_hosts[1]
1811 server_in_port = 5678
1812 server_out_port = 8765
1814 self.nat44_add_address(self.nat_addr)
1815 flags = self.config_flags.NAT_IS_INSIDE
1816 self.vapi.nat44_interface_add_del_feature(
1817 sw_if_index=self.pg0.sw_if_index,
1818 flags=flags, is_add=1)
1819 self.vapi.nat44_interface_add_del_feature(
1820 sw_if_index=self.pg1.sw_if_index,
1823 # add static mapping for server
1824 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1825 server_in_port, server_out_port,
1826 proto=IP_PROTOS.tcp)
# Baseline counter value; both later assertions are relative to it.
1828 cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
1829 # send packet from host to server
1830 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1831 IP(src=host.ip4, dst=self.nat_addr) /
1832 TCP(sport=host_in_port, dport=server_out_port))
1833 self.pg0.add_stream(p)
1834 self.pg_enable_capture(self.pg_interfaces)
1836 capture = self.pg0.get_capture(1)
1841 self.assertEqual(ip.src, self.nat_addr)
1842 self.assertEqual(ip.dst, server.ip4)
1843 self.assertNotEqual(tcp.sport, host_in_port)
1844 self.assertEqual(tcp.dport, server_in_port)
1845 self.assert_packet_checksums_valid(p)
1846 host_out_port = tcp.sport
1848 self.logger.error(ppp("Unexpected or invalid packet:", p))
1851 after = self.statistics.get_counter('/nat44/hairpinning')[0]
1852 if_idx = self.pg0.sw_if_index
1853 self.assertEqual(after[if_idx] - cnt[if_idx], 1)
1855 # send reply from server to host
1856 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1857 IP(src=server.ip4, dst=self.nat_addr) /
1858 TCP(sport=server_in_port, dport=host_out_port))
1859 self.pg0.add_stream(p)
1860 self.pg_enable_capture(self.pg_interfaces)
1862 capture = self.pg0.get_capture(1)
1867 self.assertEqual(ip.src, self.nat_addr)
1868 self.assertEqual(ip.dst, host.ip4)
1869 self.assertEqual(tcp.sport, server_out_port)
1870 self.assertEqual(tcp.dport, host_in_port)
1871 self.assert_packet_checksums_valid(p)
1873 self.logger.error(ppp("Unexpected or invalid packet:", p))
1876 after = self.statistics.get_counter('/nat44/hairpinning')[0]
1877 if_idx = self.pg0.sw_if_index
1878 self.assertEqual(after[if_idx] - cnt[if_idx], 2)
# Hairpinning with 1:1 NAT: host, server1 and server2 are pg0.remote_hosts[0],
# [1] and [2]; server1 and server2 get address-only static mappings to
# 10.0.0.10 and 10.0.0.11. Four exchanges (TCP, UDP and ICMP each) are sent on
# pg0 and captured back on pg0:
#   1. host -> server1_nat_ip: source becomes nat_addr, destination
#      server1.ip4; TCP/UDP source ports and the ICMP id must change and the
#      new values are stored in self.tcp_port_out / udp_port_out / icmp_id_out.
#   2. server1 -> nat_addr (replies): source becomes server1_nat_ip,
#      destination host.ip4, with the original inside ports / id restored.
#   3. server2 -> server1_nat_ip: source becomes server2_nat_ip, destination
#      server1.ip4; ports and ICMP id must stay unchanged.
#   4. server1 -> server2_nat_ip (replies): source becomes server1_nat_ip,
#      destination server2.ip4.
# Packet checksums are validated only in the TCP branches visible here.
# NOTE(review): `pkts` is used for every stream but its construction (and the
# try/except/else lines around the assertions) fall on embedded lines that are
# missing from this listing (e.g. 1904-1905, 1909, 1913, 1917, 1923, 1935) -
# confirm against the full file.
1880 def test_hairpinning2(self):
1881 """ NAT44EI hairpinning - 1:1 NAT"""
1883 server1_nat_ip = "10.0.0.10"
1884 server2_nat_ip = "10.0.0.11"
1885 host = self.pg0.remote_hosts[0]
1886 server1 = self.pg0.remote_hosts[1]
1887 server2 = self.pg0.remote_hosts[2]
1888 server_tcp_port = 22
1889 server_udp_port = 20
1891 self.nat44_add_address(self.nat_addr)
1892 flags = self.config_flags.NAT_IS_INSIDE
1893 self.vapi.nat44_interface_add_del_feature(
1894 sw_if_index=self.pg0.sw_if_index,
1895 flags=flags, is_add=1)
1896 self.vapi.nat44_interface_add_del_feature(
1897 sw_if_index=self.pg1.sw_if_index,
1900 # add static mapping for servers
1901 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1902 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1906 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1907 IP(src=host.ip4, dst=server1_nat_ip) /
1908 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1910 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1911 IP(src=host.ip4, dst=server1_nat_ip) /
1912 UDP(sport=self.udp_port_in, dport=server_udp_port))
1914 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1915 IP(src=host.ip4, dst=server1_nat_ip) /
1916 ICMP(id=self.icmp_id_in, type='echo-request'))
1918 self.pg0.add_stream(pkts)
1919 self.pg_enable_capture(self.pg_interfaces)
1921 capture = self.pg0.get_capture(len(pkts))
1922 for packet in capture:
1924 self.assertEqual(packet[IP].src, self.nat_addr)
1925 self.assertEqual(packet[IP].dst, server1.ip4)
1926 if packet.haslayer(TCP):
1927 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1928 self.assertEqual(packet[TCP].dport, server_tcp_port)
1929 self.tcp_port_out = packet[TCP].sport
1930 self.assert_packet_checksums_valid(packet)
1931 elif packet.haslayer(UDP):
1932 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1933 self.assertEqual(packet[UDP].dport, server_udp_port)
1934 self.udp_port_out = packet[UDP].sport
1936 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1937 self.icmp_id_out = packet[ICMP].id
1939 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1944 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1945 IP(src=server1.ip4, dst=self.nat_addr) /
1946 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1948 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1949 IP(src=server1.ip4, dst=self.nat_addr) /
1950 UDP(sport=server_udp_port, dport=self.udp_port_out))
1952 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1953 IP(src=server1.ip4, dst=self.nat_addr) /
1954 ICMP(id=self.icmp_id_out, type='echo-reply'))
1956 self.pg0.add_stream(pkts)
1957 self.pg_enable_capture(self.pg_interfaces)
1959 capture = self.pg0.get_capture(len(pkts))
1960 for packet in capture:
1962 self.assertEqual(packet[IP].src, server1_nat_ip)
1963 self.assertEqual(packet[IP].dst, host.ip4)
1964 if packet.haslayer(TCP):
1965 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1966 self.assertEqual(packet[TCP].sport, server_tcp_port)
1967 self.assert_packet_checksums_valid(packet)
1968 elif packet.haslayer(UDP):
1969 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1970 self.assertEqual(packet[UDP].sport, server_udp_port)
1972 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1974 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1977 # server2 to server1
1979 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1980 IP(src=server2.ip4, dst=server1_nat_ip) /
1981 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1983 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1984 IP(src=server2.ip4, dst=server1_nat_ip) /
1985 UDP(sport=self.udp_port_in, dport=server_udp_port))
1987 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1988 IP(src=server2.ip4, dst=server1_nat_ip) /
1989 ICMP(id=self.icmp_id_in, type='echo-request'))
1991 self.pg0.add_stream(pkts)
1992 self.pg_enable_capture(self.pg_interfaces)
1994 capture = self.pg0.get_capture(len(pkts))
1995 for packet in capture:
1997 self.assertEqual(packet[IP].src, server2_nat_ip)
1998 self.assertEqual(packet[IP].dst, server1.ip4)
1999 if packet.haslayer(TCP):
2000 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2001 self.assertEqual(packet[TCP].dport, server_tcp_port)
2002 self.tcp_port_out = packet[TCP].sport
2003 self.assert_packet_checksums_valid(packet)
2004 elif packet.haslayer(UDP):
2005 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2006 self.assertEqual(packet[UDP].dport, server_udp_port)
2007 self.udp_port_out = packet[UDP].sport
2009 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2010 self.icmp_id_out = packet[ICMP].id
2012 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2015 # server1 to server2
2017 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2018 IP(src=server1.ip4, dst=server2_nat_ip) /
2019 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2021 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2022 IP(src=server1.ip4, dst=server2_nat_ip) /
2023 UDP(sport=server_udp_port, dport=self.udp_port_out))
2025 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2026 IP(src=server1.ip4, dst=server2_nat_ip) /
2027 ICMP(id=self.icmp_id_out, type='echo-reply'))
2029 self.pg0.add_stream(pkts)
2030 self.pg_enable_capture(self.pg_interfaces)
2032 capture = self.pg0.get_capture(len(pkts))
2033 for packet in capture:
2035 self.assertEqual(packet[IP].src, server1_nat_ip)
2036 self.assertEqual(packet[IP].dst, server2.ip4)
2037 if packet.haslayer(TCP):
2038 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2039 self.assertEqual(packet[TCP].sport, server_tcp_port)
2040 self.assert_packet_checksums_valid(packet)
2041 elif packet.haslayer(UDP):
2042 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2043 self.assertEqual(packet[UDP].sport, server_udp_port)
2045 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2047 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Regression test for a hairpinning infinite loop. On top of the 1:1 NAPT
# setup (server.ip4:5678 published as nat_addr:8765, TCP) a second static
# mapping maps pg0.local_ip4 to itself. A TCP packet from the host to
# pg0.local_ip4 is sent first; per the existing comments it should be dropped
# and this is where VPP used to crash. The regular hairpinning exchange
# (host -> nat_addr, then server -> nat_addr) is then verified, together with
# the '/nat44/hairpinning' counter for pg0's sw_if_index (difference 1, then 2
# relative to the value sampled after the first packet was queued).
# NOTE(review): `host_in_port`, `ip` and `tcp` are used below without a visible
# assignment, and `p` is presumably rebound to the captured packet; embedded
# lines 2055-2056, 2083, 2085, 2095-2098 and 2121-2124 are missing from this
# listing - confirm against the full file.
2050 def test_hairpinning_avoid_inf_loop(self):
2051 """ NAT44 hairpinning - 1:1 NAPT avoid infinite loop """
2053 host = self.pg0.remote_hosts[0]
2054 server = self.pg0.remote_hosts[1]
2057 server_in_port = 5678
2058 server_out_port = 8765
2060 self.nat44_add_address(self.nat_addr)
2061 flags = self.config_flags.NAT_IS_INSIDE
2062 self.vapi.nat44_interface_add_del_feature(
2063 sw_if_index=self.pg0.sw_if_index,
2064 flags=flags, is_add=1)
2065 self.vapi.nat44_interface_add_del_feature(
2066 sw_if_index=self.pg1.sw_if_index,
2069 # add static mapping for server
2070 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2071 server_in_port, server_out_port,
2072 proto=IP_PROTOS.tcp)
2074 # add another static mapping that maps pg0.local_ip4 address to itself
2075 self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2077 # send packet from host to VPP (the packet should get dropped)
2078 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2079 IP(src=host.ip4, dst=self.pg0.local_ip4) /
2080 TCP(sport=host_in_port, dport=server_out_port))
2081 self.pg0.add_stream(p)
2082 self.pg_enable_capture(self.pg_interfaces)
2084 # Here VPP used to crash due to an infinite loop
2086 cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
2087 # send packet from host to server
2088 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2089 IP(src=host.ip4, dst=self.nat_addr) /
2090 TCP(sport=host_in_port, dport=server_out_port))
2091 self.pg0.add_stream(p)
2092 self.pg_enable_capture(self.pg_interfaces)
2094 capture = self.pg0.get_capture(1)
2099 self.assertEqual(ip.src, self.nat_addr)
2100 self.assertEqual(ip.dst, server.ip4)
2101 self.assertNotEqual(tcp.sport, host_in_port)
2102 self.assertEqual(tcp.dport, server_in_port)
2103 self.assert_packet_checksums_valid(p)
2104 host_out_port = tcp.sport
2106 self.logger.error(ppp("Unexpected or invalid packet:", p))
2109 after = self.statistics.get_counter('/nat44/hairpinning')[0]
2110 if_idx = self.pg0.sw_if_index
2111 self.assertEqual(after[if_idx] - cnt[if_idx], 1)
2113 # send reply from server to host
2114 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2115 IP(src=server.ip4, dst=self.nat_addr) /
2116 TCP(sport=server_in_port, dport=host_out_port))
2117 self.pg0.add_stream(p)
2118 self.pg_enable_capture(self.pg_interfaces)
2120 capture = self.pg0.get_capture(1)
2125 self.assertEqual(ip.src, self.nat_addr)
2126 self.assertEqual(ip.dst, host.ip4)
2127 self.assertEqual(tcp.sport, server_out_port)
2128 self.assertEqual(tcp.dport, host_in_port)
2129 self.assert_packet_checksums_valid(p)
2131 self.logger.error(ppp("Unexpected or invalid packet:", p))
2134 after = self.statistics.get_counter('/nat44/hairpinning')[0]
2135 if_idx = self.pg0.sw_if_index
2136 self.assertEqual(after[if_idx] - cnt[if_idx], 2)
def test_interface_addr(self):
    """ NAT44EI acquire addresses from interface

    pg7 is registered as the source of NAT pool addresses; the pool is
    then expected to track the IPv4 address configured on pg7.
    """
    self.vapi.nat44_add_del_interface_addr(
        is_add=1,
        sw_if_index=self.pg7.sw_if_index)

    # no address in NAT pool
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT address pool
    self.pg7.config_ip4()
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(addresses))
    self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)

    # remove interface address and check NAT address pool
    self.pg7.unconfig_ip4()
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(addresses))
def test_interface_addr_static_mapping(self):
    """ NAT44EI Static mapping with addresses from interface

    A static mapping bound to pg7 (rather than to a fixed external
    address) must gain a second, resolved entry whenever pg7 has an
    IPv4 address and lose it again when the address is removed.
    """
    # NOTE(review): tag text and local address are not visible in the
    # reviewed listing - confirm the literal values.
    tag = "testTAG"
    local_ip = '1.2.3.4'

    def verify_unresolved_only():
        # Only the interface-bound template entry is present.
        static_mappings = self.vapi.nat44_static_mapping_dump()
        self.assertEqual(1, len(static_mappings))
        self.assertEqual(self.pg7.sw_if_index,
                         static_mappings[0].external_sw_if_index)
        self.assertEqual(static_mappings[0].tag, tag)

    def verify_resolved_present():
        # Template entry plus one entry resolved to pg7's address; the
        # resolved entry reports no external interface (0xFFFFFFFF).
        static_mappings = self.vapi.nat44_static_mapping_dump()
        self.assertEqual(2, len(static_mappings))
        resolved = False
        for sm in static_mappings:
            if sm.external_sw_if_index == 0xFFFFFFFF:
                self.assertEqual(str(sm.external_ip_address),
                                 self.pg7.local_ip4)
                self.assertEqual(sm.tag, tag)
                resolved = True
        self.assertTrue(resolved)

    self.vapi.nat44_add_del_interface_addr(
        is_add=1,
        sw_if_index=self.pg7.sw_if_index)
    self.nat44_add_static_mapping(
        local_ip,
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag)

    # static mappings with external interface
    verify_unresolved_only()

    # configure interface address and check static mappings
    self.pg7.config_ip4()
    verify_resolved_present()

    # remove interface address and check static mappings
    self.pg7.unconfig_ip4()
    verify_unresolved_only()

    # configure interface address again and check static mappings
    self.pg7.config_ip4()
    verify_resolved_present()

    # remove static mapping
    self.nat44_add_static_mapping(
        local_ip,
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag,
        is_add=0)
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
    """ NAT44EI Identity NAT with addresses from interface

    An identity mapping bound to pg7 must gain a resolved entry once
    pg7 has an IPv4 address and lose it when the address is removed.
    """
    # NOTE(review): port number and placeholder address are not visible
    # in the reviewed listing - confirm the literal values.
    port = 53053
    self.vapi.nat44_add_del_interface_addr(
        is_add=1,
        sw_if_index=self.pg7.sw_if_index)
    self.vapi.nat44_add_del_identity_mapping(
        ip_address=b'0',
        sw_if_index=self.pg7.sw_if_index,
        port=port,
        protocol=IP_PROTOS.tcp,
        is_add=1)

    # identity mappings with external interface
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)

    # configure interface address and check identity mappings
    self.pg7.config_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    resolved = False
    self.assertEqual(2, len(identity_mappings))
    for sm in identity_mappings:
        if sm.sw_if_index == 0xFFFFFFFF:
            # Check the entry selected by the loop, not whichever entry
            # happens to be first in the dump.
            self.assertEqual(str(sm.ip_address), self.pg7.local_ip4)
            self.assertEqual(port, sm.port)
            self.assertEqual(IP_PROTOS.tcp, sm.protocol)
            resolved = True
    self.assertTrue(resolved)

    # remove interface address and check identity mappings
    self.pg7.unconfig_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)
def test_ipfix_nat44_sess(self):
    """ NAT44EI IPFIX logging NAT44EI session created/deleted

    Uses a non-default domain id, source port and collector port, then
    checks every exported packet and decodes the data sets.
    """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # Dissect UDP traffic to the collector port as IPFIX; derived from
    # collector_port so the two cannot drift apart.
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    # NOTE(review): path_mtu value is not visible in the reviewed
    # listing - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                 src_address=self.pg3.local_ip4,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=1)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    # Removing the pool address is followed by a flush so the export
    # packets can be captured on pg3.
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(7)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ NAT44EI IPFIX logging NAT addresses exhausted

    No NAT pool address is configured, so the in2out packet cannot be
    translated; the resulting IPFIX export is then verified.
    """
    # No collector_port is passed to the exporter below; the export
    # packets are expected on this destination port.
    default_collector_port = 4739

    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    # NOTE(review): path_mtu value is not visible in the reviewed
    # listing - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                 src_address=self.pg3.local_ip4,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=1)

    # NOTE(review): TCP source port is not visible in the reviewed
    # listing - confirm.
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    # NOTE(review): assumes `sleep` is imported from `time` at file
    # level and that a pause precedes the flush - confirm.
    sleep(1)
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(7)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, default_collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_ipfix_max_sessions(self):
    """ NAT44EI IPFIX logging maximum session entries exceeded

    Fills the session table with self.max_translations sessions, then
    sends one more packet and verifies the IPFIX export that follows.
    """
    default_collector_port = 4739

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    max_sessions = self.max_translations

    # One packet per distinct source address (10.10.<hi>.<lo>).
    # NOTE(review): TCP source port is not visible in the reviewed
    # listing - confirm.
    pkts = [
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src="10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF),
            dst=self.pg1.remote_ip4) /
         TCP(sport=1025))
        for i in range(0, max_sessions)
    ]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    self.pg1.get_capture(max_sessions)
    # NOTE(review): path_mtu value is not visible in the reviewed
    # listing - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
                                 src_address=self.pg3.local_ip4,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
                                       src_port=self.ipfix_src_port,
                                       enable=1)

    # One more session than the table is expected to hold.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=1025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    # NOTE(review): assumes `sleep` is imported from `time` at file
    # level and that a pause precedes the flush - confirm.
    sleep(1)
    self.vapi.ipfix_flush()
    capture = self.pg3.get_capture(7)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, default_collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_sessions(data, max_sessions)
def test_syslog_apmap(self):
    """ NAT44EI syslog address and port mapping creation and deletion

    One TCP packet creates a mapping; removing the pool address then
    triggers the second syslog message. Both are captured on pg3.
    """
    self.vapi.syslog_set_filter(
        self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
    self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    # Remember the translated source port before verifying the message.
    self.tcp_port_out = capture[0][TCP].sport
    capture = self.pg3.get_capture(1)
    self.verify_syslog_apmap(capture[0][Raw].load)

    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.nat44_add_address(self.nat_addr, is_add=0)
    capture = self.pg3.get_capture(1)
    self.verify_syslog_apmap(capture[0][Raw].load, False)
def test_pool_addr_fib(self):
    """ NAT44EI add pool addresses to FIB

    ARP requests for the pool address and for the 1:1 static address
    must be answered on the outside interface while those addresses
    are configured, and must go unanswered after they are removed.
    """
    static_addr = '10.0.0.10'
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)

    def arp_request(intf, target_ip):
        # Broadcast who-has for target_ip sourced from intf's peer.
        return (Ether(src=intf.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                ARP(op=ARP.who_has, pdst=target_ip,
                    psrc=intf.remote_ip4, hwsrc=intf.remote_mac))

    # NAT44EI address
    self.pg1.add_stream(arp_request(self.pg1, self.nat_addr))
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    # assertEqual, not assertTrue: with assertTrue the second argument
    # is only a failure message and the opcode is never compared.
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # 1:1 NAT address
    self.pg1.add_stream(arp_request(self.pg1, static_addr))
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-NAT44EI interface
    self.pg2.add_stream(arp_request(self.pg2, self.nat_addr))
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()

    # remove addresses and verify
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                  is_add=0)

    self.pg1.add_stream(arp_request(self.pg1, self.nat_addr))
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()

    self.pg1.add_stream(arp_request(self.pg1, static_addr))
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
def test_vrf_mode(self):
    """ NAT44EI tenant VRF aware address pool mode

    pg0 and pg1 are moved into separate VRFs, each with its own pool
    address; traffic from each must be translated to the address of
    its own VRF. The interfaces are restored to table 0 afterwards.
    """
    # NOTE(review): VRF id values are not visible in the reviewed
    # listing - confirm.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
    self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()
    self.pg0.resolve_arp()
    self.pg1.resolve_arp()

    self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
    self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg2.sw_if_index,
        is_add=1)

    try:
        # first VRF
        pkts = self.create_stream_in(self.pg0, self.pg2)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1)

        # second VRF
        pkts = self.create_stream_in(self.pg1, self.pg2)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip2)

    finally:
        # Always put the interfaces back into the default table so a
        # failure here does not leak VRF state into later tests.
        self.pg0.unconfig_ip4()
        self.pg1.unconfig_ip4()
        self.pg0.set_table_ip4(0)
        self.pg1.set_table_ip4(0)
        self.pg0.config_ip4()
        self.pg1.config_ip4()
        self.pg0.resolve_arp()
        self.pg1.resolve_arp()
        self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
        self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
def test_vrf_feature_independent(self):
    """ NAT44EI tenant VRF independent address pool mode

    nat_ip2 is tied to VRF 99, which neither inside interface is
    placed in by this test, so traffic from both pg0 and pg1 is
    expected to be translated to nat_ip1.
    """
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.nat44_add_address(nat_ip1)
    self.nat44_add_address(nat_ip2, vrf_id=99)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg2.sw_if_index,
        is_add=1)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ NAT44EI interfaces without configured IP address

    Dynamic translation between pg7 (inside) and pg8 (outside), using
    the routes and neighbors set up by create_routes_and_neigbors().
    """
    self.create_routes_and_neigbors()
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg7.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg8.sw_if_index,
        is_add=1)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg8, self.nat_addr)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
    """ NAT44EI interfaces without configured IP address - 1:1 NAT

    Address-only static mapping between pg7 (inside) and pg8
    (outside); the outside-to-inside direction is exercised first.
    """
    self.create_routes_and_neigbors()
    self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg7.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg8.sw_if_index,
        is_add=1)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture, self.nat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ NAT44EI interfaces without configured IP address - 1:1 NAPT

    Per-protocol static mappings (TCP, UDP, ICMP) with fixed outside
    ports/ids between pg7 (inside) and pg8 (outside).
    """
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.create_routes_and_neigbors()
    self.nat44_add_address(self.nat_addr)
    # (inside port/id, outside port/id, protocol) for each mapping
    port_mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for port_in, port_out, proto in port_mappings:
        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
                                      port_in, port_out,
                                      proto=proto)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg7.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg8.sw_if_index,
        is_add=1)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_unknown_proto(self):
    """ NAT44EI 1:1 translate packet with unknown protocol

    A GRE-encapsulated packet is sent through an address-only static
    mapping in both directions; only the outer IP addresses are
    expected to be rewritten.
    """
    nat_ip = "10.0.0.10"
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # in2out
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg1.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, nat_ip)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        # Dump the offending packet, then let the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_static_unknown_proto(self):
    """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning

    Host and server both sit behind pg0 and each has an address-only
    static mapping; GRE traffic between them via the NAT addresses
    must come back out of pg0 with both outer addresses rewritten.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    self.nat44_add_static_mapping(host.ip4, host_nat_ip)
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    # host to server
    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, host_nat_ip)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        # Dump the offending packet, then let the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=host_nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_output_feature(self):
    """ NAT44EI output feature (in2out postrouting)

    pg0 and pg1 are configured with the inside flag and pg3 without
    it; pg2 has no NAT feature and its traffic towards pg0 must pass
    untranslated.
    """
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_output_feature(
        is_add=1, flags=flags,
        sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(
        is_add=1, flags=flags,
        sw_if_index=self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(
        is_add=1,
        sw_if_index=self.pg3.sw_if_index)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # from non-NAT interface to NAT inside interface
    pkts = self.create_stream_in(self.pg2, self.pg0)
    self.pg2.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg2, self.pg0)
def test_output_feature_vrf_aware(self):
    """ NAT44EI output feature VRF aware (in2out postrouting)

    pg4 and pg6 are the inside interfaces and pg3 the outside one;
    traffic from each inside interface must use the pool address of
    its VRF (10 and 20 respectively) in both directions.
    """
    nat_ip_vrf10 = "10.0.0.10"
    nat_ip_vrf20 = "10.0.0.20"

    # Host routes to the outside peer in each tenant table.
    # NOTE(review): table ids are not visible in the reviewed listing;
    # taken to match the vrf_id values used for the pool addresses.
    r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
                    [VppRoutePath(self.pg3.remote_ip4,
                                  self.pg3.sw_if_index)],
                    table_id=10)
    r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
                    [VppRoutePath(self.pg3.remote_ip4,
                                  self.pg3.sw_if_index)],
                    table_id=20)
    r1.add_vpp_config()
    r2.add_vpp_config()

    self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
    self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_output_feature(
        is_add=1, flags=flags,
        sw_if_index=self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(
        is_add=1, flags=flags,
        sw_if_index=self.pg6.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(
        is_add=1,
        sw_if_index=self.pg3.sw_if_index)

    # in2out VRF 10
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)

    # out2in VRF 10
    pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out VRF 20
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)

    # out2in VRF 20
    pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)
def test_output_feature_hairpinning(self):
    """ NAT44EI output feature hairpinning (in2out postrouting)

    Host and server both sit behind pg0. The host reaches the server
    through its static mapping on the NAT address and the reply must
    be translated back, all on pg0.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): host_in_port value is not visible in the reviewed
    # listing - confirm.
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_output_feature(
        is_add=1, flags=flags,
        sw_if_index=self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(
        is_add=1,
        sw_if_index=self.pg1.sw_if_index)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.assert_packet_checksums_valid(p)
        # The dynamically allocated port is needed for the reply.
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_one_armed_nat44(self):
    """ NAT44EI One armed NAT

    pg9 carries the NAT feature both with and without the inside
    flag, so a single interface handles in2out and out2in. The
    classify node counters are checked at the end.
    """
    remote_host = self.pg9.remote_hosts[0]
    local_host = self.pg9.remote_hosts[1]
    external_port = 0

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg9.sw_if_index,
        is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg9.sw_if_index,
        flags=flags, is_add=1)

    # in2out
    p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
         IP(src=local_host.ip4, dst=remote_host.ip4) /
         TCP(sport=12345, dport=80))
    self.pg9.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg9.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, remote_host.ip4)
        self.assertNotEqual(tcp.sport, 12345)
        # The allocated outside port is the destination of the reply.
        external_port = tcp.sport
        self.assertEqual(tcp.dport, 80)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # out2in
    p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
         IP(src=remote_host.ip4, dst=self.nat_addr) /
         TCP(sport=80, dport=external_port))
    self.pg9.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg9.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, remote_host.ip4)
        self.assertEqual(ip.dst, local_host.ip4)
        self.assertEqual(tcp.sport, 80)
        self.assertEqual(tcp.dport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # One packet was sent in each direction.
    err = self.statistics.get_err_counter(
        '/err/nat44-classify/next in2out')
    self.assertEqual(err, 1)
    err = self.statistics.get_err_counter(
        '/err/nat44-classify/next out2in')
    self.assertEqual(err, 1)
def test_del_session(self):
    """ NAT44EI delete session

    Creates sessions with an in2out stream, deletes one by its inside
    key and one by its outside key, checks the count dropped by two,
    then deletes another by inside key and expects no NAT user left.
    """
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(len(pkts))

    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
    nsessions = len(sessions)

    # Delete by inside key (NAT_IS_INSIDE set) ...
    self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
                                port=sessions[0].inside_port,
                                protocol=sessions[0].protocol,
                                flags=self.config_flags.NAT_IS_INSIDE)
    # ... and by outside key (no flags).
    self.vapi.nat44_del_session(address=sessions[1].outside_ip_address,
                                port=sessions[1].outside_port,
                                protocol=sessions[1].protocol)

    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
    self.assertEqual(nsessions - len(sessions), 2)

    self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
                                port=sessions[0].inside_port,
                                protocol=sessions[0].protocol,
                                flags=self.config_flags.NAT_IS_INSIDE)

    self.verify_no_nat44_user()
def test_frag_in_order(self):
    """ NAT44EI translate fragments arriving in order

    Runs the shared frag_in_order() scenario for TCP, UDP and ICMP
    with pg0 as the inside and pg1 as the outside interface.
    """
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)

    for proto in (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp):
        self.frag_in_order(proto=proto)
def test_frag_forwarding(self):
    """ NAT44EI forwarding fragment test

    With forwarding enabled, fragmented UDP traffic from the outside
    (pg1) to an inside host must be delivered on pg0 and reassemble
    to the original payload with ports and addresses unchanged.
    """
    # Both UDP ports of the fragmented datagram; asserted after
    # reassembly below.
    udp_port = 4789

    self.vapi.nat44_add_del_interface_addr(
        is_add=1,
        sw_if_index=self.pg1.sw_if_index)
    flags = self.config_flags.NAT_IS_INSIDE
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index,
        flags=flags, is_add=1)
    self.vapi.nat44_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index,
        is_add=1)
    self.vapi.nat44_forwarding_enable_disable(enable=1)

    data = b"A" * 16 + b"B" * 16 + b"C" * 3
    pkts = self.create_stream_frag(self.pg1,
                                   self.pg0.remote_ip4,
                                   udp_port,
                                   udp_port,
                                   data,
                                   proto=IP_PROTOS.udp)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(pkts))
    p = self.reass_frags_and_verify(frags,
                                    self.pg1.remote_ip4,
                                    self.pg0.remote_ip4)
    self.assertEqual(p[UDP].sport, udp_port)
    self.assertEqual(p[UDP].dport, udp_port)
    self.assertEqual(data, p[Raw].load)
3137 def test_reass_hairpinning(self):
3138 """ NAT44EI fragments hairpinning """
# The server is the second remote host on pg0; the host and server ports are
# drawn at random from 1025..65535 on every run.
3140 server_addr = self.pg0.remote_hosts[1].ip4
3141 host_in_port = random.randint(1025, 65535)
3142 server_in_port = random.randint(1025, 65535)
3143 server_out_port = random.randint(1025, 65535)
3145 self.nat44_add_address(self.nat_addr)
3146 flags = self.config_flags.NAT_IS_INSIDE
3147 self.vapi.nat44_interface_add_del_feature(
3148 sw_if_index=self.pg0.sw_if_index,
3149 flags=flags, is_add=1)
# NOTE(review): the end of the pg1 call (line 3152) is not in this listing.
3150 self.vapi.nat44_interface_add_del_feature(
3151 sw_if_index=self.pg1.sw_if_index,
3153 # add static mapping for server
# Three mappings server_addr -> nat_addr: one with proto=TCP, one with
# proto=UDP and one with neither port nor protocol given.
# NOTE(review): the arguments on lines 3155-3156 and 3159-3160 are missing
# from this listing; presumably the server in/out ports -- confirm.
3154 self.nat44_add_static_mapping(server_addr, self.nat_addr,
3157 proto=IP_PROTOS.tcp)
3158 self.nat44_add_static_mapping(server_addr, self.nat_addr,
3161 proto=IP_PROTOS.udp)
3162 self.nat44_add_static_mapping(server_addr, self.nat_addr)
# Run the shared reass_hairpinning() helper once per protocol.
3164 self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3165 host_in_port, proto=IP_PROTOS.tcp)
3166 self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3167 host_in_port, proto=IP_PROTOS.udp)
3168 self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3169 host_in_port, proto=IP_PROTOS.icmp)
3171 def test_frag_out_of_order(self):
3172 """ NAT44EI translate fragments arriving out of order """
# Flow: add self.nat_addr to the NAT pool, enable the NAT44 feature on pg0
# with NAT_IS_INSIDE, issue the same feature call for pg1, then run the
# frag_out_of_order() helper once each for TCP, UDP and ICMP.
3174 self.nat44_add_address(self.nat_addr)
3175 flags = self.config_flags.NAT_IS_INSIDE
3176 self.vapi.nat44_interface_add_del_feature(
3177 sw_if_index=self.pg0.sw_if_index,
3178 flags=flags, is_add=1)
# NOTE(review): this listing jumps from 3180 to 3183, so the remaining
# argument(s) and the closing parenthesis of the pg1 call are not visible.
3179 self.vapi.nat44_interface_add_del_feature(
3180 sw_if_index=self.pg1.sw_if_index,
3183 self.frag_out_of_order(proto=IP_PROTOS.tcp)
3184 self.frag_out_of_order(proto=IP_PROTOS.udp)
3185 self.frag_out_of_order(proto=IP_PROTOS.icmp)
3187 def test_port_restricted(self):
3188 """ NAT44EI Port restricted NAT44EI (MAP-E CE) """
# Flow: configure NAT on pg0/pg1, select address-and-port allocation
# algorithm 1, send one TCP packet (sport 4567, dport 22) from pg0 towards
# pg1 and inspect the single translated packet captured on pg1.
3189 self.nat44_add_address(self.nat_addr)
3190 flags = self.config_flags.NAT_IS_INSIDE
3191 self.vapi.nat44_interface_add_del_feature(
3192 sw_if_index=self.pg0.sw_if_index,
3193 flags=flags, is_add=1)
# NOTE(review): lines 3196 and 3198-3201 are missing from this listing, so
# the end of the pg1 call and the other allocation-algorithm arguments are
# not visible here.
3194 self.vapi.nat44_interface_add_del_feature(
3195 sw_if_index=self.pg1.sw_if_index,
3197 self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3202 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3203 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3204 TCP(sport=4567, dport=22))
3205 self.pg0.add_stream(p)
3206 self.pg_enable_capture(self.pg_interfaces)
3208 capture = self.pg1.get_capture(1)
# NOTE(review): the statements that bind `ip` and `tcp` (lines 3209-3212)
# are not in this listing.
3213 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3214 self.assertEqual(ip.src, self.nat_addr)
3215 self.assertEqual(tcp.dport, 22)
3216 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the translated source port must equal 10; presumably this is
# the PSID set by the omitted arguments above -- confirm.
3217 self.assertEqual((tcp.sport >> 6) & 63, 10)
3218 self.assert_packet_checksums_valid(p)
3220 self.logger.error(ppp("Unexpected or invalid packet:", p))
3223 def test_port_range(self):
3224 """ NAT44EI External address port range """
# Flow: configure NAT on pg0/pg1, select address-and-port allocation
# algorithm 2, build five TCP packets with source ports 1125..1129, and
# expect only three translated packets on pg1 with source ports in
# 1025..1027.
3225 self.nat44_add_address(self.nat_addr)
3226 flags = self.config_flags.NAT_IS_INSIDE
3227 self.vapi.nat44_interface_add_del_feature(
3228 sw_if_index=self.pg0.sw_if_index,
3229 flags=flags, is_add=1)
# NOTE(review): lines 3232 and 3234-3237 are missing from this listing, so
# the end of the pg1 call and the other allocation-algorithm arguments
# (presumably the port range itself) are not visible -- confirm.
3230 self.vapi.nat44_interface_add_del_feature(
3231 sw_if_index=self.pg1.sw_if_index,
3233 self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
# NOTE(review): the creation of `pkts` and the statement that collects each
# `p` into it (around lines 3237 and 3242) are not in this listing.
3238 for port in range(0, 5):
3239 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3240 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3241 TCP(sport=1125 + port))
3243 self.pg0.add_stream(pkts)
3244 self.pg_enable_capture(self.pg_interfaces)
3246 capture = self.pg1.get_capture(3)
# NOTE(review): the loop header that binds `tcp` for each captured packet
# (lines 3247-3248) is not in this listing.
3249 self.assertGreaterEqual(tcp.sport, 1025)
3250 self.assertLessEqual(tcp.sport, 1027)
3252 def test_multiple_outside_vrf(self):
3253 """ NAT44EI Multiple outside VRF """
# NOTE(review): the assignments of vrf_id1 and vrf_id2 (lines 3254-3256) are
# not in this listing.
# Step 1: move pg1 and pg2 into their own IPv4 tables. Each interface is
# unconfigured first, bound to the new table, then reconfigured and has its
# ARP entries resolved again.
3257 self.pg1.unconfig_ip4()
3258 self.pg2.unconfig_ip4()
3259 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
3260 self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
3261 self.pg1.set_table_ip4(vrf_id1)
3262 self.pg2.set_table_ip4(vrf_id2)
3263 self.pg1.config_ip4()
3264 self.pg2.config_ip4()
3265 self.pg1.resolve_arp()
3266 self.pg2.resolve_arp()
# Step 2: NAT configuration -- pg0 is inside; pg1 and pg2 are both passed to
# the feature API.
# NOTE(review): the ends of the pg1/pg2 calls (lines 3275, 3278-3281) are
# not in this listing.
3268 self.nat44_add_address(self.nat_addr)
3269 flags = self.config_flags.NAT_IS_INSIDE
3270 self.vapi.nat44_interface_add_del_feature(
3271 sw_if_index=self.pg0.sw_if_index,
3272 flags=flags, is_add=1)
3273 self.vapi.nat44_interface_add_del_feature(
3274 sw_if_index=self.pg1.sw_if_index,
3276 self.vapi.nat44_interface_add_del_feature(
3277 sw_if_index=self.pg2.sw_if_index,
# Step 3: traffic pg0 -> pg1 and back through the first table.
3282 pkts = self.create_stream_in(self.pg0, self.pg1)
3283 self.pg0.add_stream(pkts)
3284 self.pg_enable_capture(self.pg_interfaces)
3286 capture = self.pg1.get_capture(len(pkts))
3287 self.verify_capture_out(capture, self.nat_addr)
3289 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3290 self.pg1.add_stream(pkts)
3291 self.pg_enable_capture(self.pg_interfaces)
3293 capture = self.pg0.get_capture(len(pkts))
3294 self.verify_capture_in(capture, self.pg0)
# Step 4: switch to different inside ports/ICMP id before repeating the
# exchange via pg2.
3296 self.tcp_port_in = 60303
3297 self.udp_port_in = 60304
3298 self.icmp_id_in = 60305
3301 pkts = self.create_stream_in(self.pg0, self.pg2)
3302 self.pg0.add_stream(pkts)
3303 self.pg_enable_capture(self.pg_interfaces)
3305 capture = self.pg2.get_capture(len(pkts))
3306 self.verify_capture_out(capture, self.nat_addr)
3308 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3309 self.pg2.add_stream(pkts)
3310 self.pg_enable_capture(self.pg_interfaces)
3312 capture = self.pg0.get_capture(len(pkts))
3313 self.verify_capture_in(capture, self.pg0)
# Step 5: cleanup -- remove the NAT address and return pg1/pg2 to table 0.
# NOTE(review): lines 3314-3315 are missing; whether this cleanup sits in a
# `finally:` clause cannot be told from this listing.
3316 self.nat44_add_address(self.nat_addr, is_add=0)
3317 self.pg1.unconfig_ip4()
3318 self.pg2.unconfig_ip4()
3319 self.pg1.set_table_ip4(0)
3320 self.pg2.set_table_ip4(0)
3321 self.pg1.config_ip4()
3322 self.pg2.config_ip4()
3323 self.pg1.resolve_arp()
3324 self.pg2.resolve_arp()
3326 def test_mss_clamping(self):
3327 """ NAT44EI TCP MSS clamping """
# Flow: configure NAT on pg0/pg1, then send the same TCP SYN (MSS option
# 1400) three times under different clamping settings and check the MSS
# seen on pg1 each time.
3328 self.nat44_add_address(self.nat_addr)
3329 flags = self.config_flags.NAT_IS_INSIDE
3330 self.vapi.nat44_interface_add_del_feature(
3331 sw_if_index=self.pg0.sw_if_index,
3332 flags=flags, is_add=1)
# NOTE(review): the end of the pg1 call (line 3335) is not in this listing.
3333 self.vapi.nat44_interface_add_del_feature(
3334 sw_if_index=self.pg1.sw_if_index,
3337 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3338 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3339 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3340 flags="S", options=[('MSS', 1400)]))
# Case 1: clamping enabled at 1000, below the packet's 1400.
3342 self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
3343 self.pg0.add_stream(p)
3344 self.pg_enable_capture(self.pg_interfaces)
3346 capture = self.pg1.get_capture(1)
3347 # Negotiated MSS value greater than configured - changed
3348 self.verify_mss_value(capture[0], 1000)
# Case 2: clamping disabled; the configured value must not be applied.
3350 self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
3351 self.pg0.add_stream(p)
3352 self.pg_enable_capture(self.pg_interfaces)
3354 capture = self.pg1.get_capture(1)
3355 # MSS clamping disabled - negotiated MSS unchanged
3356 self.verify_mss_value(capture[0], 1400)
# Case 3: clamping enabled at 1500, above the packet's 1400.
3358 self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
3359 self.pg0.add_stream(p)
3360 self.pg_enable_capture(self.pg_interfaces)
3362 capture = self.pg1.get_capture(1)
3363 # Negotiated MSS value smaller than configured - unchanged
3364 self.verify_mss_value(capture[0], 1400)
3366 def test_ha_send(self):
3367 """ NAT44EI Send HA session synchronization events (active) """
# Phases: (1) NAT + HA setup, (2) create sessions and check the "add"
# events sent on pg3, (3) ACK them, (4) delete a session and check the
# follow-up event, (5) withhold the ACK and check the retry counters,
# (6) send return traffic and check the "refresh" events, (7) ACK again.
# NOTE(review): several original lines are absent from this listing (call
# endings, the packet unpacking around each capture, and some statements
# between add_stream/enable_capture and the checks); comments below describe
# only what is visible.
3368 flags = self.config_flags.NAT_IS_INSIDE
3369 self.vapi.nat44_interface_add_del_feature(
3370 sw_if_index=self.pg0.sw_if_index,
3371 flags=flags, is_add=1)
3372 self.vapi.nat44_interface_add_del_feature(
3373 sw_if_index=self.pg1.sw_if_index,
3375 self.nat44_add_address(self.nat_addr)
# HA listener on pg3's local address; failover peer is pg3's remote address
# on UDP port 12346 with a session refresh interval of 10.
3377 self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
3380 self.vapi.nat_ha_set_failover(ip_address=self.pg3.remote_ip4,
3381 port=12346, session_refresh_interval=10)
# Let scapy dissect UDP payloads with source port 12345 as HANATStateSync.
3382 bind_layers(UDP, HANATStateSync, sport=12345)
3385 pkts = self.create_stream_in(self.pg0, self.pg1)
3386 self.pg0.add_stream(pkts)
3387 self.pg_enable_capture(self.pg_interfaces)
3389 capture = self.pg1.get_capture(len(pkts))
3390 self.verify_capture_out(capture)
3391 # active send HA events
# After the flush, three add events are expected in the counter and in one
# packet captured on pg3.
3392 self.vapi.nat_ha_flush()
3393 stats = self.statistics.get_counter('/nat44/ha/add-event-send')
3394 self.assertEqual(stats[0][0], 3)
3395 capture = self.pg3.get_capture(1)
3397 self.assert_packet_checksums_valid(p)
3401 hanat = p[HANATStateSync]
3403 self.logger.error(ppp("Invalid packet:", p))
3406 self.assertEqual(ip.src, self.pg3.local_ip4)
3407 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3408 self.assertEqual(udp.sport, 12345)
3409 self.assertEqual(udp.dport, 12346)
3410 self.assertEqual(hanat.version, 1)
3411 self.assertEqual(hanat.thread_index, 0)
3412 self.assertEqual(hanat.count, 3)
# Remember the sequence number so the ACK below can echo it.
3413 seq = hanat.sequence_number
# event_type 1 is "add" in the Event layer's enum.
3414 for event in hanat.events:
3415 self.assertEqual(event.event_type, 1)
3416 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3417 self.assertEqual(event.out_addr, self.nat_addr)
3418 self.assertEqual(event.fib_index, 0)
3420 # ACK received events
3421 ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3422 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3423 UDP(sport=12346, dport=12345) /
3424 HANATStateSync(sequence_number=seq, flags='ACK'))
3425 self.pg3.add_stream(ack)
3427 stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3428 self.assertEqual(stats[0][0], 1)
3430 # delete one session
3431 self.pg_enable_capture(self.pg_interfaces)
3432 self.vapi.nat44_del_session(address=self.pg0.remote_ip4,
3433 port=self.tcp_port_in,
3434 protocol=IP_PROTOS.tcp,
3435 flags=self.config_flags.NAT_IS_INSIDE)
3436 self.vapi.nat_ha_flush()
3437 stats = self.statistics.get_counter('/nat44/ha/del-event-send')
3438 self.assertEqual(stats[0][0], 1)
3439 capture = self.pg3.get_capture(1)
3442 hanat = p[HANATStateSync]
3444 self.logger.error(ppp("Invalid packet:", p))
# The delete event must carry a newer sequence number than the add batch.
3447 self.assertGreater(hanat.sequence_number, seq)
3449 # do not send ACK, active retry send HA event again
3450 self.pg_enable_capture(self.pg_interfaces)
# Expected: three retries, one missed event, and three captured packets each
# equal to the packet `p` taken above.
3452 stats = self.statistics.get_counter('/nat44/ha/retry-count')
3453 self.assertEqual(stats[0][0], 3)
3454 stats = self.statistics.get_counter('/nat44/ha/missed-count')
3455 self.assertEqual(stats[0][0], 1)
3456 capture = self.pg3.get_capture(3)
3457 for packet in capture:
3458 self.assertEqual(packet, p)
3460 # session counters refresh
3461 pkts = self.create_stream_out(self.pg1)
3462 self.pg1.add_stream(pkts)
3463 self.pg_enable_capture(self.pg_interfaces)
# Only two packets are expected on pg0 here, and two refresh events follow.
3465 self.pg0.get_capture(2)
3466 self.vapi.nat_ha_flush()
3467 stats = self.statistics.get_counter('/nat44/ha/refresh-event-send')
3468 self.assertEqual(stats[0][0], 2)
3469 capture = self.pg3.get_capture(1)
3471 self.assert_packet_checksums_valid(p)
3475 hanat = p[HANATStateSync]
3477 self.logger.error(ppp("Invalid packet:", p))
3480 self.assertEqual(ip.src, self.pg3.local_ip4)
3481 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3482 self.assertEqual(udp.sport, 12345)
3483 self.assertEqual(udp.dport, 12346)
3484 self.assertEqual(hanat.version, 1)
3485 self.assertEqual(hanat.count, 2)
3486 seq = hanat.sequence_number
# event_type 3 is "refresh" in the Event layer's enum; each refresh must
# report two packets and a non-zero byte count.
3487 for event in hanat.events:
3488 self.assertEqual(event.event_type, 3)
3489 self.assertEqual(event.out_addr, self.nat_addr)
3490 self.assertEqual(event.fib_index, 0)
3491 self.assertEqual(event.total_pkts, 2)
3492 self.assertGreater(event.total_bytes, 0)
# Second ACK; the ack-recv counter is expected to reach 2.
3494 ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3495 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3496 UDP(sport=12346, dport=12345) /
3497 HANATStateSync(sequence_number=seq, flags='ACK'))
3498 self.pg3.add_stream(ack)
3500 stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3501 self.assertEqual(stats[0][0], 2)
3503 def test_ha_recv(self):
3504 """ NAT44EI Receive HA session synchronization events (passive) """
# Phases: (1) NAT + HA listener setup, (2) inject two "add" events and check
# the ACK, counters and resulting sessions, (3) inject a "del" event,
# (4) inject a "refresh" event and check the session counters, (5) send a
# data packet from pg1 that must be translated by the HA-created session.
# NOTE(review): several original lines are absent from this listing (call
# endings and the packet unpacking around each capture); comments below
# describe only what is visible.
3505 self.nat44_add_address(self.nat_addr)
3506 flags = self.config_flags.NAT_IS_INSIDE
3507 self.vapi.nat44_interface_add_del_feature(
3508 sw_if_index=self.pg0.sw_if_index,
3509 flags=flags, is_add=1)
3510 self.vapi.nat44_interface_add_del_feature(
3511 sw_if_index=self.pg1.sw_if_index,
3513 self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
# Let scapy dissect UDP payloads with source port 12345 as HANATStateSync.
3516 bind_layers(UDP, HANATStateSync, sport=12345)
# Outside ports for the injected sessions are chosen at random per run.
3518 self.tcp_port_out = random.randint(1025, 65535)
3519 self.udp_port_out = random.randint(1025, 65535)
3521 # send HA session add events to failover/passive
3522 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3523 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3524 UDP(sport=12346, dport=12345) /
3525 HANATStateSync(sequence_number=1, events=[
3526 Event(event_type='add', protocol='tcp',
3527 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3528 in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3529 eh_addr=self.pg1.remote_ip4,
3530 ehn_addr=self.pg1.remote_ip4,
3531 eh_port=self.tcp_external_port,
3532 ehn_port=self.tcp_external_port, fib_index=0),
3533 Event(event_type='add', protocol='udp',
3534 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3535 in_port=self.udp_port_in, out_port=self.udp_port_out,
3536 eh_addr=self.pg1.remote_ip4,
3537 ehn_addr=self.pg1.remote_ip4,
3538 eh_port=self.udp_external_port,
3539 ehn_port=self.udp_external_port, fib_index=0)]))
3541 self.pg3.add_stream(p)
3542 self.pg_enable_capture(self.pg_interfaces)
# One reply is expected on pg3: an ACK echoing sequence number 1.
3545 capture = self.pg3.get_capture(1)
3548 hanat = p[HANATStateSync]
3550 self.logger.error(ppp("Invalid packet:", p))
3553 self.assertEqual(hanat.sequence_number, 1)
3554 self.assertEqual(hanat.flags, 'ACK')
3555 self.assertEqual(hanat.version, 1)
3556 self.assertEqual(hanat.thread_index, 0)
# Counters: one ACK sent, two add events received, one user, two sessions.
3557 stats = self.statistics.get_counter('/nat44/ha/ack-send')
3558 self.assertEqual(stats[0][0], 1)
3559 stats = self.statistics.get_counter('/nat44/ha/add-event-recv')
3560 self.assertEqual(stats[0][0], 2)
3561 users = self.statistics.get_counter('/nat44/total-users')
3562 self.assertEqual(users[0][0], 1)
3563 sessions = self.statistics.get_counter('/nat44/total-sessions')
3564 self.assertEqual(sessions[0][0], 2)
# The same facts are cross-checked through the API dumps.
3565 users = self.vapi.nat44_user_dump()
3566 self.assertEqual(len(users), 1)
3567 self.assertEqual(str(users[0].ip_address),
3568 self.pg0.remote_ip4)
3569 # there should be 2 sessions created by HA
3570 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3572 self.assertEqual(len(sessions), 2)
3573 for session in sessions:
3574 self.assertEqual(str(session.inside_ip_address),
3575 self.pg0.remote_ip4)
3576 self.assertEqual(str(session.outside_ip_address),
3578 self.assertIn(session.inside_port,
3579 [self.tcp_port_in, self.udp_port_in])
3580 self.assertIn(session.outside_port,
3581 [self.tcp_port_out, self.udp_port_out])
3582 self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3584 # send HA session delete event to failover/passive
3585 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3586 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3587 UDP(sport=12346, dport=12345) /
3588 HANATStateSync(sequence_number=2, events=[
3589 Event(event_type='del', protocol='udp',
3590 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3591 in_port=self.udp_port_in, out_port=self.udp_port_out,
3592 eh_addr=self.pg1.remote_ip4,
3593 ehn_addr=self.pg1.remote_ip4,
3594 eh_port=self.udp_external_port,
3595 ehn_port=self.udp_external_port, fib_index=0)]))
3597 self.pg3.add_stream(p)
3598 self.pg_enable_capture(self.pg_interfaces)
# Again one ACK is expected, this time echoing sequence number 2.
3601 capture = self.pg3.get_capture(1)
3604 hanat = p[HANATStateSync]
3606 self.logger.error(ppp("Invalid packet:", p))
3609 self.assertEqual(hanat.sequence_number, 2)
3610 self.assertEqual(hanat.flags, 'ACK')
3611 self.assertEqual(hanat.version, 1)
3612 users = self.vapi.nat44_user_dump()
3613 self.assertEqual(len(users), 1)
3614 self.assertEqual(str(users[0].ip_address),
3615 self.pg0.remote_ip4)
3616 # now we should have only 1 session, 1 deleted by HA
3617 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3619 self.assertEqual(len(sessions), 1)
3620 stats = self.statistics.get_counter('/nat44/ha/del-event-recv')
3621 self.assertEqual(stats[0][0], 1)
# Two HA packets (add batch + delete) have been processed so far.
3623 stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3624 self.assertEqual(stats, 2)
3626 # send HA session refresh event to failover/passive
3627 p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3628 IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3629 UDP(sport=12346, dport=12345) /
3630 HANATStateSync(sequence_number=3, events=[
3631 Event(event_type='refresh', protocol='tcp',
3632 in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3633 in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3634 eh_addr=self.pg1.remote_ip4,
3635 ehn_addr=self.pg1.remote_ip4,
3636 eh_port=self.tcp_external_port,
3637 ehn_port=self.tcp_external_port, fib_index=0,
3638 total_bytes=1024, total_pkts=2)]))
3639 self.pg3.add_stream(p)
3640 self.pg_enable_capture(self.pg_interfaces)
3643 capture = self.pg3.get_capture(1)
3646 hanat = p[HANATStateSync]
3648 self.logger.error(ppp("Invalid packet:", p))
3651 self.assertEqual(hanat.sequence_number, 3)
3652 self.assertEqual(hanat.flags, 'ACK')
3653 self.assertEqual(hanat.version, 1)
3654 users = self.vapi.nat44_user_dump()
3655 self.assertEqual(len(users), 1)
3656 self.assertEqual(str(users[0].ip_address),
3657 self.pg0.remote_ip4)
3658 sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3660 self.assertEqual(len(sessions), 1)
# The remaining session must now carry the totals from the refresh event.
3661 session = sessions[0]
3662 self.assertEqual(session.total_bytes, 1024)
3663 self.assertEqual(session.total_pkts, 2)
3664 stats = self.statistics.get_counter('/nat44/ha/refresh-event-recv')
3665 self.assertEqual(stats[0][0], 1)
3667 stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3668 self.assertEqual(stats, 3)
3670 # send packet to test session created by HA
3671 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3672 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3673 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
3674 self.pg1.add_stream(p)
3675 self.pg_enable_capture(self.pg_interfaces)
# The packet must reach pg0 with its destination rewritten to the inside
# host address and inside TCP port.
3677 capture = self.pg0.get_capture(1)
3683 self.logger.error(ppp("Invalid packet:", p))
3686 self.assertEqual(ip.src, self.pg1.remote_ip4)
3687 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3688 self.assertEqual(tcp.sport, self.tcp_external_port)
3689 self.assertEqual(tcp.dport, self.tcp_port_in)
3691 def show_commands_at_teardown(self):
# Run a series of NAT "show" CLI commands and write each command's output
# to the test logger at info level.
3692 self.logger.info(self.vapi.cli("show nat44 addresses"))
3693 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3694 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3695 self.logger.info(self.vapi.cli("show nat44 interface address"))
3696 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3697 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3698 self.logger.info(self.vapi.cli("show nat timeouts"))
# NOTE(review): line 3699, which opens the call wrapped around the next
# line, is not in this listing; presumably another self.logger.info( --
# confirm.
3700 self.vapi.cli("show nat addr-port-assignment-alg"))
3701 self.logger.info(self.vapi.cli("show nat ha"))
3704 class TestNAT44Out2InDPO(MethodHolder):
3705 """ NAT44EI Test Cases using out2in DPO """
# NOTE(review): this listing omits several original lines of the class,
# including line 3707 (presumably the @classmethod decorator) and the
# setUp/tearDown `def` headers (lines 3737 and 3742); comments below
# describe only what is visible.
3708 def setUpClass(cls):
# Class-wide fixture: raise NAT logging to debug, fix the port/id values and
# addresses used by the tests, create two pg interfaces, give pg0 an IPv4
# configuration and pg1 an IPv6 one, and build a default IPv6 route ("::"/0)
# via pg1's remote address.
3709 super(TestNAT44Out2InDPO, cls).setUpClass()
3710 cls.vapi.cli("set log class nat level debug")
3712 cls.tcp_port_in = 6303
3713 cls.tcp_port_out = 6303
3714 cls.udp_port_in = 6304
3715 cls.udp_port_out = 6304
3716 cls.icmp_id_in = 6305
3717 cls.icmp_id_out = 6305
3718 cls.nat_addr = '10.0.0.3'
3719 cls.dst_ip4 = '192.168.70.1'
3721 cls.create_pg_interfaces(range(2))
3724 cls.pg0.config_ip4()
3725 cls.pg0.resolve_arp()
3728 cls.pg1.config_ip6()
3729 cls.pg1.resolve_ndp()
# NOTE(review): the rest of the VppIpRoute call and whatever installs the
# route (lines 3734-3736) are not in this listing.
3731 r1 = VppIpRoute(cls, "::", 0,
3732 [VppRoutePath(cls.pg1.remote_ip6,
3733 cls.pg1.sw_if_index)],
# Per-test setup (header missing from the listing): enable the NAT44 plugin
# with the NAT44_API_IS_OUT2IN_DPO flag.
3738 super(TestNAT44Out2InDPO, self).setUp()
3739 flags = self.nat44_config_flags.NAT44_API_IS_OUT2IN_DPO
3740 self.vapi.nat44_plugin_enable_disable(enable=1, flags=flags)
# Per-test teardown (header missing from the listing): if VPP is still
# alive, disable the NAT44 plugin and clear the log buffer.
3743 super(TestNAT44Out2InDPO, self).tearDown()
3744 if not self.vpp_dead:
3745 self.vapi.nat44_plugin_enable_disable(enable=0)
3746 self.vapi.cli("clear logging")
3748 def configure_xlat(self):
# Store the destination ('1:2:3::') and source ('4:5:6::') IPv6 prefixes,
# their packed forms and lengths (96) on the instance, then create a MAP
# domain from them.
# NOTE(review): the second argument of each inet_pton call (lines 3751 and
# 3755) is not in this listing.
3749 self.dst_ip6_pfx = '1:2:3::'
3750 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3752 self.dst_ip6_pfx_len = 96
3753 self.src_ip6_pfx = '4:5:6::'
3754 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3756 self.src_ip6_pfx_len = 96
# NOTE(review): '\x00\x00\x00\x00' is a str literal, not bytes; confirm that
# map_add_domain accepts it under Python 3.
3757 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3758 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3759 '\x00\x00\x00\x00', 0)
# Skipped unconditionally via unittest.skip.
3761 @unittest.skip('Temporary disabled')
3762 def test_464xlat_ce(self):
3763 """ Test 464XLAT CE with NAT44EI """
# Checks that the running config reports out2in_dpo == 1, configures the
# MAP domain and NAT, then sends IPv4 traffic in on pg0 (expecting IPv6 on
# pg1) and IPv6 traffic back (expecting IPv4 on pg0), and finally issues
# the feature and address-range calls again.
# NOTE(review): self.nat_addr_n is not assigned in the visible part of this
# class -- verify where it is defined.
3765 nat_config = self.vapi.nat_show_config()
3766 self.assertEqual(1, nat_config.out2in_dpo)
3768 self.configure_xlat()
3770 flags = self.config_flags.NAT_IS_INSIDE
3771 self.vapi.nat44_interface_add_del_feature(
3772 sw_if_index=self.pg0.sw_if_index,
3773 flags=flags, is_add=1)
3774 self.vapi.nat44_add_del_address_range(first_ip_address=self.nat_addr_n,
3775 last_ip_address=self.nat_addr_n,
3776 vrf_id=0xFFFFFFFF, is_add=1)
# IPv6 addresses that embed the IPv4 destination and the NAT address in the
# configured prefixes.
3778 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3779 self.dst_ip6_pfx_len)
3780 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3781 self.src_ip6_pfx_len)
3784 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3785 self.pg0.add_stream(pkts)
3786 self.pg_enable_capture(self.pg_interfaces)
3788 capture = self.pg1.get_capture(len(pkts))
3789 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3792 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3794 self.pg1.add_stream(pkts)
3795 self.pg_enable_capture(self.pg_interfaces)
3797 capture = self.pg0.get_capture(len(pkts))
3798 self.verify_capture_in(capture, self.pg0)
# NOTE(review): the remaining arguments of the two calls below (lines 3802,
# 3806-3807) are not in this listing; presumably they remove the
# configuration (is_add=0) -- confirm.
3800 self.vapi.nat44_interface_add_del_feature(
3801 sw_if_index=self.pg0.sw_if_index,
3803 self.vapi.nat44_add_del_address_range(
3804 first_ip_address=self.nat_addr_n,
3805 last_ip_address=self.nat_addr_n,
# Skipped unconditionally via unittest.skip.
3808 @unittest.skip('Temporary disabled')
3809 def test_464xlat_ce_no_nat(self):
3810 """ Test 464XLAT CE without NAT44EI """
# Same exchange as the NAT variant but without any NAT interface/address
# configuration: the expected IPv6 source embeds pg0's own IPv4 address and
# the capture check is made with same_port=True.
3812 self.configure_xlat()
3814 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3815 self.dst_ip6_pfx_len)
3816 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3817 self.src_ip6_pfx_len)
3819 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3820 self.pg0.add_stream(pkts)
3821 self.pg_enable_capture(self.pg_interfaces)
3823 capture = self.pg1.get_capture(len(pkts))
3824 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3825 nat_ip=out_dst_ip6, same_port=True)
3827 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3828 self.pg1.add_stream(pkts)
3829 self.pg_enable_capture(self.pg_interfaces)
3831 capture = self.pg0.get_capture(len(pkts))
3832 self.verify_capture_in(capture, self.pg0)
# When the file is executed directly, run its tests through unittest using
# the VppTestRunner imported from the framework module.
3835 if __name__ == '__main__':
3836 unittest.main(testRunner=VppTestRunner)