9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
139 self.verify_no_nat44_user()
140 self.vapi.nat_set_timeouts()
141 self.vapi.nat_set_addr_and_port_alloc_alg()
143 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
144 local_port=0, external_port=0, vrf_id=0,
145 is_add=1, external_sw_if_index=0xFFFFFFFF,
146 proto=0, twice_nat=0, self_twice_nat=0,
147 out2in_only=0, tag=""):
149 Add/delete NAT44 static mapping
151 :param local_ip: Local IP address
152 :param external_ip: External IP address
153 :param local_port: Local port number (Optional)
154 :param external_port: External port number (Optional)
155 :param vrf_id: VRF ID (Default 0)
156 :param is_add: 1 if add, 0 if delete (Default add)
157 :param external_sw_if_index: External interface instead of IP address
158 :param proto: IP protocol (Mandatory if port specified)
159 :param twice_nat: 1 if translate external host address and port
160 :param self_twice_nat: 1 if translate external host address and port
161 whenever external host address equals
162 local address of internal host
163 :param out2in_only: if 1 rule is matching only out2in direction
164 :param tag: Opaque string tag
167 if local_port and external_port:
169 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
170 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
171 self.vapi.nat44_add_del_static_mapping(
174 external_sw_if_index,
186 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
188 Add/delete NAT44 address
190 :param ip: IP address
191 :param is_add: 1 if add, 0 if delete (Default add)
192 :param twice_nat: twice NAT address for extenal hosts
194 nat_addr = socket.inet_pton(socket.AF_INET, ip)
195 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
199 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
201 Create packet stream for inside network
203 :param in_if: Inside interface
204 :param out_if: Outside interface
205 :param dst_ip: Destination address
206 :param ttl: TTL of generated packets
209 dst_ip = out_if.remote_ip4
213 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
214 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
215 TCP(sport=self.tcp_port_in, dport=20))
219 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
220 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
221 UDP(sport=self.udp_port_in, dport=20))
225 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
226 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
227 ICMP(id=self.icmp_id_in, type='echo-request'))
232 def compose_ip6(self, ip4, pref, plen):
234 Compose IPv4-embedded IPv6 addresses
236 :param ip4: IPv4 address
237 :param pref: IPv6 prefix
238 :param plen: IPv6 prefix length
239 :returns: IPv4-embedded IPv6 addresses
241 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
242 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
257 pref_n[10] = ip4_n[3]
261 pref_n[10] = ip4_n[2]
262 pref_n[11] = ip4_n[3]
265 pref_n[10] = ip4_n[1]
266 pref_n[11] = ip4_n[2]
267 pref_n[12] = ip4_n[3]
269 pref_n[12] = ip4_n[0]
270 pref_n[13] = ip4_n[1]
271 pref_n[14] = ip4_n[2]
272 pref_n[15] = ip4_n[3]
273 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
275 def extract_ip4(self, ip6, plen):
277 Extract IPv4 address embedded in IPv6 addresses
279 :param ip6: IPv6 address
280 :param plen: IPv6 prefix length
281 :returns: extracted IPv4 address
283 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
315 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
317 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
319 Create IPv6 packet stream for inside network
321 :param in_if: Inside interface
322 :param out_if: Outside interface
323 :param ttl: Hop Limit of generated packets
324 :param pref: NAT64 prefix
325 :param plen: NAT64 prefix length
329 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
331 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
334 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
335 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
336 TCP(sport=self.tcp_port_in, dport=20))
340 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
341 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
342 UDP(sport=self.udp_port_in, dport=20))
346 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
347 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
348 ICMPv6EchoRequest(id=self.icmp_id_in))
353 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
354 use_inside_ports=False):
356 Create packet stream for outside network
358 :param out_if: Outside interface
359 :param dst_ip: Destination IP address (Default use global NAT address)
360 :param ttl: TTL of generated packets
361 :param use_inside_ports: Use inside NAT ports as destination ports
362 instead of outside ports
365 dst_ip = self.nat_addr
366 if not use_inside_ports:
367 tcp_port = self.tcp_port_out
368 udp_port = self.udp_port_out
369 icmp_id = self.icmp_id_out
371 tcp_port = self.tcp_port_in
372 udp_port = self.udp_port_in
373 icmp_id = self.icmp_id_in
376 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
377 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
378 TCP(dport=tcp_port, sport=20))
382 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
383 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
384 UDP(dport=udp_port, sport=20))
388 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
389 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
390 ICMP(id=icmp_id, type='echo-reply'))
395 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
397 Create packet stream for outside network
399 :param out_if: Outside interface
400 :param dst_ip: Destination IP address (Default use global NAT address)
401 :param hl: HL of generated packets
405 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
406 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
407 TCP(dport=self.tcp_port_out, sport=20))
411 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
412 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
413 UDP(dport=self.udp_port_out, sport=20))
417 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
418 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
419 ICMPv6EchoReply(id=self.icmp_id_out))
424 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
425 dst_ip=None, is_ip6=False):
427 Verify captured packets on outside network
429 :param capture: Captured packets
430 :param nat_ip: Translated IP address (Default use global NAT address)
431 :param same_port: Sorce port number is not translated (Default False)
432 :param dst_ip: Destination IP address (Default do not verify)
433 :param is_ip6: If L3 protocol is IPv6 (Default False)
437 ICMP46 = ICMPv6EchoRequest
442 nat_ip = self.nat_addr
443 for packet in capture:
446 self.assert_packet_checksums_valid(packet)
447 self.assertEqual(packet[IP46].src, nat_ip)
448 if dst_ip is not None:
449 self.assertEqual(packet[IP46].dst, dst_ip)
450 if packet.haslayer(TCP):
452 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
455 packet[TCP].sport, self.tcp_port_in)
456 self.tcp_port_out = packet[TCP].sport
457 self.assert_packet_checksums_valid(packet)
458 elif packet.haslayer(UDP):
460 self.assertEqual(packet[UDP].sport, self.udp_port_in)
463 packet[UDP].sport, self.udp_port_in)
464 self.udp_port_out = packet[UDP].sport
467 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
469 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
470 self.icmp_id_out = packet[ICMP46].id
471 self.assert_packet_checksums_valid(packet)
473 self.logger.error(ppp("Unexpected or invalid packet "
474 "(outside network):", packet))
477 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
480 Verify captured packets on outside network
482 :param capture: Captured packets
483 :param nat_ip: Translated IP address
484 :param same_port: Sorce port number is not translated (Default False)
485 :param dst_ip: Destination IP address (Default do not verify)
487 return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
490 def verify_capture_in(self, capture, in_if):
492 Verify captured packets on inside network
494 :param capture: Captured packets
495 :param in_if: Inside interface
497 for packet in capture:
499 self.assert_packet_checksums_valid(packet)
500 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].dport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
514 Verify captured IPv6 packets on inside network
516 :param capture: Captured packets
517 :param src_ip: Source IP
518 :param dst_ip: Destination IP address
520 for packet in capture:
522 self.assertEqual(packet[IPv6].src, src_ip)
523 self.assertEqual(packet[IPv6].dst, dst_ip)
524 self.assert_packet_checksums_valid(packet)
525 if packet.haslayer(TCP):
526 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
527 elif packet.haslayer(UDP):
528 self.assertEqual(packet[UDP].dport, self.udp_port_in)
530 self.assertEqual(packet[ICMPv6EchoReply].id,
533 self.logger.error(ppp("Unexpected or invalid packet "
534 "(inside network):", packet))
537 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
539 Verify captured packet that don't have to be translated
541 :param capture: Captured packets
542 :param ingress_if: Ingress interface
543 :param egress_if: Egress interface
545 for packet in capture:
547 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
548 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
549 if packet.haslayer(TCP):
550 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
551 elif packet.haslayer(UDP):
552 self.assertEqual(packet[UDP].sport, self.udp_port_in)
554 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
556 self.logger.error(ppp("Unexpected or invalid packet "
557 "(inside network):", packet))
560 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
563 Verify captured packets with ICMP errors on outside network
565 :param capture: Captured packets
566 :param src_ip: Translated IP address or IP address of VPP
567 (Default use global NAT address)
568 :param icmp_type: Type of error ICMP packet
569 we are expecting (Default 11)
572 src_ip = self.nat_addr
573 for packet in capture:
575 self.assertEqual(packet[IP].src, src_ip)
576 self.assertTrue(packet.haslayer(ICMP))
578 self.assertEqual(icmp.type, icmp_type)
579 self.assertTrue(icmp.haslayer(IPerror))
580 inner_ip = icmp[IPerror]
581 if inner_ip.haslayer(TCPerror):
582 self.assertEqual(inner_ip[TCPerror].dport,
584 elif inner_ip.haslayer(UDPerror):
585 self.assertEqual(inner_ip[UDPerror].dport,
588 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
590 self.logger.error(ppp("Unexpected or invalid packet "
591 "(outside network):", packet))
594 def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
596 Verify captured packets with ICMP errors on inside network
598 :param capture: Captured packets
599 :param in_if: Inside interface
600 :param icmp_type: Type of error ICMP packet
601 we are expecting (Default 11)
603 for packet in capture:
605 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
606 self.assertTrue(packet.haslayer(ICMP))
608 self.assertEqual(icmp.type, icmp_type)
609 self.assertTrue(icmp.haslayer(IPerror))
610 inner_ip = icmp[IPerror]
611 if inner_ip.haslayer(TCPerror):
612 self.assertEqual(inner_ip[TCPerror].sport,
614 elif inner_ip.haslayer(UDPerror):
615 self.assertEqual(inner_ip[UDPerror].sport,
618 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
620 self.logger.error(ppp("Unexpected or invalid packet "
621 "(inside network):", packet))
624 def create_stream_frag(self, src_if, dst, sport, dport, data):
626 Create fragmented packet stream
628 :param src_if: Source interface
629 :param dst: Destination IPv4 address
630 :param sport: Source TCP port
631 :param dport: Destination TCP port
632 :param data: Payload data
635 id = random.randint(0, 65535)
636 p = (IP(src=src_if.remote_ip4, dst=dst) /
637 TCP(sport=sport, dport=dport) /
639 p = p.__class__(str(p))
640 chksum = p['TCP'].chksum
642 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
643 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
644 TCP(sport=sport, dport=dport, chksum=chksum) /
647 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
648 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
649 proto=IP_PROTOS.tcp) /
652 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
653 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
659 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
660 pref=None, plen=0, frag_size=128):
662 Create fragmented packet stream
664 :param src_if: Source interface
665 :param dst: Destination IPv4 address
666 :param sport: Source TCP port
667 :param dport: Destination TCP port
668 :param data: Payload data
669 :param pref: NAT64 prefix
670 :param plen: NAT64 prefix length
671 :param fragsize: size of fragments
675 dst_ip6 = ''.join(['64:ff9b::', dst])
677 dst_ip6 = self.compose_ip6(dst, pref, plen)
679 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
680 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
681 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
682 TCP(sport=sport, dport=dport) /
685 return fragment6(p, frag_size)
687 def reass_frags_and_verify(self, frags, src, dst):
689 Reassemble and verify fragmented packet
691 :param frags: Captured fragments
692 :param src: Source IPv4 address to verify
693 :param dst: Destination IPv4 address to verify
695 :returns: Reassembled IPv4 packet
697 buffer = StringIO.StringIO()
699 self.assertEqual(p[IP].src, src)
700 self.assertEqual(p[IP].dst, dst)
701 self.assert_ip_checksum_valid(p)
702 buffer.seek(p[IP].frag * 8)
703 buffer.write(p[IP].payload)
704 ip = frags[0].getlayer(IP)
705 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
706 proto=frags[0][IP].proto)
707 if ip.proto == IP_PROTOS.tcp:
708 p = (ip / TCP(buffer.getvalue()))
709 self.assert_tcp_checksum_valid(p)
710 elif ip.proto == IP_PROTOS.udp:
711 p = (ip / UDP(buffer.getvalue()))
714 def reass_frags_and_verify_ip6(self, frags, src, dst):
716 Reassemble and verify fragmented packet
718 :param frags: Captured fragments
719 :param src: Source IPv6 address to verify
720 :param dst: Destination IPv6 address to verify
722 :returns: Reassembled IPv6 packet
724 buffer = StringIO.StringIO()
726 self.assertEqual(p[IPv6].src, src)
727 self.assertEqual(p[IPv6].dst, dst)
728 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
729 buffer.write(p[IPv6ExtHdrFragment].payload)
730 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
731 nh=frags[0][IPv6ExtHdrFragment].nh)
732 if ip.nh == IP_PROTOS.tcp:
733 p = (ip / TCP(buffer.getvalue()))
734 elif ip.nh == IP_PROTOS.udp:
735 p = (ip / UDP(buffer.getvalue()))
736 self.assert_packet_checksums_valid(p)
739 def initiate_tcp_session(self, in_if, out_if):
741 Initiates TCP session
743 :param in_if: Inside interface
744 :param out_if: Outside interface
748 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
749 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
750 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
753 self.pg_enable_capture(self.pg_interfaces)
755 capture = out_if.get_capture(1)
757 self.tcp_port_out = p[TCP].sport
759 # SYN + ACK packet out->in
760 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
761 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
762 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
765 self.pg_enable_capture(self.pg_interfaces)
770 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
771 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
772 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
775 self.pg_enable_capture(self.pg_interfaces)
777 out_if.get_capture(1)
780 self.logger.error("TCP 3 way handshake failed")
783 def verify_ipfix_nat44_ses(self, data):
785 Verify IPFIX NAT44 session create/delete event
787 :param data: Decoded IPFIX data records
789 nat44_ses_create_num = 0
790 nat44_ses_delete_num = 0
791 self.assertEqual(6, len(data))
794 self.assertIn(ord(record[230]), [4, 5])
795 if ord(record[230]) == 4:
796 nat44_ses_create_num += 1
798 nat44_ses_delete_num += 1
800 self.assertEqual(self.pg0.remote_ip4n, record[8])
801 # postNATSourceIPv4Address
802 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
805 self.assertEqual(struct.pack("!I", 0), record[234])
806 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
807 if IP_PROTOS.icmp == ord(record[4]):
808 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
809 self.assertEqual(struct.pack("!H", self.icmp_id_out),
811 elif IP_PROTOS.tcp == ord(record[4]):
812 self.assertEqual(struct.pack("!H", self.tcp_port_in),
814 self.assertEqual(struct.pack("!H", self.tcp_port_out),
816 elif IP_PROTOS.udp == ord(record[4]):
817 self.assertEqual(struct.pack("!H", self.udp_port_in),
819 self.assertEqual(struct.pack("!H", self.udp_port_out),
822 self.fail("Invalid protocol")
823 self.assertEqual(3, nat44_ses_create_num)
824 self.assertEqual(3, nat44_ses_delete_num)
826 def verify_ipfix_addr_exhausted(self, data):
828 Verify IPFIX NAT addresses event
830 :param data: Decoded IPFIX data records
832 self.assertEqual(1, len(data))
835 self.assertEqual(ord(record[230]), 3)
837 self.assertEqual(struct.pack("!I", 0), record[283])
839 def verify_ipfix_max_sessions(self, data, limit):
841 Verify IPFIX maximum session entries exceeded event
843 :param data: Decoded IPFIX data records
844 :param limit: Number of maximum session entries that can be created.
846 self.assertEqual(1, len(data))
849 self.assertEqual(ord(record[230]), 13)
850 # natQuotaExceededEvent
851 self.assertEqual(struct.pack("I", 1), record[466])
853 self.assertEqual(struct.pack("I", limit), record[471])
855 def verify_ipfix_max_bibs(self, data, limit):
857 Verify IPFIX maximum BIB entries exceeded event
859 :param data: Decoded IPFIX data records
860 :param limit: Number of maximum BIB entries that can be created.
862 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 13)
866 # natQuotaExceededEvent
867 self.assertEqual(struct.pack("I", 2), record[466])
869 self.assertEqual(struct.pack("I", limit), record[472])
871 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
873 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
875 :param data: Decoded IPFIX data records
876 :param limit: Number of maximum fragments pending reassembly
877 :param src_addr: IPv6 source address
879 self.assertEqual(1, len(data))
882 self.assertEqual(ord(record[230]), 13)
883 # natQuotaExceededEvent
884 self.assertEqual(struct.pack("I", 5), record[466])
885 # maxFragmentsPendingReassembly
886 self.assertEqual(struct.pack("I", limit), record[475])
888 self.assertEqual(src_addr, record[27])
890 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
892 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
894 :param data: Decoded IPFIX data records
895 :param limit: Number of maximum fragments pending reassembly
896 :param src_addr: IPv4 source address
898 self.assertEqual(1, len(data))
901 self.assertEqual(ord(record[230]), 13)
902 # natQuotaExceededEvent
903 self.assertEqual(struct.pack("I", 5), record[466])
904 # maxFragmentsPendingReassembly
905 self.assertEqual(struct.pack("I", limit), record[475])
907 self.assertEqual(src_addr, record[8])
909 def verify_ipfix_bib(self, data, is_create, src_addr):
911 Verify IPFIX NAT64 BIB create and delete events
913 :param data: Decoded IPFIX data records
914 :param is_create: Create event if nonzero value otherwise delete event
915 :param src_addr: IPv6 source address
917 self.assertEqual(1, len(data))
921 self.assertEqual(ord(record[230]), 10)
923 self.assertEqual(ord(record[230]), 11)
925 self.assertEqual(src_addr, record[27])
926 # postNATSourceIPv4Address
927 self.assertEqual(self.nat_addr_n, record[225])
929 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
931 self.assertEqual(struct.pack("!I", 0), record[234])
932 # sourceTransportPort
933 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
934 # postNAPTSourceTransportPort
935 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
937 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
940 Verify IPFIX NAT64 session create and delete events
942 :param data: Decoded IPFIX data records
943 :param is_create: Create event if nonzero value otherwise delete event
944 :param src_addr: IPv6 source address
945 :param dst_addr: IPv4 destination address
946 :param dst_port: destination TCP port
948 self.assertEqual(1, len(data))
952 self.assertEqual(ord(record[230]), 6)
954 self.assertEqual(ord(record[230]), 7)
956 self.assertEqual(src_addr, record[27])
957 # destinationIPv6Address
958 self.assertEqual(socket.inet_pton(socket.AF_INET6,
959 self.compose_ip6(dst_addr,
963 # postNATSourceIPv4Address
964 self.assertEqual(self.nat_addr_n, record[225])
965 # postNATDestinationIPv4Address
966 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
969 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
971 self.assertEqual(struct.pack("!I", 0), record[234])
972 # sourceTransportPort
973 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
974 # postNAPTSourceTransportPort
975 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
976 # destinationTransportPort
977 self.assertEqual(struct.pack("!H", dst_port), record[11])
978 # postNAPTDestinationTransportPort
979 self.assertEqual(struct.pack("!H", dst_port), record[228])
981 def verify_no_nat44_user(self):
982 """ Verify that there is no NAT44 user """
983 users = self.vapi.nat44_user_dump()
984 self.assertEqual(len(users), 0)
986 def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
988 Verify IPFIX maximum entries per user exceeded event
990 :param data: Decoded IPFIX data records
991 :param limit: Number of maximum entries per user
992 :param src_addr: IPv4 source address
994 self.assertEqual(1, len(data))
997 self.assertEqual(ord(record[230]), 13)
998 # natQuotaExceededEvent
999 self.assertEqual(struct.pack("I", 3), record[466])
1001 self.assertEqual(struct.pack("I", limit), record[473])
1003 self.assertEqual(src_addr, record[8])
1006 class TestNAT44(MethodHolder):
1007 """ NAT44 Test Cases """
1010 def setUpClass(cls):
1011 super(TestNAT44, cls).setUpClass()
1012 cls.vapi.cli("set log class nat level debug")
1015 cls.tcp_port_in = 6303
1016 cls.tcp_port_out = 6303
1017 cls.udp_port_in = 6304
1018 cls.udp_port_out = 6304
1019 cls.icmp_id_in = 6305
1020 cls.icmp_id_out = 6305
1021 cls.nat_addr = '10.0.0.3'
1022 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1023 cls.ipfix_src_port = 4739
1024 cls.ipfix_domain_id = 1
1025 cls.tcp_external_port = 80
1027 cls.create_pg_interfaces(range(10))
1028 cls.interfaces = list(cls.pg_interfaces[0:4])
1030 for i in cls.interfaces:
1035 cls.pg0.generate_remote_hosts(3)
1036 cls.pg0.configure_ipv4_neighbors()
1038 cls.pg1.generate_remote_hosts(1)
1039 cls.pg1.configure_ipv4_neighbors()
1041 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1042 cls.vapi.ip_table_add_del(10, is_add=1)
1043 cls.vapi.ip_table_add_del(20, is_add=1)
1045 cls.pg4._local_ip4 = "172.16.255.1"
1046 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1047 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1048 cls.pg4.set_table_ip4(10)
1049 cls.pg5._local_ip4 = "172.17.255.3"
1050 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1051 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1052 cls.pg5.set_table_ip4(10)
1053 cls.pg6._local_ip4 = "172.16.255.1"
1054 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1055 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1056 cls.pg6.set_table_ip4(20)
1057 for i in cls.overlapping_interfaces:
1065 cls.pg9.generate_remote_hosts(2)
1066 cls.pg9.config_ip4()
1067 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1068 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1072 cls.pg9.resolve_arp()
1073 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1074 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1075 cls.pg9.resolve_arp()
1078 super(TestNAT44, cls).tearDownClass()
1081 def test_dynamic(self):
1082 """ NAT44 dynamic translation test """
1084 self.nat44_add_address(self.nat_addr)
1085 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1086 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1090 pkts = self.create_stream_in(self.pg0, self.pg1)
1091 self.pg0.add_stream(pkts)
1092 self.pg_enable_capture(self.pg_interfaces)
1094 capture = self.pg1.get_capture(len(pkts))
1095 self.verify_capture_out(capture)
1098 pkts = self.create_stream_out(self.pg1)
1099 self.pg1.add_stream(pkts)
1100 self.pg_enable_capture(self.pg_interfaces)
1102 capture = self.pg0.get_capture(len(pkts))
1103 self.verify_capture_in(capture, self.pg0)
1105 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1106 """ NAT44 handling of client packets with TTL=1 """
1108 self.nat44_add_address(self.nat_addr)
1109 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1110 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1113 # Client side - generate traffic
1114 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1115 self.pg0.add_stream(pkts)
1116 self.pg_enable_capture(self.pg_interfaces)
1119 # Client side - verify ICMP type 11 packets
1120 capture = self.pg0.get_capture(len(pkts))
1121 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1123 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1124 """ NAT44 handling of server packets with TTL=1 """
1126 self.nat44_add_address(self.nat_addr)
1127 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1128 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1131 # Client side - create sessions
1132 pkts = self.create_stream_in(self.pg0, self.pg1)
1133 self.pg0.add_stream(pkts)
1134 self.pg_enable_capture(self.pg_interfaces)
1137 # Server side - generate traffic
1138 capture = self.pg1.get_capture(len(pkts))
1139 self.verify_capture_out(capture)
1140 pkts = self.create_stream_out(self.pg1, ttl=1)
1141 self.pg1.add_stream(pkts)
1142 self.pg_enable_capture(self.pg_interfaces)
1145 # Server side - verify ICMP type 11 packets
1146 capture = self.pg1.get_capture(len(pkts))
1147 self.verify_capture_out_with_icmp_errors(capture,
1148 src_ip=self.pg1.local_ip4)
1150 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1151 """ NAT44 handling of error responses to client packets with TTL=2 """
1153 self.nat44_add_address(self.nat_addr)
1154 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1155 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1158 # Client side - generate traffic
1159 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1160 self.pg0.add_stream(pkts)
1161 self.pg_enable_capture(self.pg_interfaces)
1164 # Server side - simulate ICMP type 11 response
1165 capture = self.pg1.get_capture(len(pkts))
1166 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1167 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1168 ICMP(type=11) / packet[IP] for packet in capture]
1169 self.pg1.add_stream(pkts)
1170 self.pg_enable_capture(self.pg_interfaces)
1173 # Client side - verify ICMP type 11 packets
1174 capture = self.pg0.get_capture(len(pkts))
1175 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1177 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1178 """ NAT44 handling of error responses to server packets with TTL=2 """
1180 self.nat44_add_address(self.nat_addr)
1181 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1182 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1185 # Client side - create sessions
1186 pkts = self.create_stream_in(self.pg0, self.pg1)
1187 self.pg0.add_stream(pkts)
1188 self.pg_enable_capture(self.pg_interfaces)
1191 # Server side - generate traffic
1192 capture = self.pg1.get_capture(len(pkts))
1193 self.verify_capture_out(capture)
1194 pkts = self.create_stream_out(self.pg1, ttl=2)
1195 self.pg1.add_stream(pkts)
1196 self.pg_enable_capture(self.pg_interfaces)
1199 # Client side - simulate ICMP type 11 response
1200 capture = self.pg0.get_capture(len(pkts))
1201 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1202 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1203 ICMP(type=11) / packet[IP] for packet in capture]
1204 self.pg0.add_stream(pkts)
1205 self.pg_enable_capture(self.pg_interfaces)
1208 # Server side - verify ICMP type 11 packets
1209 capture = self.pg1.get_capture(len(pkts))
1210 self.verify_capture_out_with_icmp_errors(capture)
1212 def test_ping_out_interface_from_outside(self):
1213 """ Ping NAT44 out interface from outside network """
1215 self.nat44_add_address(self.nat_addr)
1216 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1217 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1220 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1221 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1222 ICMP(id=self.icmp_id_out, type='echo-request'))
1224 self.pg1.add_stream(pkts)
1225 self.pg_enable_capture(self.pg_interfaces)
1227 capture = self.pg1.get_capture(len(pkts))
1230 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1231 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1232 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1233 self.assertEqual(packet[ICMP].type, 0) # echo reply
1235 self.logger.error(ppp("Unexpected or invalid packet "
1236 "(outside network):", packet))
1239 def test_ping_internal_host_from_outside(self):
1240 """ Ping internal host from outside network """
1242 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1243 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1244 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1248 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1249 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1250 ICMP(id=self.icmp_id_out, type='echo-request'))
1251 self.pg1.add_stream(pkt)
1252 self.pg_enable_capture(self.pg_interfaces)
1254 capture = self.pg0.get_capture(1)
1255 self.verify_capture_in(capture, self.pg0)
1256 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1259 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1260 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1261 ICMP(id=self.icmp_id_in, type='echo-reply'))
1262 self.pg0.add_stream(pkt)
1263 self.pg_enable_capture(self.pg_interfaces)
1265 capture = self.pg1.get_capture(1)
1266 self.verify_capture_out(capture, same_port=True)
1267 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1269 def test_forwarding(self):
1270 """ NAT44 forwarding test """
1272 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1273 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1275 self.vapi.nat44_forwarding_enable_disable(1)
1277 real_ip = self.pg0.remote_ip4n
1278 alias_ip = self.nat_addr_n
1279 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1280 external_ip=alias_ip)
1283 # static mapping match
1285 pkts = self.create_stream_out(self.pg1)
1286 self.pg1.add_stream(pkts)
1287 self.pg_enable_capture(self.pg_interfaces)
1289 capture = self.pg0.get_capture(len(pkts))
1290 self.verify_capture_in(capture, self.pg0)
1292 pkts = self.create_stream_in(self.pg0, self.pg1)
1293 self.pg0.add_stream(pkts)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg1.get_capture(len(pkts))
1297 self.verify_capture_out(capture, same_port=True)
1299 # no static mapping match
1301 host0 = self.pg0.remote_hosts[0]
1302 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1304 pkts = self.create_stream_out(self.pg1,
1305 dst_ip=self.pg0.remote_ip4,
1306 use_inside_ports=True)
1307 self.pg1.add_stream(pkts)
1308 self.pg_enable_capture(self.pg_interfaces)
1310 capture = self.pg0.get_capture(len(pkts))
1311 self.verify_capture_in(capture, self.pg0)
1313 pkts = self.create_stream_in(self.pg0, self.pg1)
1314 self.pg0.add_stream(pkts)
1315 self.pg_enable_capture(self.pg_interfaces)
1317 capture = self.pg1.get_capture(len(pkts))
1318 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1321 self.pg0.remote_hosts[0] = host0
1324 self.vapi.nat44_forwarding_enable_disable(0)
1325 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1326 external_ip=alias_ip,
1329 def test_static_in(self):
1330 """ 1:1 NAT initialized from inside network """
1332 nat_ip = "10.0.0.10"
1333 self.tcp_port_out = 6303
1334 self.udp_port_out = 6304
1335 self.icmp_id_out = 6305
1337 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1338 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1339 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1341 sm = self.vapi.nat44_static_mapping_dump()
1342 self.assertEqual(len(sm), 1)
1343 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1344 self.assertEqual(sm[0].protocol, 0)
1345 self.assertEqual(sm[0].local_port, 0)
1346 self.assertEqual(sm[0].external_port, 0)
1349 pkts = self.create_stream_in(self.pg0, self.pg1)
1350 self.pg0.add_stream(pkts)
1351 self.pg_enable_capture(self.pg_interfaces)
1353 capture = self.pg1.get_capture(len(pkts))
1354 self.verify_capture_out(capture, nat_ip, True)
1357 pkts = self.create_stream_out(self.pg1, nat_ip)
1358 self.pg1.add_stream(pkts)
1359 self.pg_enable_capture(self.pg_interfaces)
1361 capture = self.pg0.get_capture(len(pkts))
1362 self.verify_capture_in(capture, self.pg0)
1364 def test_static_out(self):
1365 """ 1:1 NAT initialized from outside network """
1367 nat_ip = "10.0.0.20"
1368 self.tcp_port_out = 6303
1369 self.udp_port_out = 6304
1370 self.icmp_id_out = 6305
1373 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1374 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1375 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1377 sm = self.vapi.nat44_static_mapping_dump()
1378 self.assertEqual(len(sm), 1)
1379 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1382 pkts = self.create_stream_out(self.pg1, nat_ip)
1383 self.pg1.add_stream(pkts)
1384 self.pg_enable_capture(self.pg_interfaces)
1386 capture = self.pg0.get_capture(len(pkts))
1387 self.verify_capture_in(capture, self.pg0)
1390 pkts = self.create_stream_in(self.pg0, self.pg1)
1391 self.pg0.add_stream(pkts)
1392 self.pg_enable_capture(self.pg_interfaces)
1394 capture = self.pg1.get_capture(len(pkts))
1395 self.verify_capture_out(capture, nat_ip, True)
1397 def test_static_with_port_in(self):
1398 """ 1:1 NAPT initialized from inside network """
1400 self.tcp_port_out = 3606
1401 self.udp_port_out = 3607
1402 self.icmp_id_out = 3608
1404 self.nat44_add_address(self.nat_addr)
1405 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1406 self.tcp_port_in, self.tcp_port_out,
1407 proto=IP_PROTOS.tcp)
1408 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1409 self.udp_port_in, self.udp_port_out,
1410 proto=IP_PROTOS.udp)
1411 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1412 self.icmp_id_in, self.icmp_id_out,
1413 proto=IP_PROTOS.icmp)
1414 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1415 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1419 pkts = self.create_stream_in(self.pg0, self.pg1)
1420 self.pg0.add_stream(pkts)
1421 self.pg_enable_capture(self.pg_interfaces)
1423 capture = self.pg1.get_capture(len(pkts))
1424 self.verify_capture_out(capture)
1427 pkts = self.create_stream_out(self.pg1)
1428 self.pg1.add_stream(pkts)
1429 self.pg_enable_capture(self.pg_interfaces)
1431 capture = self.pg0.get_capture(len(pkts))
1432 self.verify_capture_in(capture, self.pg0)
1434 def test_static_with_port_out(self):
1435 """ 1:1 NAPT initialized from outside network """
1437 self.tcp_port_out = 30606
1438 self.udp_port_out = 30607
1439 self.icmp_id_out = 30608
1441 self.nat44_add_address(self.nat_addr)
1442 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1443 self.tcp_port_in, self.tcp_port_out,
1444 proto=IP_PROTOS.tcp)
1445 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1446 self.udp_port_in, self.udp_port_out,
1447 proto=IP_PROTOS.udp)
1448 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1449 self.icmp_id_in, self.icmp_id_out,
1450 proto=IP_PROTOS.icmp)
1451 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1452 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1456 pkts = self.create_stream_out(self.pg1)
1457 self.pg1.add_stream(pkts)
1458 self.pg_enable_capture(self.pg_interfaces)
1460 capture = self.pg0.get_capture(len(pkts))
1461 self.verify_capture_in(capture, self.pg0)
1464 pkts = self.create_stream_in(self.pg0, self.pg1)
1465 self.pg0.add_stream(pkts)
1466 self.pg_enable_capture(self.pg_interfaces)
1468 capture = self.pg1.get_capture(len(pkts))
1469 self.verify_capture_out(capture)
1471 def test_static_vrf_aware(self):
1472 """ 1:1 NAT VRF awareness """
1474 nat_ip1 = "10.0.0.30"
1475 nat_ip2 = "10.0.0.40"
1476 self.tcp_port_out = 6303
1477 self.udp_port_out = 6304
1478 self.icmp_id_out = 6305
1480 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1482 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1484 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1486 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1487 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1489 # inside interface VRF match NAT44 static mapping VRF
1490 pkts = self.create_stream_in(self.pg4, self.pg3)
1491 self.pg4.add_stream(pkts)
1492 self.pg_enable_capture(self.pg_interfaces)
1494 capture = self.pg3.get_capture(len(pkts))
1495 self.verify_capture_out(capture, nat_ip1, True)
1497 # inside interface VRF don't match NAT44 static mapping VRF (packets
1499 pkts = self.create_stream_in(self.pg0, self.pg3)
1500 self.pg0.add_stream(pkts)
1501 self.pg_enable_capture(self.pg_interfaces)
1503 self.pg3.assert_nothing_captured()
1505 def test_dynamic_to_static(self):
1506 """ Switch from dynamic translation to 1:1NAT """
1507 nat_ip = "10.0.0.10"
1508 self.tcp_port_out = 6303
1509 self.udp_port_out = 6304
1510 self.icmp_id_out = 6305
1512 self.nat44_add_address(self.nat_addr)
1513 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1514 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1518 pkts = self.create_stream_in(self.pg0, self.pg1)
1519 self.pg0.add_stream(pkts)
1520 self.pg_enable_capture(self.pg_interfaces)
1522 capture = self.pg1.get_capture(len(pkts))
1523 self.verify_capture_out(capture)
1526 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1527 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1528 self.assertEqual(len(sessions), 0)
1529 pkts = self.create_stream_in(self.pg0, self.pg1)
1530 self.pg0.add_stream(pkts)
1531 self.pg_enable_capture(self.pg_interfaces)
1533 capture = self.pg1.get_capture(len(pkts))
1534 self.verify_capture_out(capture, nat_ip, True)
1536 def test_identity_nat(self):
1537 """ Identity NAT """
1539 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1540 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1541 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1544 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1545 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1546 TCP(sport=12345, dport=56789))
1547 self.pg1.add_stream(p)
1548 self.pg_enable_capture(self.pg_interfaces)
1550 capture = self.pg0.get_capture(1)
1555 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1556 self.assertEqual(ip.src, self.pg1.remote_ip4)
1557 self.assertEqual(tcp.dport, 56789)
1558 self.assertEqual(tcp.sport, 12345)
1559 self.assert_packet_checksums_valid(p)
1561 self.logger.error(ppp("Unexpected or invalid packet:", p))
1564 def test_multiple_inside_interfaces(self):
1565 """ NAT44 multiple non-overlapping address space inside interfaces """
1567 self.nat44_add_address(self.nat_addr)
1568 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1569 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1570 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1573 # between two NAT44 inside interfaces (no translation)
1574 pkts = self.create_stream_in(self.pg0, self.pg1)
1575 self.pg0.add_stream(pkts)
1576 self.pg_enable_capture(self.pg_interfaces)
1578 capture = self.pg1.get_capture(len(pkts))
1579 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1581 # from NAT44 inside to interface without NAT44 feature (no translation)
1582 pkts = self.create_stream_in(self.pg0, self.pg2)
1583 self.pg0.add_stream(pkts)
1584 self.pg_enable_capture(self.pg_interfaces)
1586 capture = self.pg2.get_capture(len(pkts))
1587 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1589 # in2out 1st interface
1590 pkts = self.create_stream_in(self.pg0, self.pg3)
1591 self.pg0.add_stream(pkts)
1592 self.pg_enable_capture(self.pg_interfaces)
1594 capture = self.pg3.get_capture(len(pkts))
1595 self.verify_capture_out(capture)
1597 # out2in 1st interface
1598 pkts = self.create_stream_out(self.pg3)
1599 self.pg3.add_stream(pkts)
1600 self.pg_enable_capture(self.pg_interfaces)
1602 capture = self.pg0.get_capture(len(pkts))
1603 self.verify_capture_in(capture, self.pg0)
1605 # in2out 2nd interface
1606 pkts = self.create_stream_in(self.pg1, self.pg3)
1607 self.pg1.add_stream(pkts)
1608 self.pg_enable_capture(self.pg_interfaces)
1610 capture = self.pg3.get_capture(len(pkts))
1611 self.verify_capture_out(capture)
1613 # out2in 2nd interface
1614 pkts = self.create_stream_out(self.pg3)
1615 self.pg3.add_stream(pkts)
1616 self.pg_enable_capture(self.pg_interfaces)
1618 capture = self.pg1.get_capture(len(pkts))
1619 self.verify_capture_in(capture, self.pg1)
1621 def test_inside_overlapping_interfaces(self):
1622 """ NAT44 multiple inside interfaces with overlapping address space """
1624 static_nat_ip = "10.0.0.10"
1625 self.nat44_add_address(self.nat_addr)
1626 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1628 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1629 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1630 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1631 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1634 # between NAT44 inside interfaces with same VRF (no translation)
1635 pkts = self.create_stream_in(self.pg4, self.pg5)
1636 self.pg4.add_stream(pkts)
1637 self.pg_enable_capture(self.pg_interfaces)
1639 capture = self.pg5.get_capture(len(pkts))
1640 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1642 # between NAT44 inside interfaces with different VRF (hairpinning)
1643 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1644 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1645 TCP(sport=1234, dport=5678))
1646 self.pg4.add_stream(p)
1647 self.pg_enable_capture(self.pg_interfaces)
1649 capture = self.pg6.get_capture(1)
1654 self.assertEqual(ip.src, self.nat_addr)
1655 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1656 self.assertNotEqual(tcp.sport, 1234)
1657 self.assertEqual(tcp.dport, 5678)
1659 self.logger.error(ppp("Unexpected or invalid packet:", p))
1662 # in2out 1st interface
1663 pkts = self.create_stream_in(self.pg4, self.pg3)
1664 self.pg4.add_stream(pkts)
1665 self.pg_enable_capture(self.pg_interfaces)
1667 capture = self.pg3.get_capture(len(pkts))
1668 self.verify_capture_out(capture)
1670 # out2in 1st interface
1671 pkts = self.create_stream_out(self.pg3)
1672 self.pg3.add_stream(pkts)
1673 self.pg_enable_capture(self.pg_interfaces)
1675 capture = self.pg4.get_capture(len(pkts))
1676 self.verify_capture_in(capture, self.pg4)
1678 # in2out 2nd interface
1679 pkts = self.create_stream_in(self.pg5, self.pg3)
1680 self.pg5.add_stream(pkts)
1681 self.pg_enable_capture(self.pg_interfaces)
1683 capture = self.pg3.get_capture(len(pkts))
1684 self.verify_capture_out(capture)
1686 # out2in 2nd interface
1687 pkts = self.create_stream_out(self.pg3)
1688 self.pg3.add_stream(pkts)
1689 self.pg_enable_capture(self.pg_interfaces)
1691 capture = self.pg5.get_capture(len(pkts))
1692 self.verify_capture_in(capture, self.pg5)
1695 addresses = self.vapi.nat44_address_dump()
1696 self.assertEqual(len(addresses), 1)
1697 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1698 self.assertEqual(len(sessions), 3)
1699 for session in sessions:
1700 self.assertFalse(session.is_static)
1701 self.assertEqual(session.inside_ip_address[0:4],
1702 self.pg5.remote_ip4n)
1703 self.assertEqual(session.outside_ip_address,
1704 addresses[0].ip_address)
1705 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1706 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1707 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1708 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1709 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1710 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1711 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1712 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1713 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1715 # in2out 3rd interface
1716 pkts = self.create_stream_in(self.pg6, self.pg3)
1717 self.pg6.add_stream(pkts)
1718 self.pg_enable_capture(self.pg_interfaces)
1720 capture = self.pg3.get_capture(len(pkts))
1721 self.verify_capture_out(capture, static_nat_ip, True)
1723 # out2in 3rd interface
1724 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1725 self.pg3.add_stream(pkts)
1726 self.pg_enable_capture(self.pg_interfaces)
1728 capture = self.pg6.get_capture(len(pkts))
1729 self.verify_capture_in(capture, self.pg6)
1731 # general user and session dump verifications
1732 users = self.vapi.nat44_user_dump()
1733 self.assertTrue(len(users) >= 3)
1734 addresses = self.vapi.nat44_address_dump()
1735 self.assertEqual(len(addresses), 1)
1737 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1739 for session in sessions:
1740 self.assertEqual(user.ip_address, session.inside_ip_address)
1741 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1742 self.assertTrue(session.protocol in
1743 [IP_PROTOS.tcp, IP_PROTOS.udp,
1745 self.assertFalse(session.ext_host_valid)
1748 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1749 self.assertTrue(len(sessions) >= 4)
1750 for session in sessions:
1751 self.assertFalse(session.is_static)
1752 self.assertEqual(session.inside_ip_address[0:4],
1753 self.pg4.remote_ip4n)
1754 self.assertEqual(session.outside_ip_address,
1755 addresses[0].ip_address)
1758 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1759 self.assertTrue(len(sessions) >= 3)
1760 for session in sessions:
1761 self.assertTrue(session.is_static)
1762 self.assertEqual(session.inside_ip_address[0:4],
1763 self.pg6.remote_ip4n)
1764 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1765 map(int, static_nat_ip.split('.')))
1766 self.assertTrue(session.inside_port in
1767 [self.tcp_port_in, self.udp_port_in,
1770 def test_hairpinning(self):
1771 """ NAT44 hairpinning - 1:1 NAPT """
1773 host = self.pg0.remote_hosts[0]
1774 server = self.pg0.remote_hosts[1]
1777 server_in_port = 5678
1778 server_out_port = 8765
1780 self.nat44_add_address(self.nat_addr)
1781 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1782 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1784 # add static mapping for server
1785 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1786 server_in_port, server_out_port,
1787 proto=IP_PROTOS.tcp)
1789 # send packet from host to server
1790 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1791 IP(src=host.ip4, dst=self.nat_addr) /
1792 TCP(sport=host_in_port, dport=server_out_port))
1793 self.pg0.add_stream(p)
1794 self.pg_enable_capture(self.pg_interfaces)
1796 capture = self.pg0.get_capture(1)
1801 self.assertEqual(ip.src, self.nat_addr)
1802 self.assertEqual(ip.dst, server.ip4)
1803 self.assertNotEqual(tcp.sport, host_in_port)
1804 self.assertEqual(tcp.dport, server_in_port)
1805 self.assert_packet_checksums_valid(p)
1806 host_out_port = tcp.sport
1808 self.logger.error(ppp("Unexpected or invalid packet:", p))
1811 # send reply from server to host
1812 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1813 IP(src=server.ip4, dst=self.nat_addr) /
1814 TCP(sport=server_in_port, dport=host_out_port))
1815 self.pg0.add_stream(p)
1816 self.pg_enable_capture(self.pg_interfaces)
1818 capture = self.pg0.get_capture(1)
1823 self.assertEqual(ip.src, self.nat_addr)
1824 self.assertEqual(ip.dst, host.ip4)
1825 self.assertEqual(tcp.sport, server_out_port)
1826 self.assertEqual(tcp.dport, host_in_port)
1827 self.assert_packet_checksums_valid(p)
1829 self.logger.error(ppp("Unexpected or invalid packet:", p))
1832 def test_hairpinning2(self):
1833 """ NAT44 hairpinning - 1:1 NAT"""
1835 server1_nat_ip = "10.0.0.10"
1836 server2_nat_ip = "10.0.0.11"
1837 host = self.pg0.remote_hosts[0]
1838 server1 = self.pg0.remote_hosts[1]
1839 server2 = self.pg0.remote_hosts[2]
1840 server_tcp_port = 22
1841 server_udp_port = 20
1843 self.nat44_add_address(self.nat_addr)
1844 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1845 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1848 # add static mapping for servers
1849 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1850 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1854 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1855 IP(src=host.ip4, dst=server1_nat_ip) /
1856 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1858 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1859 IP(src=host.ip4, dst=server1_nat_ip) /
1860 UDP(sport=self.udp_port_in, dport=server_udp_port))
1862 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1863 IP(src=host.ip4, dst=server1_nat_ip) /
1864 ICMP(id=self.icmp_id_in, type='echo-request'))
1866 self.pg0.add_stream(pkts)
1867 self.pg_enable_capture(self.pg_interfaces)
1869 capture = self.pg0.get_capture(len(pkts))
1870 for packet in capture:
1872 self.assertEqual(packet[IP].src, self.nat_addr)
1873 self.assertEqual(packet[IP].dst, server1.ip4)
1874 if packet.haslayer(TCP):
1875 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1876 self.assertEqual(packet[TCP].dport, server_tcp_port)
1877 self.tcp_port_out = packet[TCP].sport
1878 self.assert_packet_checksums_valid(packet)
1879 elif packet.haslayer(UDP):
1880 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1881 self.assertEqual(packet[UDP].dport, server_udp_port)
1882 self.udp_port_out = packet[UDP].sport
1884 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1885 self.icmp_id_out = packet[ICMP].id
1887 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1892 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1893 IP(src=server1.ip4, dst=self.nat_addr) /
1894 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1896 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1897 IP(src=server1.ip4, dst=self.nat_addr) /
1898 UDP(sport=server_udp_port, dport=self.udp_port_out))
1900 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1901 IP(src=server1.ip4, dst=self.nat_addr) /
1902 ICMP(id=self.icmp_id_out, type='echo-reply'))
1904 self.pg0.add_stream(pkts)
1905 self.pg_enable_capture(self.pg_interfaces)
1907 capture = self.pg0.get_capture(len(pkts))
1908 for packet in capture:
1910 self.assertEqual(packet[IP].src, server1_nat_ip)
1911 self.assertEqual(packet[IP].dst, host.ip4)
1912 if packet.haslayer(TCP):
1913 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1914 self.assertEqual(packet[TCP].sport, server_tcp_port)
1915 self.assert_packet_checksums_valid(packet)
1916 elif packet.haslayer(UDP):
1917 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1918 self.assertEqual(packet[UDP].sport, server_udp_port)
1920 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1922 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1925 # server2 to server1
1927 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1928 IP(src=server2.ip4, dst=server1_nat_ip) /
1929 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1931 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1932 IP(src=server2.ip4, dst=server1_nat_ip) /
1933 UDP(sport=self.udp_port_in, dport=server_udp_port))
1935 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1936 IP(src=server2.ip4, dst=server1_nat_ip) /
1937 ICMP(id=self.icmp_id_in, type='echo-request'))
1939 self.pg0.add_stream(pkts)
1940 self.pg_enable_capture(self.pg_interfaces)
1942 capture = self.pg0.get_capture(len(pkts))
1943 for packet in capture:
1945 self.assertEqual(packet[IP].src, server2_nat_ip)
1946 self.assertEqual(packet[IP].dst, server1.ip4)
1947 if packet.haslayer(TCP):
1948 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1949 self.assertEqual(packet[TCP].dport, server_tcp_port)
1950 self.tcp_port_out = packet[TCP].sport
1951 self.assert_packet_checksums_valid(packet)
1952 elif packet.haslayer(UDP):
1953 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1954 self.assertEqual(packet[UDP].dport, server_udp_port)
1955 self.udp_port_out = packet[UDP].sport
1957 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1958 self.icmp_id_out = packet[ICMP].id
1960 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1963 # server1 to server2
1965 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1966 IP(src=server1.ip4, dst=server2_nat_ip) /
1967 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1969 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1970 IP(src=server1.ip4, dst=server2_nat_ip) /
1971 UDP(sport=server_udp_port, dport=self.udp_port_out))
1973 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1974 IP(src=server1.ip4, dst=server2_nat_ip) /
1975 ICMP(id=self.icmp_id_out, type='echo-reply'))
1977 self.pg0.add_stream(pkts)
1978 self.pg_enable_capture(self.pg_interfaces)
1980 capture = self.pg0.get_capture(len(pkts))
1981 for packet in capture:
1983 self.assertEqual(packet[IP].src, server1_nat_ip)
1984 self.assertEqual(packet[IP].dst, server2.ip4)
1985 if packet.haslayer(TCP):
1986 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1987 self.assertEqual(packet[TCP].sport, server_tcp_port)
1988 self.assert_packet_checksums_valid(packet)
1989 elif packet.haslayer(UDP):
1990 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1991 self.assertEqual(packet[UDP].sport, server_udp_port)
1993 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1995 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1998 def test_max_translations_per_user(self):
1999 """ MAX translations per user - recycle the least recently used """
2001 self.nat44_add_address(self.nat_addr)
2002 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2003 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2006 # get maximum number of translations per user
2007 nat44_config = self.vapi.nat_show_config()
2009 # send more than maximum number of translations per user packets
2010 pkts_num = nat44_config.max_translations_per_user + 5
2012 for port in range(0, pkts_num):
2013 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2014 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2015 TCP(sport=1025 + port))
2017 self.pg0.add_stream(pkts)
2018 self.pg_enable_capture(self.pg_interfaces)
2021 # verify number of translated packet
2022 self.pg1.get_capture(pkts_num)
2024 users = self.vapi.nat44_user_dump()
2026 if user.ip_address == self.pg0.remote_ip4n:
2027 self.assertEqual(user.nsessions,
2028 nat44_config.max_translations_per_user)
2029 self.assertEqual(user.nstaticsessions, 0)
2032 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2034 proto=IP_PROTOS.tcp)
2035 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2036 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2037 TCP(sport=tcp_port))
2038 self.pg0.add_stream(p)
2039 self.pg_enable_capture(self.pg_interfaces)
2041 self.pg1.get_capture(1)
2042 users = self.vapi.nat44_user_dump()
2044 if user.ip_address == self.pg0.remote_ip4n:
2045 self.assertEqual(user.nsessions,
2046 nat44_config.max_translations_per_user - 1)
2047 self.assertEqual(user.nstaticsessions, 1)
2049 def test_interface_addr(self):
2050 """ Acquire NAT44 addresses from interface """
2051 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2053 # no address in NAT pool
2054 adresses = self.vapi.nat44_address_dump()
2055 self.assertEqual(0, len(adresses))
2057 # configure interface address and check NAT address pool
2058 self.pg7.config_ip4()
2059 adresses = self.vapi.nat44_address_dump()
2060 self.assertEqual(1, len(adresses))
2061 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2063 # remove interface address and check NAT address pool
2064 self.pg7.unconfig_ip4()
2065 adresses = self.vapi.nat44_address_dump()
2066 self.assertEqual(0, len(adresses))
2068 def test_interface_addr_static_mapping(self):
2069 """ Static mapping with addresses from interface """
2072 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2073 self.nat44_add_static_mapping(
2075 external_sw_if_index=self.pg7.sw_if_index,
2078 # static mappings with external interface
2079 static_mappings = self.vapi.nat44_static_mapping_dump()
2080 self.assertEqual(1, len(static_mappings))
2081 self.assertEqual(self.pg7.sw_if_index,
2082 static_mappings[0].external_sw_if_index)
2083 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2085 # configure interface address and check static mappings
2086 self.pg7.config_ip4()
2087 static_mappings = self.vapi.nat44_static_mapping_dump()
2088 self.assertEqual(2, len(static_mappings))
2090 for sm in static_mappings:
2091 if sm.external_sw_if_index == 0xFFFFFFFF:
2092 self.assertEqual(sm.external_ip_address[0:4],
2093 self.pg7.local_ip4n)
2094 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2096 self.assertTrue(resolved)
2098 # remove interface address and check static mappings
2099 self.pg7.unconfig_ip4()
2100 static_mappings = self.vapi.nat44_static_mapping_dump()
2101 self.assertEqual(1, len(static_mappings))
2102 self.assertEqual(self.pg7.sw_if_index,
2103 static_mappings[0].external_sw_if_index)
2104 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2106 # configure interface address again and check static mappings
2107 self.pg7.config_ip4()
2108 static_mappings = self.vapi.nat44_static_mapping_dump()
2109 self.assertEqual(2, len(static_mappings))
2111 for sm in static_mappings:
2112 if sm.external_sw_if_index == 0xFFFFFFFF:
2113 self.assertEqual(sm.external_ip_address[0:4],
2114 self.pg7.local_ip4n)
2115 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2117 self.assertTrue(resolved)
2119 # remove static mapping
2120 self.nat44_add_static_mapping(
2122 external_sw_if_index=self.pg7.sw_if_index,
2125 static_mappings = self.vapi.nat44_static_mapping_dump()
2126 self.assertEqual(0, len(static_mappings))
2128 def test_interface_addr_identity_nat(self):
2129 """ Identity NAT with addresses from interface """
2132 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2133 self.vapi.nat44_add_del_identity_mapping(
2134 sw_if_index=self.pg7.sw_if_index,
2136 protocol=IP_PROTOS.tcp,
2139 # identity mappings with external interface
2140 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2141 self.assertEqual(1, len(identity_mappings))
2142 self.assertEqual(self.pg7.sw_if_index,
2143 identity_mappings[0].sw_if_index)
2145 # configure interface address and check identity mappings
2146 self.pg7.config_ip4()
2147 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2149 self.assertEqual(2, len(identity_mappings))
2150 for sm in identity_mappings:
2151 if sm.sw_if_index == 0xFFFFFFFF:
2152 self.assertEqual(identity_mappings[0].ip_address,
2153 self.pg7.local_ip4n)
2154 self.assertEqual(port, identity_mappings[0].port)
2155 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2157 self.assertTrue(resolved)
2159 # remove interface address and check identity mappings
2160 self.pg7.unconfig_ip4()
2161 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2162 self.assertEqual(1, len(identity_mappings))
2163 self.assertEqual(self.pg7.sw_if_index,
2164 identity_mappings[0].sw_if_index)
2166 def test_ipfix_nat44_sess(self):
2167 """ IPFIX logging NAT44 session created/delted """
2168 self.ipfix_domain_id = 10
2169 self.ipfix_src_port = 20202
2170 colector_port = 30303
2171 bind_layers(UDP, IPFIX, dport=30303)
2172 self.nat44_add_address(self.nat_addr)
2173 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2174 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2176 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2177 src_address=self.pg3.local_ip4n,
2179 template_interval=10,
2180 collector_port=colector_port)
2181 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2182 src_port=self.ipfix_src_port)
2184 pkts = self.create_stream_in(self.pg0, self.pg1)
2185 self.pg0.add_stream(pkts)
2186 self.pg_enable_capture(self.pg_interfaces)
2188 capture = self.pg1.get_capture(len(pkts))
2189 self.verify_capture_out(capture)
2190 self.nat44_add_address(self.nat_addr, is_add=0)
2191 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2192 capture = self.pg3.get_capture(9)
2193 ipfix = IPFIXDecoder()
2194 # first load template
2196 self.assertTrue(p.haslayer(IPFIX))
2197 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2198 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2199 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2200 self.assertEqual(p[UDP].dport, colector_port)
2201 self.assertEqual(p[IPFIX].observationDomainID,
2202 self.ipfix_domain_id)
2203 if p.haslayer(Template):
2204 ipfix.add_template(p.getlayer(Template))
2205 # verify events in data set
2207 if p.haslayer(Data):
2208 data = ipfix.decode_data_set(p.getlayer(Set))
2209 self.verify_ipfix_nat44_ses(data)
2211 def test_ipfix_addr_exhausted(self):
2212 """ IPFIX logging NAT addresses exhausted """
2213 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2214 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2216 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2217 src_address=self.pg3.local_ip4n,
2219 template_interval=10)
2220 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2221 src_port=self.ipfix_src_port)
2223 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2224 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2226 self.pg0.add_stream(p)
2227 self.pg_enable_capture(self.pg_interfaces)
2229 self.pg1.assert_nothing_captured()
2231 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2232 capture = self.pg3.get_capture(9)
2233 ipfix = IPFIXDecoder()
2234 # first load template
2236 self.assertTrue(p.haslayer(IPFIX))
2237 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2238 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2239 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2240 self.assertEqual(p[UDP].dport, 4739)
2241 self.assertEqual(p[IPFIX].observationDomainID,
2242 self.ipfix_domain_id)
2243 if p.haslayer(Template):
2244 ipfix.add_template(p.getlayer(Template))
2245 # verify events in data set
2247 if p.haslayer(Data):
2248 data = ipfix.decode_data_set(p.getlayer(Set))
2249 self.verify_ipfix_addr_exhausted(data)
2251 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2252 def test_ipfix_max_sessions(self):
2253 """ IPFIX logging maximum session entries exceeded """
2254 self.nat44_add_address(self.nat_addr)
2255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2256 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2259 nat44_config = self.vapi.nat_show_config()
2260 max_sessions = 10 * nat44_config.translation_buckets
2263 for i in range(0, max_sessions):
2264 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=src, dst=self.pg1.remote_ip4) /
2269 self.pg0.add_stream(pkts)
2270 self.pg_enable_capture(self.pg_interfaces)
2273 self.pg1.get_capture(max_sessions)
2274 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2275 src_address=self.pg3.local_ip4n,
2277 template_interval=10)
2278 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2279 src_port=self.ipfix_src_port)
2281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2284 self.pg0.add_stream(p)
2285 self.pg_enable_capture(self.pg_interfaces)
2287 self.pg1.assert_nothing_captured()
2289 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2290 capture = self.pg3.get_capture(9)
2291 ipfix = IPFIXDecoder()
2292 # first load template
2294 self.assertTrue(p.haslayer(IPFIX))
2295 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2296 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2297 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2298 self.assertEqual(p[UDP].dport, 4739)
2299 self.assertEqual(p[IPFIX].observationDomainID,
2300 self.ipfix_domain_id)
2301 if p.haslayer(Template):
2302 ipfix.add_template(p.getlayer(Template))
2303 # verify events in data set
2305 if p.haslayer(Data):
2306 data = ipfix.decode_data_set(p.getlayer(Set))
2307 self.verify_ipfix_max_sessions(data, max_sessions)
2309 def test_pool_addr_fib(self):
2310 """ NAT44 add pool addresses to FIB """
2311 static_addr = '10.0.0.10'
2312 self.nat44_add_address(self.nat_addr)
2313 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2314 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2316 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2319 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2320 ARP(op=ARP.who_has, pdst=self.nat_addr,
2321 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2322 self.pg1.add_stream(p)
2323 self.pg_enable_capture(self.pg_interfaces)
2325 capture = self.pg1.get_capture(1)
2326 self.assertTrue(capture[0].haslayer(ARP))
2327 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2330 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2331 ARP(op=ARP.who_has, pdst=static_addr,
2332 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2333 self.pg1.add_stream(p)
2334 self.pg_enable_capture(self.pg_interfaces)
2336 capture = self.pg1.get_capture(1)
2337 self.assertTrue(capture[0].haslayer(ARP))
2338 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2340 # send ARP to non-NAT44 interface
2341 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2342 ARP(op=ARP.who_has, pdst=self.nat_addr,
2343 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2344 self.pg2.add_stream(p)
2345 self.pg_enable_capture(self.pg_interfaces)
2347 self.pg1.assert_nothing_captured()
2349 # remove addresses and verify
2350 self.nat44_add_address(self.nat_addr, is_add=0)
2351 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2354 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2355 ARP(op=ARP.who_has, pdst=self.nat_addr,
2356 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2357 self.pg1.add_stream(p)
2358 self.pg_enable_capture(self.pg_interfaces)
2360 self.pg1.assert_nothing_captured()
2362 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2363 ARP(op=ARP.who_has, pdst=static_addr,
2364 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2365 self.pg1.add_stream(p)
2366 self.pg_enable_capture(self.pg_interfaces)
2368 self.pg1.assert_nothing_captured()
2370 def test_vrf_mode(self):
2371 """ NAT44 tenant VRF aware address pool mode """
2375 nat_ip1 = "10.0.0.10"
2376 nat_ip2 = "10.0.0.11"
2378 self.pg0.unconfig_ip4()
2379 self.pg1.unconfig_ip4()
2380 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2381 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2382 self.pg0.set_table_ip4(vrf_id1)
2383 self.pg1.set_table_ip4(vrf_id2)
2384 self.pg0.config_ip4()
2385 self.pg1.config_ip4()
2386 self.pg0.resolve_arp()
2387 self.pg1.resolve_arp()
2389 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2390 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2391 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2392 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2393 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2398 pkts = self.create_stream_in(self.pg0, self.pg2)
2399 self.pg0.add_stream(pkts)
2400 self.pg_enable_capture(self.pg_interfaces)
2402 capture = self.pg2.get_capture(len(pkts))
2403 self.verify_capture_out(capture, nat_ip1)
2406 pkts = self.create_stream_in(self.pg1, self.pg2)
2407 self.pg1.add_stream(pkts)
2408 self.pg_enable_capture(self.pg_interfaces)
2410 capture = self.pg2.get_capture(len(pkts))
2411 self.verify_capture_out(capture, nat_ip2)
2414 self.pg0.unconfig_ip4()
2415 self.pg1.unconfig_ip4()
2416 self.pg0.set_table_ip4(0)
2417 self.pg1.set_table_ip4(0)
2418 self.pg0.config_ip4()
2419 self.pg1.config_ip4()
2420 self.pg0.resolve_arp()
2421 self.pg1.resolve_arp()
2422 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2423 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2425 def test_vrf_feature_independent(self):
2426 """ NAT44 tenant VRF independent address pool mode """
2428 nat_ip1 = "10.0.0.10"
2429 nat_ip2 = "10.0.0.11"
2431 self.nat44_add_address(nat_ip1)
2432 self.nat44_add_address(nat_ip2, vrf_id=99)
2433 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2434 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2435 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2439 pkts = self.create_stream_in(self.pg0, self.pg2)
2440 self.pg0.add_stream(pkts)
2441 self.pg_enable_capture(self.pg_interfaces)
2443 capture = self.pg2.get_capture(len(pkts))
2444 self.verify_capture_out(capture, nat_ip1)
2447 pkts = self.create_stream_in(self.pg1, self.pg2)
2448 self.pg1.add_stream(pkts)
2449 self.pg_enable_capture(self.pg_interfaces)
2451 capture = self.pg2.get_capture(len(pkts))
2452 self.verify_capture_out(capture, nat_ip1)
2454 def test_dynamic_ipless_interfaces(self):
2455 """ NAT44 interfaces without configured IP address """
2457 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2458 mactobinary(self.pg7.remote_mac),
2459 self.pg7.remote_ip4n,
2461 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2462 mactobinary(self.pg8.remote_mac),
2463 self.pg8.remote_ip4n,
2466 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2467 dst_address_length=32,
2468 next_hop_address=self.pg7.remote_ip4n,
2469 next_hop_sw_if_index=self.pg7.sw_if_index)
2470 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2471 dst_address_length=32,
2472 next_hop_address=self.pg8.remote_ip4n,
2473 next_hop_sw_if_index=self.pg8.sw_if_index)
2475 self.nat44_add_address(self.nat_addr)
2476 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2477 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2481 pkts = self.create_stream_in(self.pg7, self.pg8)
2482 self.pg7.add_stream(pkts)
2483 self.pg_enable_capture(self.pg_interfaces)
2485 capture = self.pg8.get_capture(len(pkts))
2486 self.verify_capture_out(capture)
2489 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2490 self.pg8.add_stream(pkts)
2491 self.pg_enable_capture(self.pg_interfaces)
2493 capture = self.pg7.get_capture(len(pkts))
2494 self.verify_capture_in(capture, self.pg7)
2496 def test_static_ipless_interfaces(self):
2497 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2499 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2500 mactobinary(self.pg7.remote_mac),
2501 self.pg7.remote_ip4n,
2503 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2504 mactobinary(self.pg8.remote_mac),
2505 self.pg8.remote_ip4n,
2508 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2509 dst_address_length=32,
2510 next_hop_address=self.pg7.remote_ip4n,
2511 next_hop_sw_if_index=self.pg7.sw_if_index)
2512 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2513 dst_address_length=32,
2514 next_hop_address=self.pg8.remote_ip4n,
2515 next_hop_sw_if_index=self.pg8.sw_if_index)
2517 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2518 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2519 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2523 pkts = self.create_stream_out(self.pg8)
2524 self.pg8.add_stream(pkts)
2525 self.pg_enable_capture(self.pg_interfaces)
2527 capture = self.pg7.get_capture(len(pkts))
2528 self.verify_capture_in(capture, self.pg7)
2531 pkts = self.create_stream_in(self.pg7, self.pg8)
2532 self.pg7.add_stream(pkts)
2533 self.pg_enable_capture(self.pg_interfaces)
2535 capture = self.pg8.get_capture(len(pkts))
2536 self.verify_capture_out(capture, self.nat_addr, True)
2538 def test_static_with_port_ipless_interfaces(self):
2539 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2541 self.tcp_port_out = 30606
2542 self.udp_port_out = 30607
2543 self.icmp_id_out = 30608
2545 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2546 mactobinary(self.pg7.remote_mac),
2547 self.pg7.remote_ip4n,
2549 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2550 mactobinary(self.pg8.remote_mac),
2551 self.pg8.remote_ip4n,
2554 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2555 dst_address_length=32,
2556 next_hop_address=self.pg7.remote_ip4n,
2557 next_hop_sw_if_index=self.pg7.sw_if_index)
2558 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2559 dst_address_length=32,
2560 next_hop_address=self.pg8.remote_ip4n,
2561 next_hop_sw_if_index=self.pg8.sw_if_index)
2563 self.nat44_add_address(self.nat_addr)
2564 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2565 self.tcp_port_in, self.tcp_port_out,
2566 proto=IP_PROTOS.tcp)
2567 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2568 self.udp_port_in, self.udp_port_out,
2569 proto=IP_PROTOS.udp)
2570 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2571 self.icmp_id_in, self.icmp_id_out,
2572 proto=IP_PROTOS.icmp)
2573 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2574 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2578 pkts = self.create_stream_out(self.pg8)
2579 self.pg8.add_stream(pkts)
2580 self.pg_enable_capture(self.pg_interfaces)
2582 capture = self.pg7.get_capture(len(pkts))
2583 self.verify_capture_in(capture, self.pg7)
2586 pkts = self.create_stream_in(self.pg7, self.pg8)
2587 self.pg7.add_stream(pkts)
2588 self.pg_enable_capture(self.pg_interfaces)
2590 capture = self.pg8.get_capture(len(pkts))
2591 self.verify_capture_out(capture)
2593 def test_static_unknown_proto(self):
2594 """ 1:1 NAT translate packet with unknown protocol """
2595 nat_ip = "10.0.0.10"
2596 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2597 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2598 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2602 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2603 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2605 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2606 TCP(sport=1234, dport=1234))
2607 self.pg0.add_stream(p)
2608 self.pg_enable_capture(self.pg_interfaces)
2610 p = self.pg1.get_capture(1)
2613 self.assertEqual(packet[IP].src, nat_ip)
2614 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2615 self.assertTrue(packet.haslayer(GRE))
2616 self.assert_packet_checksums_valid(packet)
2618 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2622 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2623 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2625 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2626 TCP(sport=1234, dport=1234))
2627 self.pg1.add_stream(p)
2628 self.pg_enable_capture(self.pg_interfaces)
2630 p = self.pg0.get_capture(1)
2633 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2634 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2635 self.assertTrue(packet.haslayer(GRE))
2636 self.assert_packet_checksums_valid(packet)
2638 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2641 def test_hairpinning_static_unknown_proto(self):
2642 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2644 host = self.pg0.remote_hosts[0]
2645 server = self.pg0.remote_hosts[1]
2647 host_nat_ip = "10.0.0.10"
2648 server_nat_ip = "10.0.0.11"
2650 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2651 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2652 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2653 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2657 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2658 IP(src=host.ip4, dst=server_nat_ip) /
2660 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2661 TCP(sport=1234, dport=1234))
2662 self.pg0.add_stream(p)
2663 self.pg_enable_capture(self.pg_interfaces)
2665 p = self.pg0.get_capture(1)
2668 self.assertEqual(packet[IP].src, host_nat_ip)
2669 self.assertEqual(packet[IP].dst, server.ip4)
2670 self.assertTrue(packet.haslayer(GRE))
2671 self.assert_packet_checksums_valid(packet)
2673 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2677 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2678 IP(src=server.ip4, dst=host_nat_ip) /
2680 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2681 TCP(sport=1234, dport=1234))
2682 self.pg0.add_stream(p)
2683 self.pg_enable_capture(self.pg_interfaces)
2685 p = self.pg0.get_capture(1)
2688 self.assertEqual(packet[IP].src, server_nat_ip)
2689 self.assertEqual(packet[IP].dst, host.ip4)
2690 self.assertTrue(packet.haslayer(GRE))
2691 self.assert_packet_checksums_valid(packet)
2693 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2696 def test_output_feature(self):
2697 """ NAT44 interface output feature (in2out postrouting) """
2698 self.nat44_add_address(self.nat_addr)
2699 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2700 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2701 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2705 pkts = self.create_stream_in(self.pg0, self.pg3)
2706 self.pg0.add_stream(pkts)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg3.get_capture(len(pkts))
2710 self.verify_capture_out(capture)
2713 pkts = self.create_stream_out(self.pg3)
2714 self.pg3.add_stream(pkts)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 capture = self.pg0.get_capture(len(pkts))
2718 self.verify_capture_in(capture, self.pg0)
2720 # from non-NAT interface to NAT inside interface
2721 pkts = self.create_stream_in(self.pg2, self.pg0)
2722 self.pg2.add_stream(pkts)
2723 self.pg_enable_capture(self.pg_interfaces)
2725 capture = self.pg0.get_capture(len(pkts))
2726 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2728 def test_output_feature_vrf_aware(self):
2729 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2730 nat_ip_vrf10 = "10.0.0.10"
2731 nat_ip_vrf20 = "10.0.0.20"
2733 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2734 dst_address_length=32,
2735 next_hop_address=self.pg3.remote_ip4n,
2736 next_hop_sw_if_index=self.pg3.sw_if_index,
2738 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2739 dst_address_length=32,
2740 next_hop_address=self.pg3.remote_ip4n,
2741 next_hop_sw_if_index=self.pg3.sw_if_index,
2744 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2745 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2746 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2747 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2748 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2752 pkts = self.create_stream_in(self.pg4, self.pg3)
2753 self.pg4.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg3.get_capture(len(pkts))
2757 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2760 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2761 self.pg3.add_stream(pkts)
2762 self.pg_enable_capture(self.pg_interfaces)
2764 capture = self.pg4.get_capture(len(pkts))
2765 self.verify_capture_in(capture, self.pg4)
2768 pkts = self.create_stream_in(self.pg6, self.pg3)
2769 self.pg6.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg3.get_capture(len(pkts))
2773 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2776 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2777 self.pg3.add_stream(pkts)
2778 self.pg_enable_capture(self.pg_interfaces)
2780 capture = self.pg6.get_capture(len(pkts))
2781 self.verify_capture_in(capture, self.pg6)
2783 def test_output_feature_hairpinning(self):
2784 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2785 host = self.pg0.remote_hosts[0]
2786 server = self.pg0.remote_hosts[1]
2789 server_in_port = 5678
2790 server_out_port = 8765
2792 self.nat44_add_address(self.nat_addr)
2793 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2794 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2797 # add static mapping for server
2798 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2799 server_in_port, server_out_port,
2800 proto=IP_PROTOS.tcp)
2802 # send packet from host to server
2803 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2804 IP(src=host.ip4, dst=self.nat_addr) /
2805 TCP(sport=host_in_port, dport=server_out_port))
2806 self.pg0.add_stream(p)
2807 self.pg_enable_capture(self.pg_interfaces)
2809 capture = self.pg0.get_capture(1)
2814 self.assertEqual(ip.src, self.nat_addr)
2815 self.assertEqual(ip.dst, server.ip4)
2816 self.assertNotEqual(tcp.sport, host_in_port)
2817 self.assertEqual(tcp.dport, server_in_port)
2818 self.assert_packet_checksums_valid(p)
2819 host_out_port = tcp.sport
2821 self.logger.error(ppp("Unexpected or invalid packet:", p))
2824 # send reply from server to host
2825 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2826 IP(src=server.ip4, dst=self.nat_addr) /
2827 TCP(sport=server_in_port, dport=host_out_port))
2828 self.pg0.add_stream(p)
2829 self.pg_enable_capture(self.pg_interfaces)
2831 capture = self.pg0.get_capture(1)
2836 self.assertEqual(ip.src, self.nat_addr)
2837 self.assertEqual(ip.dst, host.ip4)
2838 self.assertEqual(tcp.sport, server_out_port)
2839 self.assertEqual(tcp.dport, host_in_port)
2840 self.assert_packet_checksums_valid(p)
2842 self.logger.error(ppp("Unexpected or invalid packet:", p))
2845 def test_one_armed_nat44(self):
2846 """ One armed NAT44 """
2847 remote_host = self.pg9.remote_hosts[0]
2848 local_host = self.pg9.remote_hosts[1]
2851 self.nat44_add_address(self.nat_addr)
2852 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2853 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2857 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2858 IP(src=local_host.ip4, dst=remote_host.ip4) /
2859 TCP(sport=12345, dport=80))
2860 self.pg9.add_stream(p)
2861 self.pg_enable_capture(self.pg_interfaces)
2863 capture = self.pg9.get_capture(1)
2868 self.assertEqual(ip.src, self.nat_addr)
2869 self.assertEqual(ip.dst, remote_host.ip4)
2870 self.assertNotEqual(tcp.sport, 12345)
2871 external_port = tcp.sport
2872 self.assertEqual(tcp.dport, 80)
2873 self.assert_packet_checksums_valid(p)
2875 self.logger.error(ppp("Unexpected or invalid packet:", p))
2879 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2880 IP(src=remote_host.ip4, dst=self.nat_addr) /
2881 TCP(sport=80, dport=external_port))
2882 self.pg9.add_stream(p)
2883 self.pg_enable_capture(self.pg_interfaces)
2885 capture = self.pg9.get_capture(1)
2890 self.assertEqual(ip.src, remote_host.ip4)
2891 self.assertEqual(ip.dst, local_host.ip4)
2892 self.assertEqual(tcp.sport, 80)
2893 self.assertEqual(tcp.dport, 12345)
2894 self.assert_packet_checksums_valid(p)
2896 self.logger.error(ppp("Unexpected or invalid packet:", p))
2899 def test_del_session(self):
2900 """ Delete NAT44 session """
2901 self.nat44_add_address(self.nat_addr)
2902 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2903 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2906 pkts = self.create_stream_in(self.pg0, self.pg1)
2907 self.pg0.add_stream(pkts)
2908 self.pg_enable_capture(self.pg_interfaces)
2910 self.pg1.get_capture(len(pkts))
2912 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2913 nsessions = len(sessions)
2915 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2916 sessions[0].inside_port,
2917 sessions[0].protocol)
2918 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2919 sessions[1].outside_port,
2920 sessions[1].protocol,
2923 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2924 self.assertEqual(nsessions - len(sessions), 2)
2926 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2927 sessions[0].inside_port,
2928 sessions[0].protocol)
2930 self.verify_no_nat44_user()
2932 def test_set_get_reass(self):
2933 """ NAT44 set/get virtual fragmentation reassembly """
2934 reas_cfg1 = self.vapi.nat_get_reass()
2936 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2937 max_reass=reas_cfg1.ip4_max_reass * 2,
2938 max_frag=reas_cfg1.ip4_max_frag * 2)
2940 reas_cfg2 = self.vapi.nat_get_reass()
2942 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2943 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2944 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2946 self.vapi.nat_set_reass(drop_frag=1)
2947 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2949 def test_frag_in_order(self):
2950 """ NAT44 translate fragments arriving in order """
2951 self.nat44_add_address(self.nat_addr)
2952 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2953 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2956 data = "A" * 4 + "B" * 16 + "C" * 3
2957 self.tcp_port_in = random.randint(1025, 65535)
2959 reass = self.vapi.nat_reass_dump()
2960 reass_n_start = len(reass)
2963 pkts = self.create_stream_frag(self.pg0,
2964 self.pg1.remote_ip4,
2968 self.pg0.add_stream(pkts)
2969 self.pg_enable_capture(self.pg_interfaces)
2971 frags = self.pg1.get_capture(len(pkts))
2972 p = self.reass_frags_and_verify(frags,
2974 self.pg1.remote_ip4)
2975 self.assertEqual(p[TCP].dport, 20)
2976 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2977 self.tcp_port_out = p[TCP].sport
2978 self.assertEqual(data, p[Raw].load)
2981 pkts = self.create_stream_frag(self.pg1,
2986 self.pg1.add_stream(pkts)
2987 self.pg_enable_capture(self.pg_interfaces)
2989 frags = self.pg0.get_capture(len(pkts))
2990 p = self.reass_frags_and_verify(frags,
2991 self.pg1.remote_ip4,
2992 self.pg0.remote_ip4)
2993 self.assertEqual(p[TCP].sport, 20)
2994 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2995 self.assertEqual(data, p[Raw].load)
2997 reass = self.vapi.nat_reass_dump()
2998 reass_n_end = len(reass)
3000 self.assertEqual(reass_n_end - reass_n_start, 2)
3002 def test_reass_hairpinning(self):
3003 """ NAT44 fragments hairpinning """
3004 server = self.pg0.remote_hosts[1]
3005 host_in_port = random.randint(1025, 65535)
3006 server_in_port = random.randint(1025, 65535)
3007 server_out_port = random.randint(1025, 65535)
3008 data = "A" * 4 + "B" * 16 + "C" * 3
3010 self.nat44_add_address(self.nat_addr)
3011 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3012 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3014 # add static mapping for server
3015 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3016 server_in_port, server_out_port,
3017 proto=IP_PROTOS.tcp)
3019 # send packet from host to server
3020 pkts = self.create_stream_frag(self.pg0,
3025 self.pg0.add_stream(pkts)
3026 self.pg_enable_capture(self.pg_interfaces)
3028 frags = self.pg0.get_capture(len(pkts))
3029 p = self.reass_frags_and_verify(frags,
3032 self.assertNotEqual(p[TCP].sport, host_in_port)
3033 self.assertEqual(p[TCP].dport, server_in_port)
3034 self.assertEqual(data, p[Raw].load)
3036 def test_frag_out_of_order(self):
3037 """ NAT44 translate fragments arriving out of order """
3038 self.nat44_add_address(self.nat_addr)
3039 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3040 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3043 data = "A" * 4 + "B" * 16 + "C" * 3
3044 random.randint(1025, 65535)
3047 pkts = self.create_stream_frag(self.pg0,
3048 self.pg1.remote_ip4,
3053 self.pg0.add_stream(pkts)
3054 self.pg_enable_capture(self.pg_interfaces)
3056 frags = self.pg1.get_capture(len(pkts))
3057 p = self.reass_frags_and_verify(frags,
3059 self.pg1.remote_ip4)
3060 self.assertEqual(p[TCP].dport, 20)
3061 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3062 self.tcp_port_out = p[TCP].sport
3063 self.assertEqual(data, p[Raw].load)
3066 pkts = self.create_stream_frag(self.pg1,
3072 self.pg1.add_stream(pkts)
3073 self.pg_enable_capture(self.pg_interfaces)
3075 frags = self.pg0.get_capture(len(pkts))
3076 p = self.reass_frags_and_verify(frags,
3077 self.pg1.remote_ip4,
3078 self.pg0.remote_ip4)
3079 self.assertEqual(p[TCP].sport, 20)
3080 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3081 self.assertEqual(data, p[Raw].load)
3083 def test_port_restricted(self):
3084 """ Port restricted NAT44 (MAP-E CE) """
3085 self.nat44_add_address(self.nat_addr)
3086 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3087 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3089 self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3094 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3095 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3096 TCP(sport=4567, dport=22))
3097 self.pg0.add_stream(p)
3098 self.pg_enable_capture(self.pg_interfaces)
3100 capture = self.pg1.get_capture(1)
3105 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3106 self.assertEqual(ip.src, self.nat_addr)
3107 self.assertEqual(tcp.dport, 22)
3108 self.assertNotEqual(tcp.sport, 4567)
3109 self.assertEqual((tcp.sport >> 6) & 63, 10)
3110 self.assert_packet_checksums_valid(p)
3112 self.logger.error(ppp("Unexpected or invalid packet:", p))
3115 def test_port_range(self):
3116 """ External address port range """
3117 self.nat44_add_address(self.nat_addr)
3118 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3119 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3121 self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
3126 for port in range(0, 5):
3127 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3128 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3129 TCP(sport=1125 + port))
3131 self.pg0.add_stream(pkts)
3132 self.pg_enable_capture(self.pg_interfaces)
3134 capture = self.pg1.get_capture(3)
3137 self.assertGreaterEqual(tcp.sport, 1025)
3138 self.assertLessEqual(tcp.sport, 1027)
3140 def test_ipfix_max_frags(self):
3141 """ IPFIX logging maximum fragments pending reassembly exceeded """
3142 self.nat44_add_address(self.nat_addr)
3143 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3144 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3146 self.vapi.nat_set_reass(max_frag=1)
3147 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3148 src_address=self.pg3.local_ip4n,
3150 template_interval=10)
3151 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3152 src_port=self.ipfix_src_port)
3154 data = "A" * 4 + "B" * 16 + "C" * 3
3155 self.tcp_port_in = random.randint(1025, 65535)
3156 pkts = self.create_stream_frag(self.pg0,
3157 self.pg1.remote_ip4,
3162 self.pg0.add_stream(pkts)
3163 self.pg_enable_capture(self.pg_interfaces)
3165 self.pg1.assert_nothing_captured()
3167 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3168 capture = self.pg3.get_capture(9)
3169 ipfix = IPFIXDecoder()
3170 # first load template
3172 self.assertTrue(p.haslayer(IPFIX))
3173 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3174 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3175 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3176 self.assertEqual(p[UDP].dport, 4739)
3177 self.assertEqual(p[IPFIX].observationDomainID,
3178 self.ipfix_domain_id)
3179 if p.haslayer(Template):
3180 ipfix.add_template(p.getlayer(Template))
3181 # verify events in data set
3183 if p.haslayer(Data):
3184 data = ipfix.decode_data_set(p.getlayer(Set))
3185 self.verify_ipfix_max_fragments_ip4(data, 1,
3186 self.pg0.remote_ip4n)
3188 def test_multiple_outside_vrf(self):
3189 """ Multiple outside VRF """
3193 self.pg1.unconfig_ip4()
3194 self.pg2.unconfig_ip4()
3195 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3196 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3197 self.pg1.set_table_ip4(vrf_id1)
3198 self.pg2.set_table_ip4(vrf_id2)
3199 self.pg1.config_ip4()
3200 self.pg2.config_ip4()
3201 self.pg1.resolve_arp()
3202 self.pg2.resolve_arp()
3204 self.nat44_add_address(self.nat_addr)
3205 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3206 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3208 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
3213 pkts = self.create_stream_in(self.pg0, self.pg1)
3214 self.pg0.add_stream(pkts)
3215 self.pg_enable_capture(self.pg_interfaces)
3217 capture = self.pg1.get_capture(len(pkts))
3218 self.verify_capture_out(capture, self.nat_addr)
3220 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3221 self.pg1.add_stream(pkts)
3222 self.pg_enable_capture(self.pg_interfaces)
3224 capture = self.pg0.get_capture(len(pkts))
3225 self.verify_capture_in(capture, self.pg0)
3227 self.tcp_port_in = 60303
3228 self.udp_port_in = 60304
3229 self.icmp_id_in = 60305
3232 pkts = self.create_stream_in(self.pg0, self.pg2)
3233 self.pg0.add_stream(pkts)
3234 self.pg_enable_capture(self.pg_interfaces)
3236 capture = self.pg2.get_capture(len(pkts))
3237 self.verify_capture_out(capture, self.nat_addr)
3239 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3240 self.pg2.add_stream(pkts)
3241 self.pg_enable_capture(self.pg_interfaces)
3243 capture = self.pg0.get_capture(len(pkts))
3244 self.verify_capture_in(capture, self.pg0)
3247 self.pg1.unconfig_ip4()
3248 self.pg2.unconfig_ip4()
3249 self.pg1.set_table_ip4(0)
3250 self.pg2.set_table_ip4(0)
3251 self.pg1.config_ip4()
3252 self.pg2.config_ip4()
3253 self.pg1.resolve_arp()
3254 self.pg2.resolve_arp()
3256 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3257 def test_session_timeout(self):
3258 """ NAT44 session timeouts """
3259 self.nat44_add_address(self.nat_addr)
3260 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3261 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3263 self.vapi.nat_set_timeouts(udp=5)
3267 for i in range(0, max_sessions):
3268 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3270 IP(src=src, dst=self.pg1.remote_ip4) /
3271 UDP(sport=1025, dport=53))
3273 self.pg0.add_stream(pkts)
3274 self.pg_enable_capture(self.pg_interfaces)
3276 self.pg1.get_capture(max_sessions)
3281 for i in range(0, max_sessions):
3282 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3283 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3284 IP(src=src, dst=self.pg1.remote_ip4) /
3285 UDP(sport=1026, dport=53))
3287 self.pg0.add_stream(pkts)
3288 self.pg_enable_capture(self.pg_interfaces)
3290 self.pg1.get_capture(max_sessions)
3293 users = self.vapi.nat44_user_dump()
3295 nsessions = nsessions + user.nsessions
3296 self.assertLess(nsessions, 2 * max_sessions)
3299 super(TestNAT44, self).tearDown()
3300 if not self.vpp_dead:
3301 self.logger.info(self.vapi.cli("show nat44 addresses"))
3302 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3303 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3304 self.logger.info(self.vapi.cli("show nat44 interface address"))
3305 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3306 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3307 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3308 self.logger.info(self.vapi.cli("show nat timeouts"))
3310 self.vapi.cli("show nat addr-port-assignment-alg"))
3312 self.vapi.cli("clear logging")
3315 class TestNAT44EndpointDependent(MethodHolder):
3316 """ Endpoint-Dependent mapping and filtering test cases """
3319 def setUpConstants(cls):
3320 super(TestNAT44EndpointDependent, cls).setUpConstants()
3321 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3324 def setUpClass(cls):
3325 super(TestNAT44EndpointDependent, cls).setUpClass()
3326 cls.vapi.cli("set log class nat level debug")
3328 cls.tcp_port_in = 6303
3329 cls.tcp_port_out = 6303
3330 cls.udp_port_in = 6304
3331 cls.udp_port_out = 6304
3332 cls.icmp_id_in = 6305
3333 cls.icmp_id_out = 6305
3334 cls.nat_addr = '10.0.0.3'
3335 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3336 cls.ipfix_src_port = 4739
3337 cls.ipfix_domain_id = 1
3338 cls.tcp_external_port = 80
3340 cls.create_pg_interfaces(range(7))
3341 cls.interfaces = list(cls.pg_interfaces[0:3])
3343 for i in cls.interfaces:
3348 cls.pg0.generate_remote_hosts(3)
3349 cls.pg0.configure_ipv4_neighbors()
3353 cls.pg4.generate_remote_hosts(2)
3354 cls.pg4.config_ip4()
3355 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3356 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3360 cls.pg4.resolve_arp()
3361 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3362 cls.pg4.resolve_arp()
3364 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3365 cls.vapi.ip_table_add_del(1, is_add=1)
3367 cls.pg5._local_ip4 = "10.1.1.1"
3368 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3370 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3371 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3372 socket.AF_INET, cls.pg5.remote_ip4)
3373 cls.pg5.set_table_ip4(1)
3374 cls.pg5.config_ip4()
3376 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3377 dst_address_length=32,
3379 next_hop_sw_if_index=cls.pg5.sw_if_index,
3380 next_hop_address=zero_ip4n)
3382 cls.pg6._local_ip4 = "10.1.2.1"
3383 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3385 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3386 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3387 socket.AF_INET, cls.pg6.remote_ip4)
3388 cls.pg6.set_table_ip4(1)
3389 cls.pg6.config_ip4()
3391 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3392 dst_address_length=32,
3394 next_hop_sw_if_index=cls.pg6.sw_if_index,
3395 next_hop_address=zero_ip4n)
3397 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3398 dst_address_length=16,
3399 next_hop_address=zero_ip4n,
3401 next_hop_table_id=1)
3402 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3403 dst_address_length=0,
3404 next_hop_address=zero_ip4n,
3406 next_hop_table_id=0)
3407 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3408 dst_address_length=0,
3410 next_hop_sw_if_index=cls.pg1.sw_if_index,
3411 next_hop_address=cls.pg1.local_ip4n)
3413 cls.pg5.resolve_arp()
3414 cls.pg6.resolve_arp()
3417 super(TestNAT44EndpointDependent, cls).tearDownClass()
3420 def test_dynamic(self):
3421 """ NAT44 dynamic translation test """
3423 self.nat44_add_address(self.nat_addr)
3424 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3425 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3428 nat_config = self.vapi.nat_show_config()
3429 self.assertEqual(1, nat_config.endpoint_dependent)
3432 pkts = self.create_stream_in(self.pg0, self.pg1)
3433 self.pg0.add_stream(pkts)
3434 self.pg_enable_capture(self.pg_interfaces)
3436 capture = self.pg1.get_capture(len(pkts))
3437 self.verify_capture_out(capture)
3440 pkts = self.create_stream_out(self.pg1)
3441 self.pg1.add_stream(pkts)
3442 self.pg_enable_capture(self.pg_interfaces)
3444 capture = self.pg0.get_capture(len(pkts))
3445 self.verify_capture_in(capture, self.pg0)
3447 def test_forwarding(self):
3448 """ NAT44 forwarding test """
3450 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3451 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3453 self.vapi.nat44_forwarding_enable_disable(1)
3455 real_ip = self.pg0.remote_ip4n
3456 alias_ip = self.nat_addr_n
3457 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3458 external_ip=alias_ip)
3461 # in2out - static mapping match
3463 pkts = self.create_stream_out(self.pg1)
3464 self.pg1.add_stream(pkts)
3465 self.pg_enable_capture(self.pg_interfaces)
3467 capture = self.pg0.get_capture(len(pkts))
3468 self.verify_capture_in(capture, self.pg0)
3470 pkts = self.create_stream_in(self.pg0, self.pg1)
3471 self.pg0.add_stream(pkts)
3472 self.pg_enable_capture(self.pg_interfaces)
3474 capture = self.pg1.get_capture(len(pkts))
3475 self.verify_capture_out(capture, same_port=True)
3477 # in2out - no static mapping match
3479 host0 = self.pg0.remote_hosts[0]
3480 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3482 pkts = self.create_stream_out(self.pg1,
3483 dst_ip=self.pg0.remote_ip4,
3484 use_inside_ports=True)
3485 self.pg1.add_stream(pkts)
3486 self.pg_enable_capture(self.pg_interfaces)
3488 capture = self.pg0.get_capture(len(pkts))
3489 self.verify_capture_in(capture, self.pg0)
3491 pkts = self.create_stream_in(self.pg0, self.pg1)
3492 self.pg0.add_stream(pkts)
3493 self.pg_enable_capture(self.pg_interfaces)
3495 capture = self.pg1.get_capture(len(pkts))
3496 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3499 self.pg0.remote_hosts[0] = host0
3501 user = self.pg0.remote_hosts[1]
3502 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3503 self.assertEqual(len(sessions), 3)
3504 self.assertTrue(sessions[0].ext_host_valid)
3505 self.vapi.nat44_del_session(
3506 sessions[0].inside_ip_address,
3507 sessions[0].inside_port,
3508 sessions[0].protocol,
3509 ext_host_address=sessions[0].ext_host_address,
3510 ext_host_port=sessions[0].ext_host_port)
3511 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3512 self.assertEqual(len(sessions), 2)
3515 self.vapi.nat44_forwarding_enable_disable(0)
3516 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3517 external_ip=alias_ip,
3520 def test_static_lb(self):
3521 """ NAT44 local service load balancing """
3522 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3525 server1 = self.pg0.remote_hosts[0]
3526 server2 = self.pg0.remote_hosts[1]
3528 locals = [{'addr': server1.ip4n,
3532 {'addr': server2.ip4n,
3537 self.nat44_add_address(self.nat_addr)
3538 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3541 local_num=len(locals),
3543 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3544 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3547 # from client to service
3548 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3549 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3550 TCP(sport=12345, dport=external_port))
3551 self.pg1.add_stream(p)
3552 self.pg_enable_capture(self.pg_interfaces)
3554 capture = self.pg0.get_capture(1)
3560 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3561 if ip.dst == server1.ip4:
3565 self.assertEqual(tcp.dport, local_port)
3566 self.assert_packet_checksums_valid(p)
3568 self.logger.error(ppp("Unexpected or invalid packet:", p))
3571 # from service back to client
3572 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3573 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3574 TCP(sport=local_port, dport=12345))
3575 self.pg0.add_stream(p)
3576 self.pg_enable_capture(self.pg_interfaces)
3578 capture = self.pg1.get_capture(1)
3583 self.assertEqual(ip.src, self.nat_addr)
3584 self.assertEqual(tcp.sport, external_port)
3585 self.assert_packet_checksums_valid(p)
3587 self.logger.error(ppp("Unexpected or invalid packet:", p))
3590 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3591 self.assertEqual(len(sessions), 1)
3592 self.assertTrue(sessions[0].ext_host_valid)
3593 self.vapi.nat44_del_session(
3594 sessions[0].inside_ip_address,
3595 sessions[0].inside_port,
3596 sessions[0].protocol,
3597 ext_host_address=sessions[0].ext_host_address,
3598 ext_host_port=sessions[0].ext_host_port)
3599 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3600 self.assertEqual(len(sessions), 0)
3602 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3603 def test_static_lb_multi_clients(self):
3604 """ NAT44 local service load balancing - multiple clients"""
3606 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3609 server1 = self.pg0.remote_hosts[0]
3610 server2 = self.pg0.remote_hosts[1]
3612 locals = [{'addr': server1.ip4n,
3616 {'addr': server2.ip4n,
3621 self.nat44_add_address(self.nat_addr)
3622 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3625 local_num=len(locals),
3627 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3628 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3633 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3635 for client in clients:
3636 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3637 IP(src=client, dst=self.nat_addr) /
3638 TCP(sport=12345, dport=external_port))
3640 self.pg1.add_stream(pkts)
3641 self.pg_enable_capture(self.pg_interfaces)
3643 capture = self.pg0.get_capture(len(pkts))
3645 if p[IP].dst == server1.ip4:
3649 self.assertTrue(server1_n > server2_n)
3651 def test_static_lb_2(self):
3652 """ NAT44 local service load balancing (asymmetrical rule) """
3653 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3656 server1 = self.pg0.remote_hosts[0]
3657 server2 = self.pg0.remote_hosts[1]
3659 locals = [{'addr': server1.ip4n,
3663 {'addr': server2.ip4n,
3668 self.vapi.nat44_forwarding_enable_disable(1)
3669 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3673 local_num=len(locals),
3675 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3676 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3679 # from client to service
3680 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3681 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3682 TCP(sport=12345, dport=external_port))
3683 self.pg1.add_stream(p)
3684 self.pg_enable_capture(self.pg_interfaces)
3686 capture = self.pg0.get_capture(1)
3692 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3693 if ip.dst == server1.ip4:
3697 self.assertEqual(tcp.dport, local_port)
3698 self.assert_packet_checksums_valid(p)
3700 self.logger.error(ppp("Unexpected or invalid packet:", p))
3703 # from service back to client
3704 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3705 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3706 TCP(sport=local_port, dport=12345))
3707 self.pg0.add_stream(p)
3708 self.pg_enable_capture(self.pg_interfaces)
3710 capture = self.pg1.get_capture(1)
3715 self.assertEqual(ip.src, self.nat_addr)
3716 self.assertEqual(tcp.sport, external_port)
3717 self.assert_packet_checksums_valid(p)
3719 self.logger.error(ppp("Unexpected or invalid packet:", p))
3722 # from client to server (no translation)
3723 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3724 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3725 TCP(sport=12346, dport=local_port))
3726 self.pg1.add_stream(p)
3727 self.pg_enable_capture(self.pg_interfaces)
3729 capture = self.pg0.get_capture(1)
3735 self.assertEqual(ip.dst, server1.ip4)
3736 self.assertEqual(tcp.dport, local_port)
3737 self.assert_packet_checksums_valid(p)
3739 self.logger.error(ppp("Unexpected or invalid packet:", p))
3742 # from service back to client (no translation)
3743 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3744 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3745 TCP(sport=local_port, dport=12346))
3746 self.pg0.add_stream(p)
3747 self.pg_enable_capture(self.pg_interfaces)
3749 capture = self.pg1.get_capture(1)
3754 self.assertEqual(ip.src, server1.ip4)
3755 self.assertEqual(tcp.sport, local_port)
3756 self.assert_packet_checksums_valid(p)
3758 self.logger.error(ppp("Unexpected or invalid packet:", p))
3761 def test_lb_affinity(self):
3762 """ NAT44 local service load balancing affinity """
3763 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3766 server1 = self.pg0.remote_hosts[0]
3767 server2 = self.pg0.remote_hosts[1]
3769 locals = [{'addr': server1.ip4n,
3773 {'addr': server2.ip4n,
3778 self.nat44_add_address(self.nat_addr)
3779 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3783 local_num=len(locals),
3785 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3786 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3789 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3790 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3791 TCP(sport=1025, dport=external_port))
3792 self.pg1.add_stream(p)
3793 self.pg_enable_capture(self.pg_interfaces)
3795 capture = self.pg0.get_capture(1)
3796 backend = capture[0][IP].dst
3798 sessions = self.vapi.nat44_user_session_dump(
3799 socket.inet_pton(socket.AF_INET, backend), 0)
3800 self.assertEqual(len(sessions), 1)
3801 self.assertTrue(sessions[0].ext_host_valid)
3802 self.vapi.nat44_del_session(
3803 sessions[0].inside_ip_address,
3804 sessions[0].inside_port,
3805 sessions[0].protocol,
3806 ext_host_address=sessions[0].ext_host_address,
3807 ext_host_port=sessions[0].ext_host_port)
3810 for port in range(1030, 1100):
3811 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3812 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3813 TCP(sport=port, dport=external_port))
3815 self.pg1.add_stream(pkts)
3816 self.pg_enable_capture(self.pg_interfaces)
3818 capture = self.pg0.get_capture(len(pkts))
3820 self.assertEqual(p[IP].dst, backend)
3822 def test_unknown_proto(self):
3823 """ NAT44 translate packet with unknown protocol """
3824 self.nat44_add_address(self.nat_addr)
3825 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3826 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3830 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3831 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3832 TCP(sport=self.tcp_port_in, dport=20))
3833 self.pg0.add_stream(p)
3834 self.pg_enable_capture(self.pg_interfaces)
3836 p = self.pg1.get_capture(1)
3838 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3839 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3841 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3842 TCP(sport=1234, dport=1234))
3843 self.pg0.add_stream(p)
3844 self.pg_enable_capture(self.pg_interfaces)
3846 p = self.pg1.get_capture(1)
3849 self.assertEqual(packet[IP].src, self.nat_addr)
3850 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3851 self.assertTrue(packet.haslayer(GRE))
3852 self.assert_packet_checksums_valid(packet)
3854 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3858 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3859 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3861 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3862 TCP(sport=1234, dport=1234))
3863 self.pg1.add_stream(p)
3864 self.pg_enable_capture(self.pg_interfaces)
3866 p = self.pg0.get_capture(1)
3869 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3870 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3871 self.assertTrue(packet.haslayer(GRE))
3872 self.assert_packet_checksums_valid(packet)
3874 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3877 def test_hairpinning_unknown_proto(self):
3878 """ NAT44 translate packet with unknown protocol - hairpinning """
3879 host = self.pg0.remote_hosts[0]
3880 server = self.pg0.remote_hosts[1]
3882 server_out_port = 8765
3883 server_nat_ip = "10.0.0.11"
3885 self.nat44_add_address(self.nat_addr)
3886 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3887 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3890 # add static mapping for server
3891 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3894 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3895 IP(src=host.ip4, dst=server_nat_ip) /
3896 TCP(sport=host_in_port, dport=server_out_port))
3897 self.pg0.add_stream(p)
3898 self.pg_enable_capture(self.pg_interfaces)
3900 self.pg0.get_capture(1)
3902 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3903 IP(src=host.ip4, dst=server_nat_ip) /
3905 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3906 TCP(sport=1234, dport=1234))
3907 self.pg0.add_stream(p)
3908 self.pg_enable_capture(self.pg_interfaces)
3910 p = self.pg0.get_capture(1)
3913 self.assertEqual(packet[IP].src, self.nat_addr)
3914 self.assertEqual(packet[IP].dst, server.ip4)
3915 self.assertTrue(packet.haslayer(GRE))
3916 self.assert_packet_checksums_valid(packet)
3918 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3922 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3923 IP(src=server.ip4, dst=self.nat_addr) /
3925 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3926 TCP(sport=1234, dport=1234))
3927 self.pg0.add_stream(p)
3928 self.pg_enable_capture(self.pg_interfaces)
3930 p = self.pg0.get_capture(1)
3933 self.assertEqual(packet[IP].src, server_nat_ip)
3934 self.assertEqual(packet[IP].dst, host.ip4)
3935 self.assertTrue(packet.haslayer(GRE))
3936 self.assert_packet_checksums_valid(packet)
3938 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3941 def test_output_feature_and_service(self):
3942 """ NAT44 interface output feature and services """
3943 external_addr = '1.2.3.4'
3947 self.vapi.nat44_forwarding_enable_disable(1)
3948 self.nat44_add_address(self.nat_addr)
3949 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3950 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3951 local_port, external_port,
3952 proto=IP_PROTOS.tcp, out2in_only=1)
3953 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3954 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3956 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3959 # from client to service
3960 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3961 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3962 TCP(sport=12345, dport=external_port))
3963 self.pg1.add_stream(p)
3964 self.pg_enable_capture(self.pg_interfaces)
3966 capture = self.pg0.get_capture(1)
3971 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3972 self.assertEqual(tcp.dport, local_port)
3973 self.assert_packet_checksums_valid(p)
3975 self.logger.error(ppp("Unexpected or invalid packet:", p))
3978 # from service back to client
3979 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3980 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3981 TCP(sport=local_port, dport=12345))
3982 self.pg0.add_stream(p)
3983 self.pg_enable_capture(self.pg_interfaces)
3985 capture = self.pg1.get_capture(1)
3990 self.assertEqual(ip.src, external_addr)
3991 self.assertEqual(tcp.sport, external_port)
3992 self.assert_packet_checksums_valid(p)
3994 self.logger.error(ppp("Unexpected or invalid packet:", p))
3997 # from local network host to external network
3998 pkts = self.create_stream_in(self.pg0, self.pg1)
3999 self.pg0.add_stream(pkts)
4000 self.pg_enable_capture(self.pg_interfaces)
4002 capture = self.pg1.get_capture(len(pkts))
4003 self.verify_capture_out(capture)
4004 pkts = self.create_stream_in(self.pg0, self.pg1)
4005 self.pg0.add_stream(pkts)
4006 self.pg_enable_capture(self.pg_interfaces)
4008 capture = self.pg1.get_capture(len(pkts))
4009 self.verify_capture_out(capture)
4011 # from external network back to local network host
4012 pkts = self.create_stream_out(self.pg1)
4013 self.pg1.add_stream(pkts)
4014 self.pg_enable_capture(self.pg_interfaces)
4016 capture = self.pg0.get_capture(len(pkts))
4017 self.verify_capture_in(capture, self.pg0)
4019 def test_output_feature_and_service2(self):
4020 """ NAT44 interface output feature and service host direct access """
4021 self.vapi.nat44_forwarding_enable_disable(1)
4022 self.nat44_add_address(self.nat_addr)
4023 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4026 # session initiaded from service host - translate
4027 pkts = self.create_stream_in(self.pg0, self.pg1)
4028 self.pg0.add_stream(pkts)
4029 self.pg_enable_capture(self.pg_interfaces)
4031 capture = self.pg1.get_capture(len(pkts))
4032 self.verify_capture_out(capture)
4034 pkts = self.create_stream_out(self.pg1)
4035 self.pg1.add_stream(pkts)
4036 self.pg_enable_capture(self.pg_interfaces)
4038 capture = self.pg0.get_capture(len(pkts))
4039 self.verify_capture_in(capture, self.pg0)
4041 # session initiaded from remote host - do not translate
4042 self.tcp_port_in = 60303
4043 self.udp_port_in = 60304
4044 self.icmp_id_in = 60305
4045 pkts = self.create_stream_out(self.pg1,
4046 self.pg0.remote_ip4,
4047 use_inside_ports=True)
4048 self.pg1.add_stream(pkts)
4049 self.pg_enable_capture(self.pg_interfaces)
4051 capture = self.pg0.get_capture(len(pkts))
4052 self.verify_capture_in(capture, self.pg0)
4054 pkts = self.create_stream_in(self.pg0, self.pg1)
4055 self.pg0.add_stream(pkts)
4056 self.pg_enable_capture(self.pg_interfaces)
4058 capture = self.pg1.get_capture(len(pkts))
4059 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
4062 def test_output_feature_and_service3(self):
4063 """ NAT44 interface output feature and DST NAT """
4064 external_addr = '1.2.3.4'
4068 self.vapi.nat44_forwarding_enable_disable(1)
4069 self.nat44_add_address(self.nat_addr)
4070 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
4071 local_port, external_port,
4072 proto=IP_PROTOS.tcp, out2in_only=1)
4073 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4074 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4076 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4079 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4080 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4081 TCP(sport=12345, dport=external_port))
4082 self.pg0.add_stream(p)
4083 self.pg_enable_capture(self.pg_interfaces)
4085 capture = self.pg1.get_capture(1)
4090 self.assertEqual(ip.src, self.pg0.remote_ip4)
4091 self.assertEqual(tcp.sport, 12345)
4092 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4093 self.assertEqual(tcp.dport, local_port)
4094 self.assert_packet_checksums_valid(p)
4096 self.logger.error(ppp("Unexpected or invalid packet:", p))
4099 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4100 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4101 TCP(sport=local_port, dport=12345))
4102 self.pg1.add_stream(p)
4103 self.pg_enable_capture(self.pg_interfaces)
4105 capture = self.pg0.get_capture(1)
4110 self.assertEqual(ip.src, external_addr)
4111 self.assertEqual(tcp.sport, external_port)
4112 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4113 self.assertEqual(tcp.dport, 12345)
4114 self.assert_packet_checksums_valid(p)
4116 self.logger.error(ppp("Unexpected or invalid packet:", p))
4119 def test_next_src_nat(self):
4120 """ On way back forward packet to nat44-in2out node. """
4121 twice_nat_addr = '10.0.1.3'
4124 post_twice_nat_port = 0
4126 self.vapi.nat44_forwarding_enable_disable(1)
4127 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4128 self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
4129 local_port, external_port,
4130 proto=IP_PROTOS.tcp, out2in_only=1,
4131 self_twice_nat=1, vrf_id=1)
4132 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4135 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4136 IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
4137 TCP(sport=12345, dport=external_port))
4138 self.pg6.add_stream(p)
4139 self.pg_enable_capture(self.pg_interfaces)
4141 capture = self.pg6.get_capture(1)
4146 self.assertEqual(ip.src, twice_nat_addr)
4147 self.assertNotEqual(tcp.sport, 12345)
4148 post_twice_nat_port = tcp.sport
4149 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4150 self.assertEqual(tcp.dport, local_port)
4151 self.assert_packet_checksums_valid(p)
4153 self.logger.error(ppp("Unexpected or invalid packet:", p))
4156 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4157 IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
4158 TCP(sport=local_port, dport=post_twice_nat_port))
4159 self.pg6.add_stream(p)
4160 self.pg_enable_capture(self.pg_interfaces)
4162 capture = self.pg6.get_capture(1)
4167 self.assertEqual(ip.src, self.pg1.remote_ip4)
4168 self.assertEqual(tcp.sport, external_port)
4169 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4170 self.assertEqual(tcp.dport, 12345)
4171 self.assert_packet_checksums_valid(p)
4173 self.logger.error(ppp("Unexpected or invalid packet:", p))
4176 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
4178 twice_nat_addr = '10.0.1.3'
4186 port_in1 = port_in+1
4187 port_in2 = port_in+2
4192 server1 = self.pg0.remote_hosts[0]
4193 server2 = self.pg0.remote_hosts[1]
4205 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
4208 self.nat44_add_address(self.nat_addr)
4209 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4211 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4213 proto=IP_PROTOS.tcp,
4214 twice_nat=int(not self_twice_nat),
4215 self_twice_nat=int(self_twice_nat))
4217 locals = [{'addr': server1.ip4n,
4221 {'addr': server2.ip4n,
4225 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4226 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4230 not self_twice_nat),
4233 local_num=len(locals),
4235 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4236 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
4243 assert client_id is not None
4245 client = self.pg0.remote_hosts[0]
4246 elif client_id == 2:
4247 client = self.pg0.remote_hosts[1]
4249 client = pg1.remote_hosts[0]
4250 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4251 IP(src=client.ip4, dst=self.nat_addr) /
4252 TCP(sport=eh_port_out, dport=port_out))
4254 self.pg_enable_capture(self.pg_interfaces)
4256 capture = pg0.get_capture(1)
4262 if ip.dst == server1.ip4:
4268 self.assertEqual(ip.dst, server.ip4)
4270 self.assertIn(tcp.dport, [port_in1, port_in2])
4272 self.assertEqual(tcp.dport, port_in)
4274 self.assertEqual(ip.src, twice_nat_addr)
4275 self.assertNotEqual(tcp.sport, eh_port_out)
4277 self.assertEqual(ip.src, client.ip4)
4278 self.assertEqual(tcp.sport, eh_port_out)
4280 eh_port_in = tcp.sport
4281 saved_port_in = tcp.dport
4282 self.assert_packet_checksums_valid(p)
4284 self.logger.error(ppp("Unexpected or invalid packet:", p))
4287 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4288 IP(src=server.ip4, dst=eh_addr_in) /
4289 TCP(sport=saved_port_in, dport=eh_port_in))
4291 self.pg_enable_capture(self.pg_interfaces)
4293 capture = pg1.get_capture(1)
4298 self.assertEqual(ip.dst, client.ip4)
4299 self.assertEqual(ip.src, self.nat_addr)
4300 self.assertEqual(tcp.dport, eh_port_out)
4301 self.assertEqual(tcp.sport, port_out)
4302 self.assert_packet_checksums_valid(p)
4304 self.logger.error(ppp("Unexpected or invalid packet:", p))
4308 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4309 self.assertEqual(len(sessions), 1)
4310 self.assertTrue(sessions[0].ext_host_valid)
4311 self.assertTrue(sessions[0].is_twicenat)
4312 self.vapi.nat44_del_session(
4313 sessions[0].inside_ip_address,
4314 sessions[0].inside_port,
4315 sessions[0].protocol,
4316 ext_host_address=sessions[0].ext_host_nat_address,
4317 ext_host_port=sessions[0].ext_host_nat_port)
4318 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4319 self.assertEqual(len(sessions), 0)
4321 def test_twice_nat(self):
4323 self.twice_nat_common()
4325 def test_self_twice_nat_positive(self):
4326 """ Self Twice NAT44 (positive test) """
4327 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4329 def test_self_twice_nat_negative(self):
4330 """ Self Twice NAT44 (negative test) """
4331 self.twice_nat_common(self_twice_nat=True)
4333 def test_twice_nat_lb(self):
4334 """ Twice NAT44 local service load balancing """
4335 self.twice_nat_common(lb=True)
4337 def test_self_twice_nat_lb_positive(self):
4338 """ Self Twice NAT44 local service load balancing (positive test) """
4339 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4342 def test_self_twice_nat_lb_negative(self):
4343 """ Self Twice NAT44 local service load balancing (negative test) """
4344 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4347 def test_twice_nat_interface_addr(self):
4348 """ Acquire twice NAT44 addresses from interface """
4349 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4351 # no address in NAT pool
4352 adresses = self.vapi.nat44_address_dump()
4353 self.assertEqual(0, len(adresses))
4355 # configure interface address and check NAT address pool
4356 self.pg3.config_ip4()
4357 adresses = self.vapi.nat44_address_dump()
4358 self.assertEqual(1, len(adresses))
4359 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4360 self.assertEqual(adresses[0].twice_nat, 1)
4362 # remove interface address and check NAT address pool
4363 self.pg3.unconfig_ip4()
4364 adresses = self.vapi.nat44_address_dump()
4365 self.assertEqual(0, len(adresses))
4367 def test_tcp_session_close_in(self):
4368 """ Close TCP session from inside network """
4369 self.tcp_port_out = 10505
4370 self.nat44_add_address(self.nat_addr)
4371 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4375 proto=IP_PROTOS.tcp,
4377 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4378 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4381 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4382 start_sessnum = len(sessions)
4384 self.initiate_tcp_session(self.pg0, self.pg1)
4386 # FIN packet in -> out
4387 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4388 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4389 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4390 flags="FA", seq=100, ack=300))
4391 self.pg0.add_stream(p)
4392 self.pg_enable_capture(self.pg_interfaces)
4394 self.pg1.get_capture(1)
4398 # ACK packet out -> in
4399 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4400 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4401 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4402 flags="A", seq=300, ack=101))
4405 # FIN packet out -> in
4406 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4407 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4408 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4409 flags="FA", seq=300, ack=101))
4412 self.pg1.add_stream(pkts)
4413 self.pg_enable_capture(self.pg_interfaces)
4415 self.pg0.get_capture(2)
4417 # ACK packet in -> out
4418 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4419 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4420 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4421 flags="A", seq=101, ack=301))
4422 self.pg0.add_stream(p)
4423 self.pg_enable_capture(self.pg_interfaces)
4425 self.pg1.get_capture(1)
4427 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4429 self.assertEqual(len(sessions) - start_sessnum, 0)
4431 def test_tcp_session_close_out(self):
4432 """ Close TCP session from outside network """
4433 self.tcp_port_out = 10505
4434 self.nat44_add_address(self.nat_addr)
4435 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4439 proto=IP_PROTOS.tcp,
4441 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4442 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4445 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4446 start_sessnum = len(sessions)
4448 self.initiate_tcp_session(self.pg0, self.pg1)
4450 # FIN packet out -> in
4451 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4452 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4453 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4454 flags="FA", seq=100, ack=300))
4455 self.pg1.add_stream(p)
4456 self.pg_enable_capture(self.pg_interfaces)
4458 self.pg0.get_capture(1)
4460 # FIN+ACK packet in -> out
4461 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4462 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4463 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4464 flags="FA", seq=300, ack=101))
4466 self.pg0.add_stream(p)
4467 self.pg_enable_capture(self.pg_interfaces)
4469 self.pg1.get_capture(1)
4471 # ACK packet out -> in
4472 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4473 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4474 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4475 flags="A", seq=101, ack=301))
4476 self.pg1.add_stream(p)
4477 self.pg_enable_capture(self.pg_interfaces)
4479 self.pg0.get_capture(1)
4481 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4483 self.assertEqual(len(sessions) - start_sessnum, 0)
4485 def test_tcp_session_close_simultaneous(self):
4486 """ Close TCP session from inside network """
4487 self.tcp_port_out = 10505
4488 self.nat44_add_address(self.nat_addr)
4489 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4493 proto=IP_PROTOS.tcp,
4495 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4496 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4499 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4500 start_sessnum = len(sessions)
4502 self.initiate_tcp_session(self.pg0, self.pg1)
4504 # FIN packet in -> out
4505 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4506 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4507 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4508 flags="FA", seq=100, ack=300))
4509 self.pg0.add_stream(p)
4510 self.pg_enable_capture(self.pg_interfaces)
4512 self.pg1.get_capture(1)
4514 # FIN packet out -> in
4515 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4516 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4517 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4518 flags="FA", seq=300, ack=100))
4519 self.pg1.add_stream(p)
4520 self.pg_enable_capture(self.pg_interfaces)
4522 self.pg0.get_capture(1)
4524 # ACK packet in -> out
4525 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4526 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4527 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4528 flags="A", seq=101, ack=301))
4529 self.pg0.add_stream(p)
4530 self.pg_enable_capture(self.pg_interfaces)
4532 self.pg1.get_capture(1)
4534 # ACK packet out -> in
4535 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4536 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4537 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4538 flags="A", seq=301, ack=101))
4539 self.pg1.add_stream(p)
4540 self.pg_enable_capture(self.pg_interfaces)
4542 self.pg0.get_capture(1)
4544 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4546 self.assertEqual(len(sessions) - start_sessnum, 0)
4548 def test_one_armed_nat44_static(self):
4549 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
4550 remote_host = self.pg4.remote_hosts[0]
4551 local_host = self.pg4.remote_hosts[1]
4556 self.vapi.nat44_forwarding_enable_disable(1)
4557 self.nat44_add_address(self.nat_addr, twice_nat=1)
4558 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4559 local_port, external_port,
4560 proto=IP_PROTOS.tcp, out2in_only=1,
4562 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4563 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4566 # from client to service
4567 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4568 IP(src=remote_host.ip4, dst=self.nat_addr) /
4569 TCP(sport=12345, dport=external_port))
4570 self.pg4.add_stream(p)
4571 self.pg_enable_capture(self.pg_interfaces)
4573 capture = self.pg4.get_capture(1)
4578 self.assertEqual(ip.dst, local_host.ip4)
4579 self.assertEqual(ip.src, self.nat_addr)
4580 self.assertEqual(tcp.dport, local_port)
4581 self.assertNotEqual(tcp.sport, 12345)
4582 eh_port_in = tcp.sport
4583 self.assert_packet_checksums_valid(p)
4585 self.logger.error(ppp("Unexpected or invalid packet:", p))
4588 # from service back to client
4589 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4590 IP(src=local_host.ip4, dst=self.nat_addr) /
4591 TCP(sport=local_port, dport=eh_port_in))
4592 self.pg4.add_stream(p)
4593 self.pg_enable_capture(self.pg_interfaces)
4595 capture = self.pg4.get_capture(1)
4600 self.assertEqual(ip.src, self.nat_addr)
4601 self.assertEqual(ip.dst, remote_host.ip4)
4602 self.assertEqual(tcp.sport, external_port)
4603 self.assertEqual(tcp.dport, 12345)
4604 self.assert_packet_checksums_valid(p)
4606 self.logger.error(ppp("Unexpected or invalid packet:", p))
4609 def test_static_with_port_out2(self):
4610 """ 1:1 NAPT asymmetrical rule """
4615 self.vapi.nat44_forwarding_enable_disable(1)
4616 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4617 local_port, external_port,
4618 proto=IP_PROTOS.tcp, out2in_only=1)
4619 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4620 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4623 # from client to service
4624 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4625 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4626 TCP(sport=12345, dport=external_port))
4627 self.pg1.add_stream(p)
4628 self.pg_enable_capture(self.pg_interfaces)
4630 capture = self.pg0.get_capture(1)
4635 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4636 self.assertEqual(tcp.dport, local_port)
4637 self.assert_packet_checksums_valid(p)
4639 self.logger.error(ppp("Unexpected or invalid packet:", p))
4643 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4644 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4645 ICMP(type=11) / capture[0][IP])
4646 self.pg0.add_stream(p)
4647 self.pg_enable_capture(self.pg_interfaces)
4649 capture = self.pg1.get_capture(1)
4652 self.assertEqual(p[IP].src, self.nat_addr)
4654 self.assertEqual(inner.dst, self.nat_addr)
4655 self.assertEqual(inner[TCPerror].dport, external_port)
4657 self.logger.error(ppp("Unexpected or invalid packet:", p))
4660 # from service back to client
4661 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4662 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4663 TCP(sport=local_port, dport=12345))
4664 self.pg0.add_stream(p)
4665 self.pg_enable_capture(self.pg_interfaces)
4667 capture = self.pg1.get_capture(1)
4672 self.assertEqual(ip.src, self.nat_addr)
4673 self.assertEqual(tcp.sport, external_port)
4674 self.assert_packet_checksums_valid(p)
4676 self.logger.error(ppp("Unexpected or invalid packet:", p))
4680 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4681 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4682 ICMP(type=11) / capture[0][IP])
4683 self.pg1.add_stream(p)
4684 self.pg_enable_capture(self.pg_interfaces)
4686 capture = self.pg0.get_capture(1)
4689 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4691 self.assertEqual(inner.src, self.pg0.remote_ip4)
4692 self.assertEqual(inner[TCPerror].sport, local_port)
4694 self.logger.error(ppp("Unexpected or invalid packet:", p))
4697 # from client to server (no translation)
4698 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4699 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4700 TCP(sport=12346, dport=local_port))
4701 self.pg1.add_stream(p)
4702 self.pg_enable_capture(self.pg_interfaces)
4704 capture = self.pg0.get_capture(1)
4709 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4710 self.assertEqual(tcp.dport, local_port)
4711 self.assert_packet_checksums_valid(p)
4713 self.logger.error(ppp("Unexpected or invalid packet:", p))
4716 # from service back to client (no translation)
4717 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4718 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4719 TCP(sport=local_port, dport=12346))
4720 self.pg0.add_stream(p)
4721 self.pg_enable_capture(self.pg_interfaces)
4723 capture = self.pg1.get_capture(1)
4728 self.assertEqual(ip.src, self.pg0.remote_ip4)
4729 self.assertEqual(tcp.sport, local_port)
4730 self.assert_packet_checksums_valid(p)
4732 self.logger.error(ppp("Unexpected or invalid packet:", p))
4735 def test_output_feature(self):
4736 """ NAT44 interface output feature (in2out postrouting) """
4737 self.vapi.nat44_forwarding_enable_disable(1)
4738 self.nat44_add_address(self.nat_addr)
4739 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4741 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4745 pkts = self.create_stream_in(self.pg0, self.pg1)
4746 self.pg0.add_stream(pkts)
4747 self.pg_enable_capture(self.pg_interfaces)
4749 capture = self.pg1.get_capture(len(pkts))
4750 self.verify_capture_out(capture)
4753 pkts = self.create_stream_out(self.pg1)
4754 self.pg1.add_stream(pkts)
4755 self.pg_enable_capture(self.pg_interfaces)
4757 capture = self.pg0.get_capture(len(pkts))
4758 self.verify_capture_in(capture, self.pg0)
4760 def test_multiple_vrf(self):
4761 """ Multiple VRF setup """
4762 external_addr = '1.2.3.4'
4767 self.vapi.nat44_forwarding_enable_disable(1)
4768 self.nat44_add_address(self.nat_addr)
4769 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4770 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4772 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4774 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4775 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4777 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4779 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4780 local_port, external_port, vrf_id=1,
4781 proto=IP_PROTOS.tcp, out2in_only=1)
4782 self.nat44_add_static_mapping(
4783 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4784 local_port=local_port, vrf_id=0, external_port=external_port,
4785 proto=IP_PROTOS.tcp, out2in_only=1)
4787 # from client to service (both VRF1)
4788 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4789 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4790 TCP(sport=12345, dport=external_port))
4791 self.pg6.add_stream(p)
4792 self.pg_enable_capture(self.pg_interfaces)
4794 capture = self.pg5.get_capture(1)
4799 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4800 self.assertEqual(tcp.dport, local_port)
4801 self.assert_packet_checksums_valid(p)
4803 self.logger.error(ppp("Unexpected or invalid packet:", p))
4806 # from service back to client (both VRF1)
4807 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4808 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4809 TCP(sport=local_port, dport=12345))
4810 self.pg5.add_stream(p)
4811 self.pg_enable_capture(self.pg_interfaces)
4813 capture = self.pg6.get_capture(1)
4818 self.assertEqual(ip.src, external_addr)
4819 self.assertEqual(tcp.sport, external_port)
4820 self.assert_packet_checksums_valid(p)
4822 self.logger.error(ppp("Unexpected or invalid packet:", p))
4825 # dynamic NAT from VRF1 to VRF0 (output-feature)
4826 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4827 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4828 TCP(sport=2345, dport=22))
4829 self.pg5.add_stream(p)
4830 self.pg_enable_capture(self.pg_interfaces)
4832 capture = self.pg1.get_capture(1)
4837 self.assertEqual(ip.src, self.nat_addr)
4838 self.assertNotEqual(tcp.sport, 2345)
4839 self.assert_packet_checksums_valid(p)
4842 self.logger.error(ppp("Unexpected or invalid packet:", p))
4845 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4846 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4847 TCP(sport=22, dport=port))
4848 self.pg1.add_stream(p)
4849 self.pg_enable_capture(self.pg_interfaces)
4851 capture = self.pg5.get_capture(1)
4856 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4857 self.assertEqual(tcp.dport, 2345)
4858 self.assert_packet_checksums_valid(p)
4860 self.logger.error(ppp("Unexpected or invalid packet:", p))
4863 # from client VRF1 to service VRF0
4864 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4865 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4866 TCP(sport=12346, dport=external_port))
4867 self.pg6.add_stream(p)
4868 self.pg_enable_capture(self.pg_interfaces)
4870 capture = self.pg0.get_capture(1)
4875 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4876 self.assertEqual(tcp.dport, local_port)
4877 self.assert_packet_checksums_valid(p)
4879 self.logger.error(ppp("Unexpected or invalid packet:", p))
4882 # from service VRF0 back to client VRF1
4883 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4884 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4885 TCP(sport=local_port, dport=12346))
4886 self.pg0.add_stream(p)
4887 self.pg_enable_capture(self.pg_interfaces)
4889 capture = self.pg6.get_capture(1)
4894 self.assertEqual(ip.src, self.pg0.local_ip4)
4895 self.assertEqual(tcp.sport, external_port)
4896 self.assert_packet_checksums_valid(p)
4898 self.logger.error(ppp("Unexpected or invalid packet:", p))
4901 # from client VRF0 to service VRF1
4902 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4903 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4904 TCP(sport=12347, dport=external_port))
4905 self.pg0.add_stream(p)
4906 self.pg_enable_capture(self.pg_interfaces)
4908 capture = self.pg5.get_capture(1)
4913 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4914 self.assertEqual(tcp.dport, local_port)
4915 self.assert_packet_checksums_valid(p)
4917 self.logger.error(ppp("Unexpected or invalid packet:", p))
4920 # from service VRF1 back to client VRF0
4921 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4922 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4923 TCP(sport=local_port, dport=12347))
4924 self.pg5.add_stream(p)
4925 self.pg_enable_capture(self.pg_interfaces)
4927 capture = self.pg0.get_capture(1)
4932 self.assertEqual(ip.src, external_addr)
4933 self.assertEqual(tcp.sport, external_port)
4934 self.assert_packet_checksums_valid(p)
4936 self.logger.error(ppp("Unexpected or invalid packet:", p))
4939 # from client to server (both VRF1, no translation)
4940 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4941 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4942 TCP(sport=12348, dport=local_port))
4943 self.pg6.add_stream(p)
4944 self.pg_enable_capture(self.pg_interfaces)
4946 capture = self.pg5.get_capture(1)
4951 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4952 self.assertEqual(tcp.dport, local_port)
4953 self.assert_packet_checksums_valid(p)
4955 self.logger.error(ppp("Unexpected or invalid packet:", p))
4958 # from server back to client (both VRF1, no translation)
4959 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4960 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4961 TCP(sport=local_port, dport=12348))
4962 self.pg5.add_stream(p)
4963 self.pg_enable_capture(self.pg_interfaces)
4965 capture = self.pg6.get_capture(1)
4970 self.assertEqual(ip.src, self.pg5.remote_ip4)
4971 self.assertEqual(tcp.sport, local_port)
4972 self.assert_packet_checksums_valid(p)
4974 self.logger.error(ppp("Unexpected or invalid packet:", p))
4977 # from client VRF1 to server VRF0 (no translation)
4978 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4979 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4980 TCP(sport=local_port, dport=12349))
4981 self.pg0.add_stream(p)
4982 self.pg_enable_capture(self.pg_interfaces)
4984 capture = self.pg6.get_capture(1)
4989 self.assertEqual(ip.src, self.pg0.remote_ip4)
4990 self.assertEqual(tcp.sport, local_port)
4991 self.assert_packet_checksums_valid(p)
4993 self.logger.error(ppp("Unexpected or invalid packet:", p))
4996 # from server VRF0 back to client VRF1 (no translation)
4997 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4998 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4999 TCP(sport=local_port, dport=12349))
5000 self.pg0.add_stream(p)
5001 self.pg_enable_capture(self.pg_interfaces)
5003 capture = self.pg6.get_capture(1)
5008 self.assertEqual(ip.src, self.pg0.remote_ip4)
5009 self.assertEqual(tcp.sport, local_port)
5010 self.assert_packet_checksums_valid(p)
5012 self.logger.error(ppp("Unexpected or invalid packet:", p))
5015 # from client VRF0 to server VRF1 (no translation)
5016 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5017 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
5018 TCP(sport=12344, dport=local_port))
5019 self.pg0.add_stream(p)
5020 self.pg_enable_capture(self.pg_interfaces)
5022 capture = self.pg5.get_capture(1)
5027 self.assertEqual(ip.dst, self.pg5.remote_ip4)
5028 self.assertEqual(tcp.dport, local_port)
5029 self.assert_packet_checksums_valid(p)
5031 self.logger.error(ppp("Unexpected or invalid packet:", p))
5034 # from server VRF1 back to client VRF0 (no translation)
5035 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
5036 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
5037 TCP(sport=local_port, dport=12344))
5038 self.pg5.add_stream(p)
5039 self.pg_enable_capture(self.pg_interfaces)
5041 capture = self.pg0.get_capture(1)
5046 self.assertEqual(ip.src, self.pg5.remote_ip4)
5047 self.assertEqual(tcp.sport, local_port)
5048 self.assert_packet_checksums_valid(p)
5050 self.logger.error(ppp("Unexpected or invalid packet:", p))
5053 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5054 def test_session_timeout(self):
5055 """ NAT44 session timeouts """
5056 self.nat44_add_address(self.nat_addr)
5057 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5058 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5060 self.vapi.nat_set_timeouts(icmp=5)
5064 for i in range(0, max_sessions):
5065 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5066 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5067 IP(src=src, dst=self.pg1.remote_ip4) /
5068 ICMP(id=1025, type='echo-request'))
5070 self.pg0.add_stream(pkts)
5071 self.pg_enable_capture(self.pg_interfaces)
5073 self.pg1.get_capture(max_sessions)
5078 for i in range(0, max_sessions):
5079 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5080 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5081 IP(src=src, dst=self.pg1.remote_ip4) /
5082 ICMP(id=1026, type='echo-request'))
5084 self.pg0.add_stream(pkts)
5085 self.pg_enable_capture(self.pg_interfaces)
5087 self.pg1.get_capture(max_sessions)
5090 users = self.vapi.nat44_user_dump()
5092 nsessions = nsessions + user.nsessions
5093 self.assertLess(nsessions, 2 * max_sessions)
5095 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5096 def test_session_limit_per_user(self):
5097 """ Maximum sessions per user limit """
5098 self.nat44_add_address(self.nat_addr)
5099 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5100 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5102 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5103 src_address=self.pg2.local_ip4n,
5105 template_interval=10)
5107 # get maximum number of translations per user
5108 nat44_config = self.vapi.nat_show_config()
5111 for port in range(0, nat44_config.max_translations_per_user):
5112 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5113 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5114 UDP(sport=1025 + port, dport=1025 + port))
5117 self.pg0.add_stream(pkts)
5118 self.pg_enable_capture(self.pg_interfaces)
5120 capture = self.pg1.get_capture(len(pkts))
5122 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5123 src_port=self.ipfix_src_port)
5125 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5126 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5127 UDP(sport=3001, dport=3002))
5128 self.pg0.add_stream(p)
5129 self.pg_enable_capture(self.pg_interfaces)
5131 capture = self.pg1.assert_nothing_captured()
5133 # verify IPFIX logging
5134 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5136 capture = self.pg2.get_capture(10)
5137 ipfix = IPFIXDecoder()
5138 # first load template
5140 self.assertTrue(p.haslayer(IPFIX))
5141 if p.haslayer(Template):
5142 ipfix.add_template(p.getlayer(Template))
5143 # verify events in data set
5145 if p.haslayer(Data):
5146 data = ipfix.decode_data_set(p.getlayer(Set))
5147 self.verify_ipfix_max_entries_per_user(
5149 nat44_config.max_translations_per_user,
5150 self.pg0.remote_ip4n)
5153 super(TestNAT44EndpointDependent, self).tearDown()
5154 if not self.vpp_dead:
5155 self.logger.info(self.vapi.cli("show nat44 addresses"))
5156 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5157 self.logger.info(self.vapi.cli("show nat44 static mappings"))
5158 self.logger.info(self.vapi.cli("show nat44 interface address"))
5159 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
5160 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
5161 self.logger.info(self.vapi.cli("show nat timeouts"))
5163 self.vapi.cli("clear logging")
5166 class TestNAT44Out2InDPO(MethodHolder):
5167 """ NAT44 Test Cases using out2in DPO """
5170 def setUpConstants(cls):
5171 super(TestNAT44Out2InDPO, cls).setUpConstants()
5172 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
5175 def setUpClass(cls):
5176 super(TestNAT44Out2InDPO, cls).setUpClass()
5177 cls.vapi.cli("set log class nat level debug")
5180 cls.tcp_port_in = 6303
5181 cls.tcp_port_out = 6303
5182 cls.udp_port_in = 6304
5183 cls.udp_port_out = 6304
5184 cls.icmp_id_in = 6305
5185 cls.icmp_id_out = 6305
5186 cls.nat_addr = '10.0.0.3'
5187 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5188 cls.dst_ip4 = '192.168.70.1'
5190 cls.create_pg_interfaces(range(2))
5193 cls.pg0.config_ip4()
5194 cls.pg0.resolve_arp()
5197 cls.pg1.config_ip6()
5198 cls.pg1.resolve_ndp()
5200 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
5201 dst_address_length=0,
5202 next_hop_address=cls.pg1.remote_ip6n,
5203 next_hop_sw_if_index=cls.pg1.sw_if_index)
5206 super(TestNAT44Out2InDPO, cls).tearDownClass()
5209 def configure_xlat(self):
5210 self.dst_ip6_pfx = '1:2:3::'
5211 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5213 self.dst_ip6_pfx_len = 96
5214 self.src_ip6_pfx = '4:5:6::'
5215 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5217 self.src_ip6_pfx_len = 96
5218 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
5219 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
5220 '\x00\x00\x00\x00', 0, is_translation=1,
5223 def test_464xlat_ce(self):
5224 """ Test 464XLAT CE with NAT44 """
5226 nat_config = self.vapi.nat_show_config()
5227 self.assertEqual(1, nat_config.out2in_dpo)
5229 self.configure_xlat()
5231 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5232 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
5234 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5235 self.dst_ip6_pfx_len)
5236 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
5237 self.src_ip6_pfx_len)
5240 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5241 self.pg0.add_stream(pkts)
5242 self.pg_enable_capture(self.pg_interfaces)
5244 capture = self.pg1.get_capture(len(pkts))
5245 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
5248 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
5250 self.pg1.add_stream(pkts)
5251 self.pg_enable_capture(self.pg_interfaces)
5253 capture = self.pg0.get_capture(len(pkts))
5254 self.verify_capture_in(capture, self.pg0)
5256 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
5258 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
5259 self.nat_addr_n, is_add=0)
5261 def test_464xlat_ce_no_nat(self):
5262 """ Test 464XLAT CE without NAT44 """
5264 self.configure_xlat()
5266 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5267 self.dst_ip6_pfx_len)
5268 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
5269 self.src_ip6_pfx_len)
5271 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5272 self.pg0.add_stream(pkts)
5273 self.pg_enable_capture(self.pg_interfaces)
5275 capture = self.pg1.get_capture(len(pkts))
5276 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
5277 nat_ip=out_dst_ip6, same_port=True)
5279 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
5280 self.pg1.add_stream(pkts)
5281 self.pg_enable_capture(self.pg_interfaces)
5283 capture = self.pg0.get_capture(len(pkts))
5284 self.verify_capture_in(capture, self.pg0)
5287 class TestDeterministicNAT(MethodHolder):
5288 """ Deterministic NAT Test Cases """
5291 def setUpConstants(cls):
5292 super(TestDeterministicNAT, cls).setUpConstants()
5293 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
5296 def setUpClass(cls):
5297 super(TestDeterministicNAT, cls).setUpClass()
5298 cls.vapi.cli("set log class nat level debug")
5301 cls.tcp_port_in = 6303
5302 cls.tcp_external_port = 6303
5303 cls.udp_port_in = 6304
5304 cls.udp_external_port = 6304
5305 cls.icmp_id_in = 6305
5306 cls.nat_addr = '10.0.0.3'
5308 cls.create_pg_interfaces(range(3))
5309 cls.interfaces = list(cls.pg_interfaces)
5311 for i in cls.interfaces:
5316 cls.pg0.generate_remote_hosts(2)
5317 cls.pg0.configure_ipv4_neighbors()
5320 super(TestDeterministicNAT, cls).tearDownClass()
5323 def create_stream_in(self, in_if, out_if, ttl=64):
5325 Create packet stream for inside network
5327 :param in_if: Inside interface
5328 :param out_if: Outside interface
5329 :param ttl: TTL of generated packets
5333 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5334 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5335 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
5339 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5340 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5341 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
5345 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5346 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5347 ICMP(id=self.icmp_id_in, type='echo-request'))
5352 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5354 Create packet stream for outside network
5356 :param out_if: Outside interface
5357 :param dst_ip: Destination IP address (Default use global NAT address)
5358 :param ttl: TTL of generated packets
5361 dst_ip = self.nat_addr
5364 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5365 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5366 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
5370 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5371 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5372 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
5376 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5377 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5378 ICMP(id=self.icmp_external_id, type='echo-reply'))
5383 def verify_capture_out(self, capture, nat_ip=None):
5385 Verify captured packets on outside network
5387 :param capture: Captured packets
5388 :param nat_ip: Translated IP address (Default use global NAT address)
5389 :param same_port: Sorce port number is not translated (Default False)
5392 nat_ip = self.nat_addr
5393 for packet in capture:
5395 self.assertEqual(packet[IP].src, nat_ip)
5396 if packet.haslayer(TCP):
5397 self.tcp_port_out = packet[TCP].sport
5398 elif packet.haslayer(UDP):
5399 self.udp_port_out = packet[UDP].sport
5401 self.icmp_external_id = packet[ICMP].id
5403 self.logger.error(ppp("Unexpected or invalid packet "
5404 "(outside network):", packet))
5407 def test_deterministic_mode(self):
5408 """ NAT plugin run deterministic mode """
5409 in_addr = '172.16.255.0'
5410 out_addr = '172.17.255.50'
5411 in_addr_t = '172.16.255.20'
5412 in_addr_n = socket.inet_aton(in_addr)
5413 out_addr_n = socket.inet_aton(out_addr)
5414 in_addr_t_n = socket.inet_aton(in_addr_t)
5418 nat_config = self.vapi.nat_show_config()
5419 self.assertEqual(1, nat_config.deterministic)
5421 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
5423 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5424 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5425 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5426 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
5428 deterministic_mappings = self.vapi.nat_det_map_dump()
5429 self.assertEqual(len(deterministic_mappings), 1)
5430 dsm = deterministic_mappings[0]
5431 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5432 self.assertEqual(in_plen, dsm.in_plen)
5433 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5434 self.assertEqual(out_plen, dsm.out_plen)
5436 self.clear_nat_det()
5437 deterministic_mappings = self.vapi.nat_det_map_dump()
5438 self.assertEqual(len(deterministic_mappings), 0)
5440 def test_set_timeouts(self):
5441 """ Set deterministic NAT timeouts """
5442 timeouts_before = self.vapi.nat_get_timeouts()
5444 self.vapi.nat_set_timeouts(timeouts_before.udp + 10,
5445 timeouts_before.tcp_established + 10,
5446 timeouts_before.tcp_transitory + 10,
5447 timeouts_before.icmp + 10)
5449 timeouts_after = self.vapi.nat_get_timeouts()
5451 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
5452 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
5453 self.assertNotEqual(timeouts_before.tcp_established,
5454 timeouts_after.tcp_established)
5455 self.assertNotEqual(timeouts_before.tcp_transitory,
5456 timeouts_after.tcp_transitory)
5458 def test_det_in(self):
5459 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
5461 nat_ip = "10.0.0.10"
5463 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5465 socket.inet_aton(nat_ip),
5467 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5468 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5472 pkts = self.create_stream_in(self.pg0, self.pg1)
5473 self.pg0.add_stream(pkts)
5474 self.pg_enable_capture(self.pg_interfaces)
5476 capture = self.pg1.get_capture(len(pkts))
5477 self.verify_capture_out(capture, nat_ip)
5480 pkts = self.create_stream_out(self.pg1, nat_ip)
5481 self.pg1.add_stream(pkts)
5482 self.pg_enable_capture(self.pg_interfaces)
5484 capture = self.pg0.get_capture(len(pkts))
5485 self.verify_capture_in(capture, self.pg0)
5488 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5489 self.assertEqual(len(sessions), 3)
5493 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5494 self.assertEqual(s.in_port, self.tcp_port_in)
5495 self.assertEqual(s.out_port, self.tcp_port_out)
5496 self.assertEqual(s.ext_port, self.tcp_external_port)
5500 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5501 self.assertEqual(s.in_port, self.udp_port_in)
5502 self.assertEqual(s.out_port, self.udp_port_out)
5503 self.assertEqual(s.ext_port, self.udp_external_port)
5507 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5508 self.assertEqual(s.in_port, self.icmp_id_in)
5509 self.assertEqual(s.out_port, self.icmp_external_id)
5511 def test_multiple_users(self):
5512 """ Deterministic NAT multiple users """
5514 nat_ip = "10.0.0.10"
5516 external_port = 6303
5518 host0 = self.pg0.remote_hosts[0]
5519 host1 = self.pg0.remote_hosts[1]
5521 self.vapi.nat_det_add_del_map(host0.ip4n,
5523 socket.inet_aton(nat_ip),
5525 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5526 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5530 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5531 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5532 TCP(sport=port_in, dport=external_port))
5533 self.pg0.add_stream(p)
5534 self.pg_enable_capture(self.pg_interfaces)
5536 capture = self.pg1.get_capture(1)
5541 self.assertEqual(ip.src, nat_ip)
5542 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5543 self.assertEqual(tcp.dport, external_port)
5544 port_out0 = tcp.sport
5546 self.logger.error(ppp("Unexpected or invalid packet:", p))
5550 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5551 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5552 TCP(sport=port_in, dport=external_port))
5553 self.pg0.add_stream(p)
5554 self.pg_enable_capture(self.pg_interfaces)
5556 capture = self.pg1.get_capture(1)
5561 self.assertEqual(ip.src, nat_ip)
5562 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5563 self.assertEqual(tcp.dport, external_port)
5564 port_out1 = tcp.sport
5566 self.logger.error(ppp("Unexpected or invalid packet:", p))
5569 dms = self.vapi.nat_det_map_dump()
5570 self.assertEqual(1, len(dms))
5571 self.assertEqual(2, dms[0].ses_num)
5574 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5575 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5576 TCP(sport=external_port, dport=port_out0))
5577 self.pg1.add_stream(p)
5578 self.pg_enable_capture(self.pg_interfaces)
5580 capture = self.pg0.get_capture(1)
5585 self.assertEqual(ip.src, self.pg1.remote_ip4)
5586 self.assertEqual(ip.dst, host0.ip4)
5587 self.assertEqual(tcp.dport, port_in)
5588 self.assertEqual(tcp.sport, external_port)
5590 self.logger.error(ppp("Unexpected or invalid packet:", p))
5594 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5595 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5596 TCP(sport=external_port, dport=port_out1))
5597 self.pg1.add_stream(p)
5598 self.pg_enable_capture(self.pg_interfaces)
5600 capture = self.pg0.get_capture(1)
5605 self.assertEqual(ip.src, self.pg1.remote_ip4)
5606 self.assertEqual(ip.dst, host1.ip4)
5607 self.assertEqual(tcp.dport, port_in)
5608 self.assertEqual(tcp.sport, external_port)
5610 self.logger.error(ppp("Unexpected or invalid packet", p))
5613 # session close api test
5614 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5616 self.pg1.remote_ip4n,
5618 dms = self.vapi.nat_det_map_dump()
5619 self.assertEqual(dms[0].ses_num, 1)
5621 self.vapi.nat_det_close_session_in(host0.ip4n,
5623 self.pg1.remote_ip4n,
5625 dms = self.vapi.nat_det_map_dump()
5626 self.assertEqual(dms[0].ses_num, 0)
5628 def test_tcp_session_close_detection_in(self):
5629 """ Deterministic NAT TCP session close from inside network """
5630 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5632 socket.inet_aton(self.nat_addr),
5634 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5635 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5638 self.initiate_tcp_session(self.pg0, self.pg1)
5640 # close the session from inside
5642 # FIN packet in -> out
5643 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5644 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5645 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5647 self.pg0.add_stream(p)
5648 self.pg_enable_capture(self.pg_interfaces)
5650 self.pg1.get_capture(1)
5654 # ACK packet out -> in
5655 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5656 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5657 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5661 # FIN packet out -> in
5662 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5663 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5664 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5668 self.pg1.add_stream(pkts)
5669 self.pg_enable_capture(self.pg_interfaces)
5671 self.pg0.get_capture(2)
5673 # ACK packet in -> out
5674 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5675 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5676 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5678 self.pg0.add_stream(p)
5679 self.pg_enable_capture(self.pg_interfaces)
5681 self.pg1.get_capture(1)
5683 # Check if deterministic NAT44 closed the session
5684 dms = self.vapi.nat_det_map_dump()
5685 self.assertEqual(0, dms[0].ses_num)
5687 self.logger.error("TCP session termination failed")
5690 def test_tcp_session_close_detection_out(self):
5691 """ Deterministic NAT TCP session close from outside network """
5692 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5694 socket.inet_aton(self.nat_addr),
5696 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5697 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5700 self.initiate_tcp_session(self.pg0, self.pg1)
5702 # close the session from outside
5704 # FIN packet out -> in
5705 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5706 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5707 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5709 self.pg1.add_stream(p)
5710 self.pg_enable_capture(self.pg_interfaces)
5712 self.pg0.get_capture(1)
5716 # ACK packet in -> out
5717 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5718 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5719 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5723 # ACK packet in -> out
5724 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5725 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5726 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5730 self.pg0.add_stream(pkts)
5731 self.pg_enable_capture(self.pg_interfaces)
5733 self.pg1.get_capture(2)
5735 # ACK packet out -> in
5736 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5737 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5738 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5740 self.pg1.add_stream(p)
5741 self.pg_enable_capture(self.pg_interfaces)
5743 self.pg0.get_capture(1)
5745 # Check if deterministic NAT44 closed the session
5746 dms = self.vapi.nat_det_map_dump()
5747 self.assertEqual(0, dms[0].ses_num)
5749 self.logger.error("TCP session termination failed")
5752 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5753 def test_session_timeout(self):
5754 """ Deterministic NAT session timeouts """
5755 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5757 socket.inet_aton(self.nat_addr),
5759 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5760 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5763 self.initiate_tcp_session(self.pg0, self.pg1)
5764 self.vapi.nat_set_timeouts(5, 5, 5, 5)
5765 pkts = self.create_stream_in(self.pg0, self.pg1)
5766 self.pg0.add_stream(pkts)
5767 self.pg_enable_capture(self.pg_interfaces)
5769 capture = self.pg1.get_capture(len(pkts))
5772 dms = self.vapi.nat_det_map_dump()
5773 self.assertEqual(0, dms[0].ses_num)
5775 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5776 def test_session_limit_per_user(self):
5777 """ Deterministic NAT maximum sessions per user limit """
5778 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5780 socket.inet_aton(self.nat_addr),
5782 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5783 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5785 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5786 src_address=self.pg2.local_ip4n,
5788 template_interval=10)
5789 self.vapi.nat_ipfix()
5792 for port in range(1025, 2025):
5793 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5794 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5795 UDP(sport=port, dport=port))
5798 self.pg0.add_stream(pkts)
5799 self.pg_enable_capture(self.pg_interfaces)
5801 capture = self.pg1.get_capture(len(pkts))
5803 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5804 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5805 UDP(sport=3001, dport=3002))
5806 self.pg0.add_stream(p)
5807 self.pg_enable_capture(self.pg_interfaces)
5809 capture = self.pg1.assert_nothing_captured()
5811 # verify ICMP error packet
5812 capture = self.pg0.get_capture(1)
5814 self.assertTrue(p.haslayer(ICMP))
5816 self.assertEqual(icmp.type, 3)
5817 self.assertEqual(icmp.code, 1)
5818 self.assertTrue(icmp.haslayer(IPerror))
5819 inner_ip = icmp[IPerror]
5820 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5821 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5823 dms = self.vapi.nat_det_map_dump()
5825 self.assertEqual(1000, dms[0].ses_num)
5827 # verify IPFIX logging
5828 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5830 capture = self.pg2.get_capture(2)
5831 ipfix = IPFIXDecoder()
5832 # first load template
5834 self.assertTrue(p.haslayer(IPFIX))
5835 if p.haslayer(Template):
5836 ipfix.add_template(p.getlayer(Template))
5837 # verify events in data set
5839 if p.haslayer(Data):
5840 data = ipfix.decode_data_set(p.getlayer(Set))
5841 self.verify_ipfix_max_entries_per_user(data,
5843 self.pg0.remote_ip4n)
5845 def clear_nat_det(self):
5847 Clear deterministic NAT configuration.
5849 self.vapi.nat_ipfix(enable=0)
5850 self.vapi.nat_set_timeouts()
5851 deterministic_mappings = self.vapi.nat_det_map_dump()
5852 for dsm in deterministic_mappings:
5853 self.vapi.nat_det_add_del_map(dsm.in_addr,
5859 interfaces = self.vapi.nat44_interface_dump()
5860 for intf in interfaces:
5861 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5866 super(TestDeterministicNAT, self).tearDown()
5867 if not self.vpp_dead:
5868 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5869 self.logger.info(self.vapi.cli("show nat timeouts"))
5871 self.vapi.cli("show nat44 deterministic mappings"))
5873 self.vapi.cli("show nat44 deterministic sessions"))
5874 self.clear_nat_det()
5877 class TestNAT64(MethodHolder):
5878 """ NAT64 Test Cases """
5881 def setUpConstants(cls):
5882 super(TestNAT64, cls).setUpConstants()
5883 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5884 "nat64 st hash buckets 256", "}"])
5887 def setUpClass(cls):
5888 super(TestNAT64, cls).setUpClass()
5891 cls.tcp_port_in = 6303
5892 cls.tcp_port_out = 6303
5893 cls.udp_port_in = 6304
5894 cls.udp_port_out = 6304
5895 cls.icmp_id_in = 6305
5896 cls.icmp_id_out = 6305
5897 cls.nat_addr = '10.0.0.3'
5898 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5900 cls.vrf1_nat_addr = '10.0.10.3'
5901 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5903 cls.ipfix_src_port = 4739
5904 cls.ipfix_domain_id = 1
5906 cls.create_pg_interfaces(range(6))
5907 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5908 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5909 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5911 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5913 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5915 cls.pg0.generate_remote_hosts(2)
5917 for i in cls.ip6_interfaces:
5920 i.configure_ipv6_neighbors()
5922 for i in cls.ip4_interfaces:
5928 cls.pg3.config_ip4()
5929 cls.pg3.resolve_arp()
5930 cls.pg3.config_ip6()
5931 cls.pg3.configure_ipv6_neighbors()
5934 cls.pg5.config_ip6()
5937 super(TestNAT64, cls).tearDownClass()
5940 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5941 """ NAT64 inside interface handles Neighbor Advertisement """
5943 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5946 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5947 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5948 ICMPv6EchoRequest())
5950 self.pg5.add_stream(pkts)
5951 self.pg_enable_capture(self.pg_interfaces)
5954 # Wait for Neighbor Solicitation
5955 capture = self.pg5.get_capture(len(pkts))
5958 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5959 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
5960 tgt = packet[ICMPv6ND_NS].tgt
5962 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5965 # Send Neighbor Advertisement
5966 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5967 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5968 ICMPv6ND_NA(tgt=tgt) /
5969 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5971 self.pg5.add_stream(pkts)
5972 self.pg_enable_capture(self.pg_interfaces)
5975 # Try to send ping again
5977 self.pg5.add_stream(pkts)
5978 self.pg_enable_capture(self.pg_interfaces)
5981 # Wait for ping reply
5982 capture = self.pg5.get_capture(len(pkts))
5985 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5986 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5987 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5989 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5992 def test_pool(self):
5993 """ Add/delete address to NAT64 pool """
5994 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5996 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5998 addresses = self.vapi.nat64_pool_addr_dump()
5999 self.assertEqual(len(addresses), 1)
6000 self.assertEqual(addresses[0].address, nat_addr)
6002 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
6004 addresses = self.vapi.nat64_pool_addr_dump()
6005 self.assertEqual(len(addresses), 0)
6007 def test_interface(self):
6008 """ Enable/disable NAT64 feature on the interface """
6009 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6010 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6012 interfaces = self.vapi.nat64_interface_dump()
6013 self.assertEqual(len(interfaces), 2)
6016 for intf in interfaces:
6017 if intf.sw_if_index == self.pg0.sw_if_index:
6018 self.assertEqual(intf.is_inside, 1)
6020 elif intf.sw_if_index == self.pg1.sw_if_index:
6021 self.assertEqual(intf.is_inside, 0)
6023 self.assertTrue(pg0_found)
6024 self.assertTrue(pg1_found)
6026 features = self.vapi.cli("show interface features pg0")
6027 self.assertNotEqual(features.find('nat64-in2out'), -1)
6028 features = self.vapi.cli("show interface features pg1")
6029 self.assertNotEqual(features.find('nat64-out2in'), -1)
6031 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
6032 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
6034 interfaces = self.vapi.nat64_interface_dump()
6035 self.assertEqual(len(interfaces), 0)
6037 def test_static_bib(self):
6038 """ Add/delete static BIB entry """
6039 in_addr = socket.inet_pton(socket.AF_INET6,
6040 '2001:db8:85a3::8a2e:370:7334')
6041 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
6044 proto = IP_PROTOS.tcp
6046 self.vapi.nat64_add_del_static_bib(in_addr,
6051 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
6056 self.assertEqual(bibe.i_addr, in_addr)
6057 self.assertEqual(bibe.o_addr, out_addr)
6058 self.assertEqual(bibe.i_port, in_port)
6059 self.assertEqual(bibe.o_port, out_port)
6060 self.assertEqual(static_bib_num, 1)
6062 self.vapi.nat64_add_del_static_bib(in_addr,
6068 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
6073 self.assertEqual(static_bib_num, 0)
6075 def test_set_timeouts(self):
6076 """ Set NAT64 timeouts """
6077 # verify default values
6078 timeouts = self.vapi.nat_get_timeouts()
6079 self.assertEqual(timeouts.udp, 300)
6080 self.assertEqual(timeouts.icmp, 60)
6081 self.assertEqual(timeouts.tcp_transitory, 240)
6082 self.assertEqual(timeouts.tcp_established, 7440)
6084 # set and verify custom values
6085 self.vapi.nat_set_timeouts(udp=200, icmp=30, tcp_transitory=250,
6086 tcp_established=7450)
6087 timeouts = self.vapi.nat_get_timeouts()
6088 self.assertEqual(timeouts.udp, 200)
6089 self.assertEqual(timeouts.icmp, 30)
6090 self.assertEqual(timeouts.tcp_transitory, 250)
6091 self.assertEqual(timeouts.tcp_established, 7450)
6093 def test_dynamic(self):
6094 """ NAT64 dynamic translation test """
6095 self.tcp_port_in = 6303
6096 self.udp_port_in = 6304
6097 self.icmp_id_in = 6305
6099 ses_num_start = self.nat64_get_ses_num()
6101 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6103 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6104 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6107 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6108 self.pg0.add_stream(pkts)
6109 self.pg_enable_capture(self.pg_interfaces)
6111 capture = self.pg1.get_capture(len(pkts))
6112 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6113 dst_ip=self.pg1.remote_ip4)
6116 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6117 self.pg1.add_stream(pkts)
6118 self.pg_enable_capture(self.pg_interfaces)
6120 capture = self.pg0.get_capture(len(pkts))
6121 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6122 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6125 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6126 self.pg0.add_stream(pkts)
6127 self.pg_enable_capture(self.pg_interfaces)
6129 capture = self.pg1.get_capture(len(pkts))
6130 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6131 dst_ip=self.pg1.remote_ip4)
6134 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6135 self.pg1.add_stream(pkts)
6136 self.pg_enable_capture(self.pg_interfaces)
6138 capture = self.pg0.get_capture(len(pkts))
6139 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6141 ses_num_end = self.nat64_get_ses_num()
6143 self.assertEqual(ses_num_end - ses_num_start, 3)
6145 # tenant with specific VRF
6146 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6147 self.vrf1_nat_addr_n,
6148 vrf_id=self.vrf1_id)
6149 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6151 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
6152 self.pg2.add_stream(pkts)
6153 self.pg_enable_capture(self.pg_interfaces)
6155 capture = self.pg1.get_capture(len(pkts))
6156 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6157 dst_ip=self.pg1.remote_ip4)
6159 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6160 self.pg1.add_stream(pkts)
6161 self.pg_enable_capture(self.pg_interfaces)
6163 capture = self.pg2.get_capture(len(pkts))
6164 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
6166 def test_static(self):
6167 """ NAT64 static translation test """
6168 self.tcp_port_in = 60303
6169 self.udp_port_in = 60304
6170 self.icmp_id_in = 60305
6171 self.tcp_port_out = 60303
6172 self.udp_port_out = 60304
6173 self.icmp_id_out = 60305
6175 ses_num_start = self.nat64_get_ses_num()
6177 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6179 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6180 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6182 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6187 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6192 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6199 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6200 self.pg0.add_stream(pkts)
6201 self.pg_enable_capture(self.pg_interfaces)
6203 capture = self.pg1.get_capture(len(pkts))
6204 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6205 dst_ip=self.pg1.remote_ip4, same_port=True)
6208 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6209 self.pg1.add_stream(pkts)
6210 self.pg_enable_capture(self.pg_interfaces)
6212 capture = self.pg0.get_capture(len(pkts))
6213 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6214 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6216 ses_num_end = self.nat64_get_ses_num()
6218 self.assertEqual(ses_num_end - ses_num_start, 3)
6220 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6221 def test_session_timeout(self):
6222 """ NAT64 session timeout """
6223 self.icmp_id_in = 1234
6224 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6226 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6227 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6228 self.vapi.nat_set_timeouts(icmp=5, tcp_transitory=5, tcp_established=5)
6230 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6231 self.pg0.add_stream(pkts)
6232 self.pg_enable_capture(self.pg_interfaces)
6234 capture = self.pg1.get_capture(len(pkts))
6236 ses_num_before_timeout = self.nat64_get_ses_num()
6240 # ICMP and TCP session after timeout
6241 ses_num_after_timeout = self.nat64_get_ses_num()
6242 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
6244 def test_icmp_error(self):
6245 """ NAT64 ICMP Error message translation """
6246 self.tcp_port_in = 6303
6247 self.udp_port_in = 6304
6248 self.icmp_id_in = 6305
6250 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6252 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6253 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6255 # send some packets to create sessions
6256 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6257 self.pg0.add_stream(pkts)
6258 self.pg_enable_capture(self.pg_interfaces)
6260 capture_ip4 = self.pg1.get_capture(len(pkts))
6261 self.verify_capture_out(capture_ip4,
6262 nat_ip=self.nat_addr,
6263 dst_ip=self.pg1.remote_ip4)
6265 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6266 self.pg1.add_stream(pkts)
6267 self.pg_enable_capture(self.pg_interfaces)
6269 capture_ip6 = self.pg0.get_capture(len(pkts))
6270 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6271 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
6272 self.pg0.remote_ip6)
6275 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6276 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
6277 ICMPv6DestUnreach(code=1) /
6278 packet[IPv6] for packet in capture_ip6]
6279 self.pg0.add_stream(pkts)
6280 self.pg_enable_capture(self.pg_interfaces)
6282 capture = self.pg1.get_capture(len(pkts))
6283 for packet in capture:
6285 self.assertEqual(packet[IP].src, self.nat_addr)
6286 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6287 self.assertEqual(packet[ICMP].type, 3)
6288 self.assertEqual(packet[ICMP].code, 13)
6289 inner = packet[IPerror]
6290 self.assertEqual(inner.src, self.pg1.remote_ip4)
6291 self.assertEqual(inner.dst, self.nat_addr)
6292 self.assert_packet_checksums_valid(packet)
6293 if inner.haslayer(TCPerror):
6294 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6295 elif inner.haslayer(UDPerror):
6296 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
6298 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6300 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6304 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6305 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6306 ICMP(type=3, code=13) /
6307 packet[IP] for packet in capture_ip4]
6308 self.pg1.add_stream(pkts)
6309 self.pg_enable_capture(self.pg_interfaces)
6311 capture = self.pg0.get_capture(len(pkts))
6312 for packet in capture:
6314 self.assertEqual(packet[IPv6].src, ip.src)
6315 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6316 icmp = packet[ICMPv6DestUnreach]
6317 self.assertEqual(icmp.code, 1)
6318 inner = icmp[IPerror6]
6319 self.assertEqual(inner.src, self.pg0.remote_ip6)
6320 self.assertEqual(inner.dst, ip.src)
6321 self.assert_icmpv6_checksum_valid(packet)
6322 if inner.haslayer(TCPerror):
6323 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6324 elif inner.haslayer(UDPerror):
6325 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6327 self.assertEqual(inner[ICMPv6EchoRequest].id,
6330 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6333 def test_hairpinning(self):
6334 """ NAT64 hairpinning """
6336 client = self.pg0.remote_hosts[0]
6337 server = self.pg0.remote_hosts[1]
6338 server_tcp_in_port = 22
6339 server_tcp_out_port = 4022
6340 server_udp_in_port = 23
6341 server_udp_out_port = 4023
6342 client_tcp_in_port = 1234
6343 client_udp_in_port = 1235
6344 client_tcp_out_port = 0
6345 client_udp_out_port = 0
6346 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6347 nat_addr_ip6 = ip.src
6349 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6351 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6352 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6354 self.vapi.nat64_add_del_static_bib(server.ip6n,
6357 server_tcp_out_port,
6359 self.vapi.nat64_add_del_static_bib(server.ip6n,
6362 server_udp_out_port,
6367 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6368 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6369 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6371 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6372 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6373 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6375 self.pg0.add_stream(pkts)
6376 self.pg_enable_capture(self.pg_interfaces)
6378 capture = self.pg0.get_capture(len(pkts))
6379 for packet in capture:
6381 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6382 self.assertEqual(packet[IPv6].dst, server.ip6)
6383 self.assert_packet_checksums_valid(packet)
6384 if packet.haslayer(TCP):
6385 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6386 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6387 client_tcp_out_port = packet[TCP].sport
6389 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6390 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6391 client_udp_out_port = packet[UDP].sport
6393 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6398 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6399 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6400 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6402 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6403 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6404 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6406 self.pg0.add_stream(pkts)
6407 self.pg_enable_capture(self.pg_interfaces)
6409 capture = self.pg0.get_capture(len(pkts))
6410 for packet in capture:
6412 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6413 self.assertEqual(packet[IPv6].dst, client.ip6)
6414 self.assert_packet_checksums_valid(packet)
6415 if packet.haslayer(TCP):
6416 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6417 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6419 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6420 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6422 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6427 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6428 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6429 ICMPv6DestUnreach(code=1) /
6430 packet[IPv6] for packet in capture]
6431 self.pg0.add_stream(pkts)
6432 self.pg_enable_capture(self.pg_interfaces)
6434 capture = self.pg0.get_capture(len(pkts))
6435 for packet in capture:
6437 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6438 self.assertEqual(packet[IPv6].dst, server.ip6)
6439 icmp = packet[ICMPv6DestUnreach]
6440 self.assertEqual(icmp.code, 1)
6441 inner = icmp[IPerror6]
6442 self.assertEqual(inner.src, server.ip6)
6443 self.assertEqual(inner.dst, nat_addr_ip6)
6444 self.assert_packet_checksums_valid(packet)
6445 if inner.haslayer(TCPerror):
6446 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6447 self.assertEqual(inner[TCPerror].dport,
6448 client_tcp_out_port)
6450 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6451 self.assertEqual(inner[UDPerror].dport,
6452 client_udp_out_port)
6454 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6457 def test_prefix(self):
6458 """ NAT64 Network-Specific Prefix """
6460 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6462 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6463 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6464 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6465 self.vrf1_nat_addr_n,
6466 vrf_id=self.vrf1_id)
6467 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6470 global_pref64 = "2001:db8::"
6471 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6472 global_pref64_len = 32
6473 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6475 prefix = self.vapi.nat64_prefix_dump()
6476 self.assertEqual(len(prefix), 1)
6477 self.assertEqual(prefix[0].prefix, global_pref64_n)
6478 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6479 self.assertEqual(prefix[0].vrf_id, 0)
6481 # Add tenant specific prefix
6482 vrf1_pref64 = "2001:db8:122:300::"
6483 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6484 vrf1_pref64_len = 56
6485 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6487 vrf_id=self.vrf1_id)
6488 prefix = self.vapi.nat64_prefix_dump()
6489 self.assertEqual(len(prefix), 2)
6492 pkts = self.create_stream_in_ip6(self.pg0,
6495 plen=global_pref64_len)
6496 self.pg0.add_stream(pkts)
6497 self.pg_enable_capture(self.pg_interfaces)
6499 capture = self.pg1.get_capture(len(pkts))
6500 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6501 dst_ip=self.pg1.remote_ip4)
6503 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6504 self.pg1.add_stream(pkts)
6505 self.pg_enable_capture(self.pg_interfaces)
6507 capture = self.pg0.get_capture(len(pkts))
6508 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6511 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6513 # Tenant specific prefix
6514 pkts = self.create_stream_in_ip6(self.pg2,
6517 plen=vrf1_pref64_len)
6518 self.pg2.add_stream(pkts)
6519 self.pg_enable_capture(self.pg_interfaces)
6521 capture = self.pg1.get_capture(len(pkts))
6522 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6523 dst_ip=self.pg1.remote_ip4)
6525 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6526 self.pg1.add_stream(pkts)
6527 self.pg_enable_capture(self.pg_interfaces)
6529 capture = self.pg2.get_capture(len(pkts))
6530 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6533 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
6535 def test_unknown_proto(self):
6536 """ NAT64 translate packet with unknown protocol """
6538 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6540 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6541 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6542 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6545 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6546 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6547 TCP(sport=self.tcp_port_in, dport=20))
6548 self.pg0.add_stream(p)
6549 self.pg_enable_capture(self.pg_interfaces)
6551 p = self.pg1.get_capture(1)
6553 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6554 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6556 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6557 TCP(sport=1234, dport=1234))
6558 self.pg0.add_stream(p)
6559 self.pg_enable_capture(self.pg_interfaces)
6561 p = self.pg1.get_capture(1)
6564 self.assertEqual(packet[IP].src, self.nat_addr)
6565 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6566 self.assertTrue(packet.haslayer(GRE))
6567 self.assert_packet_checksums_valid(packet)
6569 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6573 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6574 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6576 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6577 TCP(sport=1234, dport=1234))
6578 self.pg1.add_stream(p)
6579 self.pg_enable_capture(self.pg_interfaces)
6581 p = self.pg0.get_capture(1)
6584 self.assertEqual(packet[IPv6].src, remote_ip6)
6585 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6586 self.assertEqual(packet[IPv6].nh, 47)
6588 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6591 def test_hairpinning_unknown_proto(self):
6592 """ NAT64 translate packet with unknown protocol - hairpinning """
6594 client = self.pg0.remote_hosts[0]
6595 server = self.pg0.remote_hosts[1]
6596 server_tcp_in_port = 22
6597 server_tcp_out_port = 4022
6598 client_tcp_in_port = 1234
6599 client_tcp_out_port = 1235
6600 server_nat_ip = "10.0.0.100"
6601 client_nat_ip = "10.0.0.110"
6602 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6603 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6604 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6605 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6607 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6609 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6610 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6612 self.vapi.nat64_add_del_static_bib(server.ip6n,
6615 server_tcp_out_port,
6618 self.vapi.nat64_add_del_static_bib(server.ip6n,
6624 self.vapi.nat64_add_del_static_bib(client.ip6n,
6627 client_tcp_out_port,
6631 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6632 IPv6(src=client.ip6, dst=server_nat_ip6) /
6633 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6634 self.pg0.add_stream(p)
6635 self.pg_enable_capture(self.pg_interfaces)
6637 p = self.pg0.get_capture(1)
6639 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6640 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6642 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6643 TCP(sport=1234, dport=1234))
6644 self.pg0.add_stream(p)
6645 self.pg_enable_capture(self.pg_interfaces)
6647 p = self.pg0.get_capture(1)
6650 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6651 self.assertEqual(packet[IPv6].dst, server.ip6)
6652 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6654 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6658 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6659 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6661 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6662 TCP(sport=1234, dport=1234))
6663 self.pg0.add_stream(p)
6664 self.pg_enable_capture(self.pg_interfaces)
6666 p = self.pg0.get_capture(1)
6669 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6670 self.assertEqual(packet[IPv6].dst, client.ip6)
6671 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6673 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6676 def test_one_armed_nat64(self):
6677 """ One armed NAT64 """
6679 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6683 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6685 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6686 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6689 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6690 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6691 TCP(sport=12345, dport=80))
6692 self.pg3.add_stream(p)
6693 self.pg_enable_capture(self.pg_interfaces)
6695 capture = self.pg3.get_capture(1)
6700 self.assertEqual(ip.src, self.nat_addr)
6701 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6702 self.assertNotEqual(tcp.sport, 12345)
6703 external_port = tcp.sport
6704 self.assertEqual(tcp.dport, 80)
6705 self.assert_packet_checksums_valid(p)
6707 self.logger.error(ppp("Unexpected or invalid packet:", p))
6711 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6712 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6713 TCP(sport=80, dport=external_port))
6714 self.pg3.add_stream(p)
6715 self.pg_enable_capture(self.pg_interfaces)
6717 capture = self.pg3.get_capture(1)
6722 self.assertEqual(ip.src, remote_host_ip6)
6723 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6724 self.assertEqual(tcp.sport, 80)
6725 self.assertEqual(tcp.dport, 12345)
6726 self.assert_packet_checksums_valid(p)
6728 self.logger.error(ppp("Unexpected or invalid packet:", p))
6731 def test_frag_in_order(self):
6732 """ NAT64 translate fragments arriving in order """
6733 self.tcp_port_in = random.randint(1025, 65535)
6735 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6737 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6738 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6740 reass = self.vapi.nat_reass_dump()
6741 reass_n_start = len(reass)
6745 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6746 self.tcp_port_in, 20, data)
6747 self.pg0.add_stream(pkts)
6748 self.pg_enable_capture(self.pg_interfaces)
6750 frags = self.pg1.get_capture(len(pkts))
6751 p = self.reass_frags_and_verify(frags,
6753 self.pg1.remote_ip4)
6754 self.assertEqual(p[TCP].dport, 20)
6755 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6756 self.tcp_port_out = p[TCP].sport
6757 self.assertEqual(data, p[Raw].load)
6760 data = "A" * 4 + "b" * 16 + "C" * 3
6761 pkts = self.create_stream_frag(self.pg1,
6766 self.pg1.add_stream(pkts)
6767 self.pg_enable_capture(self.pg_interfaces)
6769 frags = self.pg0.get_capture(len(pkts))
6770 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6771 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6772 self.assertEqual(p[TCP].sport, 20)
6773 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6774 self.assertEqual(data, p[Raw].load)
6776 reass = self.vapi.nat_reass_dump()
6777 reass_n_end = len(reass)
6779 self.assertEqual(reass_n_end - reass_n_start, 2)
6781 def test_reass_hairpinning(self):
6782 """ NAT64 fragments hairpinning """
6784 server = self.pg0.remote_hosts[1]
6785 server_in_port = random.randint(1025, 65535)
6786 server_out_port = random.randint(1025, 65535)
6787 client_in_port = random.randint(1025, 65535)
6788 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6789 nat_addr_ip6 = ip.src
6791 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6793 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6794 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6796 # add static BIB entry for server
6797 self.vapi.nat64_add_del_static_bib(server.ip6n,
6803 # send packet from host to server
6804 pkts = self.create_stream_frag_ip6(self.pg0,
6809 self.pg0.add_stream(pkts)
6810 self.pg_enable_capture(self.pg_interfaces)
6812 frags = self.pg0.get_capture(len(pkts))
6813 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6814 self.assertNotEqual(p[TCP].sport, client_in_port)
6815 self.assertEqual(p[TCP].dport, server_in_port)
6816 self.assertEqual(data, p[Raw].load)
6818 def test_frag_out_of_order(self):
6819 """ NAT64 translate fragments arriving out of order """
6820 self.tcp_port_in = random.randint(1025, 65535)
6822 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6824 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6825 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6829 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6830 self.tcp_port_in, 20, data)
6832 self.pg0.add_stream(pkts)
6833 self.pg_enable_capture(self.pg_interfaces)
6835 frags = self.pg1.get_capture(len(pkts))
6836 p = self.reass_frags_and_verify(frags,
6838 self.pg1.remote_ip4)
6839 self.assertEqual(p[TCP].dport, 20)
6840 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6841 self.tcp_port_out = p[TCP].sport
6842 self.assertEqual(data, p[Raw].load)
6845 data = "A" * 4 + "B" * 16 + "C" * 3
6846 pkts = self.create_stream_frag(self.pg1,
6852 self.pg1.add_stream(pkts)
6853 self.pg_enable_capture(self.pg_interfaces)
6855 frags = self.pg0.get_capture(len(pkts))
6856 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6857 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6858 self.assertEqual(p[TCP].sport, 20)
6859 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6860 self.assertEqual(data, p[Raw].load)
6862 def test_interface_addr(self):
6863 """ Acquire NAT64 pool addresses from interface """
6864 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6866 # no address in NAT64 pool
6867 adresses = self.vapi.nat44_address_dump()
6868 self.assertEqual(0, len(adresses))
6870 # configure interface address and check NAT64 address pool
6871 self.pg4.config_ip4()
6872 addresses = self.vapi.nat64_pool_addr_dump()
6873 self.assertEqual(len(addresses), 1)
6874 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6876 # remove interface address and check NAT64 address pool
6877 self.pg4.unconfig_ip4()
6878 addresses = self.vapi.nat64_pool_addr_dump()
6879 self.assertEqual(0, len(adresses))
6881 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6882 def test_ipfix_max_bibs_sessions(self):
6883 """ IPFIX logging maximum session and BIB entries exceeded """
6886 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6890 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6892 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6893 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6897 for i in range(0, max_bibs):
6898 src = "fd01:aa::%x" % (i)
6899 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6900 IPv6(src=src, dst=remote_host_ip6) /
6901 TCP(sport=12345, dport=80))
6903 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6904 IPv6(src=src, dst=remote_host_ip6) /
6905 TCP(sport=12345, dport=22))
6907 self.pg0.add_stream(pkts)
6908 self.pg_enable_capture(self.pg_interfaces)
6910 self.pg1.get_capture(max_sessions)
6912 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6913 src_address=self.pg3.local_ip4n,
6915 template_interval=10)
6916 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6917 src_port=self.ipfix_src_port)
6919 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6920 IPv6(src=src, dst=remote_host_ip6) /
6921 TCP(sport=12345, dport=25))
6922 self.pg0.add_stream(p)
6923 self.pg_enable_capture(self.pg_interfaces)
6925 self.pg1.assert_nothing_captured()
6927 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6928 capture = self.pg3.get_capture(9)
6929 ipfix = IPFIXDecoder()
6930 # first load template
6932 self.assertTrue(p.haslayer(IPFIX))
6933 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6934 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6935 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6936 self.assertEqual(p[UDP].dport, 4739)
6937 self.assertEqual(p[IPFIX].observationDomainID,
6938 self.ipfix_domain_id)
6939 if p.haslayer(Template):
6940 ipfix.add_template(p.getlayer(Template))
6941 # verify events in data set
6943 if p.haslayer(Data):
6944 data = ipfix.decode_data_set(p.getlayer(Set))
6945 self.verify_ipfix_max_sessions(data, max_sessions)
6947 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6948 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6949 TCP(sport=12345, dport=80))
6950 self.pg0.add_stream(p)
6951 self.pg_enable_capture(self.pg_interfaces)
6953 self.pg1.assert_nothing_captured()
6955 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6956 capture = self.pg3.get_capture(1)
6957 # verify events in data set
6959 self.assertTrue(p.haslayer(IPFIX))
6960 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6961 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6962 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6963 self.assertEqual(p[UDP].dport, 4739)
6964 self.assertEqual(p[IPFIX].observationDomainID,
6965 self.ipfix_domain_id)
6966 if p.haslayer(Data):
6967 data = ipfix.decode_data_set(p.getlayer(Set))
6968 self.verify_ipfix_max_bibs(data, max_bibs)
6970 def test_ipfix_max_frags(self):
6971 """ IPFIX logging maximum fragments pending reassembly exceeded """
6972 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6974 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6975 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6976 self.vapi.nat_set_reass(max_frag=1, is_ip6=1)
6977 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6978 src_address=self.pg3.local_ip4n,
6980 template_interval=10)
6981 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6982 src_port=self.ipfix_src_port)
6985 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6986 self.tcp_port_in, 20, data)
6988 self.pg0.add_stream(pkts)
6989 self.pg_enable_capture(self.pg_interfaces)
6991 self.pg1.assert_nothing_captured()
6993 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6994 capture = self.pg3.get_capture(9)
6995 ipfix = IPFIXDecoder()
6996 # first load template
6998 self.assertTrue(p.haslayer(IPFIX))
6999 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7000 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7001 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7002 self.assertEqual(p[UDP].dport, 4739)
7003 self.assertEqual(p[IPFIX].observationDomainID,
7004 self.ipfix_domain_id)
7005 if p.haslayer(Template):
7006 ipfix.add_template(p.getlayer(Template))
7007 # verify events in data set
7009 if p.haslayer(Data):
7010 data = ipfix.decode_data_set(p.getlayer(Set))
7011 self.verify_ipfix_max_fragments_ip6(data, 1,
7012 self.pg0.remote_ip6n)
7014 def test_ipfix_bib_ses(self):
7015 """ IPFIX logging NAT64 BIB/session create and delete events """
7016 self.tcp_port_in = random.randint(1025, 65535)
7017 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
7021 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7023 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
7024 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7025 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
7026 src_address=self.pg3.local_ip4n,
7028 template_interval=10)
7029 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
7030 src_port=self.ipfix_src_port)
7033 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
7034 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
7035 TCP(sport=self.tcp_port_in, dport=25))
7036 self.pg0.add_stream(p)
7037 self.pg_enable_capture(self.pg_interfaces)
7039 p = self.pg1.get_capture(1)
7040 self.tcp_port_out = p[0][TCP].sport
7041 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7042 capture = self.pg3.get_capture(10)
7043 ipfix = IPFIXDecoder()
7044 # first load template
7046 self.assertTrue(p.haslayer(IPFIX))
7047 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7048 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7049 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7050 self.assertEqual(p[UDP].dport, 4739)
7051 self.assertEqual(p[IPFIX].observationDomainID,
7052 self.ipfix_domain_id)
7053 if p.haslayer(Template):
7054 ipfix.add_template(p.getlayer(Template))
7055 # verify events in data set
7057 if p.haslayer(Data):
7058 data = ipfix.decode_data_set(p.getlayer(Set))
7059 if ord(data[0][230]) == 10:
7060 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
7061 elif ord(data[0][230]) == 6:
7062 self.verify_ipfix_nat64_ses(data,
7064 self.pg0.remote_ip6n,
7065 self.pg1.remote_ip4,
7068 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7071 self.pg_enable_capture(self.pg_interfaces)
7072 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7075 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7076 capture = self.pg3.get_capture(2)
7077 # verify events in data set
7079 self.assertTrue(p.haslayer(IPFIX))
7080 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7081 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7082 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7083 self.assertEqual(p[UDP].dport, 4739)
7084 self.assertEqual(p[IPFIX].observationDomainID,
7085 self.ipfix_domain_id)
7086 if p.haslayer(Data):
7087 data = ipfix.decode_data_set(p.getlayer(Set))
7088 if ord(data[0][230]) == 11:
7089 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
7090 elif ord(data[0][230]) == 7:
7091 self.verify_ipfix_nat64_ses(data,
7093 self.pg0.remote_ip6n,
7094 self.pg1.remote_ip4,
7097 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7099 def nat64_get_ses_num(self):
7101 Return number of active NAT64 sessions.
7103 st = self.vapi.nat64_st_dump()
7106 def clear_nat64(self):
7108 Clear NAT64 configuration.
7110 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
7111 domain_id=self.ipfix_domain_id)
7112 self.ipfix_src_port = 4739
7113 self.ipfix_domain_id = 1
7115 self.vapi.nat_set_timeouts()
7117 interfaces = self.vapi.nat64_interface_dump()
7118 for intf in interfaces:
7119 if intf.is_inside > 1:
7120 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7123 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7127 bib = self.vapi.nat64_bib_dump(255)
7130 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
7138 adresses = self.vapi.nat64_pool_addr_dump()
7139 for addr in adresses:
7140 self.vapi.nat64_add_del_pool_addr_range(addr.address,
7145 prefixes = self.vapi.nat64_prefix_dump()
7146 for prefix in prefixes:
7147 self.vapi.nat64_add_del_prefix(prefix.prefix,
7149 vrf_id=prefix.vrf_id,
7153 super(TestNAT64, self).tearDown()
7154 if not self.vpp_dead:
7155 self.logger.info(self.vapi.cli("show nat64 pool"))
7156 self.logger.info(self.vapi.cli("show nat64 interfaces"))
7157 self.logger.info(self.vapi.cli("show nat64 prefix"))
7158 self.logger.info(self.vapi.cli("show nat64 bib all"))
7159 self.logger.info(self.vapi.cli("show nat64 session table all"))
7160 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
7164 class TestDSlite(MethodHolder):
7165 """ DS-Lite Test Cases """
7168 def setUpClass(cls):
7169 super(TestDSlite, cls).setUpClass()
7172 cls.nat_addr = '10.0.0.3'
7173 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
7175 cls.create_pg_interfaces(range(2))
7177 cls.pg0.config_ip4()
7178 cls.pg0.resolve_arp()
7180 cls.pg1.config_ip6()
7181 cls.pg1.generate_remote_hosts(2)
7182 cls.pg1.configure_ipv6_neighbors()
7185 super(TestDSlite, cls).tearDownClass()
7188 def test_dslite(self):
7189 """ Test DS-Lite """
7190 nat_config = self.vapi.nat_show_config()
7191 self.assertEqual(0, nat_config.dslite_ce)
7193 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
7195 aftr_ip4 = '192.0.0.1'
7196 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7197 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7198 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7199 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7202 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7203 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
7204 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7205 UDP(sport=20000, dport=10000))
7206 self.pg1.add_stream(p)
7207 self.pg_enable_capture(self.pg_interfaces)
7209 capture = self.pg0.get_capture(1)
7210 capture = capture[0]
7211 self.assertFalse(capture.haslayer(IPv6))
7212 self.assertEqual(capture[IP].src, self.nat_addr)
7213 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7214 self.assertNotEqual(capture[UDP].sport, 20000)
7215 self.assertEqual(capture[UDP].dport, 10000)
7216 self.assert_packet_checksums_valid(capture)
7217 out_port = capture[UDP].sport
7219 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7220 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7221 UDP(sport=10000, dport=out_port))
7222 self.pg0.add_stream(p)
7223 self.pg_enable_capture(self.pg_interfaces)
7225 capture = self.pg1.get_capture(1)
7226 capture = capture[0]
7227 self.assertEqual(capture[IPv6].src, aftr_ip6)
7228 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7229 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7230 self.assertEqual(capture[IP].dst, '192.168.1.1')
7231 self.assertEqual(capture[UDP].sport, 10000)
7232 self.assertEqual(capture[UDP].dport, 20000)
7233 self.assert_packet_checksums_valid(capture)
7236 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7237 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7238 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7239 TCP(sport=20001, dport=10001))
7240 self.pg1.add_stream(p)
7241 self.pg_enable_capture(self.pg_interfaces)
7243 capture = self.pg0.get_capture(1)
7244 capture = capture[0]
7245 self.assertFalse(capture.haslayer(IPv6))
7246 self.assertEqual(capture[IP].src, self.nat_addr)
7247 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7248 self.assertNotEqual(capture[TCP].sport, 20001)
7249 self.assertEqual(capture[TCP].dport, 10001)
7250 self.assert_packet_checksums_valid(capture)
7251 out_port = capture[TCP].sport
7253 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7254 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7255 TCP(sport=10001, dport=out_port))
7256 self.pg0.add_stream(p)
7257 self.pg_enable_capture(self.pg_interfaces)
7259 capture = self.pg1.get_capture(1)
7260 capture = capture[0]
7261 self.assertEqual(capture[IPv6].src, aftr_ip6)
7262 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7263 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7264 self.assertEqual(capture[IP].dst, '192.168.1.1')
7265 self.assertEqual(capture[TCP].sport, 10001)
7266 self.assertEqual(capture[TCP].dport, 20001)
7267 self.assert_packet_checksums_valid(capture)
7270 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7271 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7272 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7273 ICMP(id=4000, type='echo-request'))
7274 self.pg1.add_stream(p)
7275 self.pg_enable_capture(self.pg_interfaces)
7277 capture = self.pg0.get_capture(1)
7278 capture = capture[0]
7279 self.assertFalse(capture.haslayer(IPv6))
7280 self.assertEqual(capture[IP].src, self.nat_addr)
7281 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7282 self.assertNotEqual(capture[ICMP].id, 4000)
7283 self.assert_packet_checksums_valid(capture)
7284 out_id = capture[ICMP].id
7286 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7287 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7288 ICMP(id=out_id, type='echo-reply'))
7289 self.pg0.add_stream(p)
7290 self.pg_enable_capture(self.pg_interfaces)
7292 capture = self.pg1.get_capture(1)
7293 capture = capture[0]
7294 self.assertEqual(capture[IPv6].src, aftr_ip6)
7295 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7296 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7297 self.assertEqual(capture[IP].dst, '192.168.1.1')
7298 self.assertEqual(capture[ICMP].id, 4000)
7299 self.assert_packet_checksums_valid(capture)
7301 # ping DS-Lite AFTR tunnel endpoint address
7302 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7303 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7304 ICMPv6EchoRequest())
7305 self.pg1.add_stream(p)
7306 self.pg_enable_capture(self.pg_interfaces)
7308 capture = self.pg1.get_capture(1)
7309 capture = capture[0]
7310 self.assertEqual(capture[IPv6].src, aftr_ip6)
7311 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7312 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7315 super(TestDSlite, self).tearDown()
7316 if not self.vpp_dead:
7317 self.logger.info(self.vapi.cli("show dslite pool"))
7319 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7320 self.logger.info(self.vapi.cli("show dslite sessions"))
7323 class TestDSliteCE(MethodHolder):
7324 """ DS-Lite CE Test Cases """
7327 def setUpConstants(cls):
7328 super(TestDSliteCE, cls).setUpConstants()
7329 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
7332 def setUpClass(cls):
7333 super(TestDSliteCE, cls).setUpClass()
7336 cls.create_pg_interfaces(range(2))
7338 cls.pg0.config_ip4()
7339 cls.pg0.resolve_arp()
7341 cls.pg1.config_ip6()
7342 cls.pg1.generate_remote_hosts(1)
7343 cls.pg1.configure_ipv6_neighbors()
7346 super(TestDSliteCE, cls).tearDownClass()
7349 def test_dslite_ce(self):
7350 """ Test DS-Lite CE """
7352 nat_config = self.vapi.nat_show_config()
7353 self.assertEqual(1, nat_config.dslite_ce)
7355 b4_ip4 = '192.0.0.2'
7356 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7357 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7358 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7359 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7361 aftr_ip4 = '192.0.0.1'
7362 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7363 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7364 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7365 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7367 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7368 dst_address_length=128,
7369 next_hop_address=self.pg1.remote_ip6n,
7370 next_hop_sw_if_index=self.pg1.sw_if_index,
7374 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7375 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7376 UDP(sport=10000, dport=20000))
7377 self.pg0.add_stream(p)
7378 self.pg_enable_capture(self.pg_interfaces)
7380 capture = self.pg1.get_capture(1)
7381 capture = capture[0]
7382 self.assertEqual(capture[IPv6].src, b4_ip6)
7383 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7384 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7385 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7386 self.assertEqual(capture[UDP].sport, 10000)
7387 self.assertEqual(capture[UDP].dport, 20000)
7388 self.assert_packet_checksums_valid(capture)
7391 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7392 IPv6(dst=b4_ip6, src=aftr_ip6) /
7393 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7394 UDP(sport=20000, dport=10000))
7395 self.pg1.add_stream(p)
7396 self.pg_enable_capture(self.pg_interfaces)
7398 capture = self.pg0.get_capture(1)
7399 capture = capture[0]
7400 self.assertFalse(capture.haslayer(IPv6))
7401 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7402 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7403 self.assertEqual(capture[UDP].sport, 20000)
7404 self.assertEqual(capture[UDP].dport, 10000)
7405 self.assert_packet_checksums_valid(capture)
7407 # ping DS-Lite B4 tunnel endpoint address
7408 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7409 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7410 ICMPv6EchoRequest())
7411 self.pg1.add_stream(p)
7412 self.pg_enable_capture(self.pg_interfaces)
7414 capture = self.pg1.get_capture(1)
7415 capture = capture[0]
7416 self.assertEqual(capture[IPv6].src, b4_ip6)
7417 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7418 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7421 super(TestDSliteCE, self).tearDown()
7422 if not self.vpp_dead:
7424 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7426 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7429 class TestNAT66(MethodHolder):
7430 """ NAT66 Test Cases """
7433 def setUpClass(cls):
7434 super(TestNAT66, cls).setUpClass()
7437 cls.nat_addr = 'fd01:ff::2'
7438 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7440 cls.create_pg_interfaces(range(2))
7441 cls.interfaces = list(cls.pg_interfaces)
7443 for i in cls.interfaces:
7446 i.configure_ipv6_neighbors()
7449 super(TestNAT66, cls).tearDownClass()
7452 def test_static(self):
7453 """ 1:1 NAT66 test """
7454 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7455 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7456 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7461 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7462 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7465 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7466 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7469 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7470 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7471 ICMPv6EchoRequest())
7473 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7474 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7475 GRE() / IP() / TCP())
7477 self.pg0.add_stream(pkts)
7478 self.pg_enable_capture(self.pg_interfaces)
7480 capture = self.pg1.get_capture(len(pkts))
7481 for packet in capture:
7483 self.assertEqual(packet[IPv6].src, self.nat_addr)
7484 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7485 self.assert_packet_checksums_valid(packet)
7487 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7492 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7493 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7496 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7497 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7500 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7501 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7504 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7505 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7506 GRE() / IP() / TCP())
7508 self.pg1.add_stream(pkts)
7509 self.pg_enable_capture(self.pg_interfaces)
7511 capture = self.pg0.get_capture(len(pkts))
7512 for packet in capture:
7514 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7515 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7516 self.assert_packet_checksums_valid(packet)
7518 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7521 sm = self.vapi.nat66_static_mapping_dump()
7522 self.assertEqual(len(sm), 1)
7523 self.assertEqual(sm[0].total_pkts, 8)
7525 def test_check_no_translate(self):
7526 """ NAT66 translate only when egress interface is outside interface """
7527 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7528 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7529 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7533 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7534 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7536 self.pg0.add_stream([p])
7537 self.pg_enable_capture(self.pg_interfaces)
7539 capture = self.pg1.get_capture(1)
7542 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7543 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7545 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7548 def clear_nat66(self):
7550 Clear NAT66 configuration.
7552 interfaces = self.vapi.nat66_interface_dump()
7553 for intf in interfaces:
7554 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7558 static_mappings = self.vapi.nat66_static_mapping_dump()
7559 for sm in static_mappings:
7560 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7561 sm.external_ip_address,
7566 super(TestNAT66, self).tearDown()
7567 if not self.vpp_dead:
7568 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7569 self.logger.info(self.vapi.cli("show nat66 static mappings"))
7573 if __name__ == '__main__':
7574 unittest.main(testRunner=VppTestRunner)