9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
139 self.verify_no_nat44_user()
140 self.vapi.nat_set_timeouts()
141 self.vapi.nat_set_addr_and_port_alloc_alg()
143 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
144 local_port=0, external_port=0, vrf_id=0,
145 is_add=1, external_sw_if_index=0xFFFFFFFF,
146 proto=0, twice_nat=0, self_twice_nat=0,
147 out2in_only=0, tag=""):
149 Add/delete NAT44 static mapping
151 :param local_ip: Local IP address
152 :param external_ip: External IP address
153 :param local_port: Local port number (Optional)
154 :param external_port: External port number (Optional)
155 :param vrf_id: VRF ID (Default 0)
156 :param is_add: 1 if add, 0 if delete (Default add)
157 :param external_sw_if_index: External interface instead of IP address
158 :param proto: IP protocol (Mandatory if port specified)
159 :param twice_nat: 1 if translate external host address and port
160 :param self_twice_nat: 1 if translate external host address and port
161 whenever external host address equals
162 local address of internal host
163 :param out2in_only: if 1 rule is matching only out2in direction
164 :param tag: Opaque string tag
167 if local_port and external_port:
169 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
170 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
171 self.vapi.nat44_add_del_static_mapping(
174 external_sw_if_index,
186 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
188 Add/delete NAT44 address
190 :param ip: IP address
191 :param is_add: 1 if add, 0 if delete (Default add)
192 :param twice_nat: twice NAT address for extenal hosts
194 nat_addr = socket.inet_pton(socket.AF_INET, ip)
195 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
199 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
201 Create packet stream for inside network
203 :param in_if: Inside interface
204 :param out_if: Outside interface
205 :param dst_ip: Destination address
206 :param ttl: TTL of generated packets
209 dst_ip = out_if.remote_ip4
213 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
214 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
215 TCP(sport=self.tcp_port_in, dport=20))
219 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
220 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
221 UDP(sport=self.udp_port_in, dport=20))
225 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
226 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
227 ICMP(id=self.icmp_id_in, type='echo-request'))
232 def compose_ip6(self, ip4, pref, plen):
234 Compose IPv4-embedded IPv6 addresses
236 :param ip4: IPv4 address
237 :param pref: IPv6 prefix
238 :param plen: IPv6 prefix length
239 :returns: IPv4-embedded IPv6 addresses
241 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
242 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
257 pref_n[10] = ip4_n[3]
261 pref_n[10] = ip4_n[2]
262 pref_n[11] = ip4_n[3]
265 pref_n[10] = ip4_n[1]
266 pref_n[11] = ip4_n[2]
267 pref_n[12] = ip4_n[3]
269 pref_n[12] = ip4_n[0]
270 pref_n[13] = ip4_n[1]
271 pref_n[14] = ip4_n[2]
272 pref_n[15] = ip4_n[3]
273 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
275 def extract_ip4(self, ip6, plen):
277 Extract IPv4 address embedded in IPv6 addresses
279 :param ip6: IPv6 address
280 :param plen: IPv6 prefix length
281 :returns: extracted IPv4 address
283 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
315 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
317 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
319 Create IPv6 packet stream for inside network
321 :param in_if: Inside interface
322 :param out_if: Outside interface
323 :param ttl: Hop Limit of generated packets
324 :param pref: NAT64 prefix
325 :param plen: NAT64 prefix length
329 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
331 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
334 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
335 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
336 TCP(sport=self.tcp_port_in, dport=20))
340 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
341 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
342 UDP(sport=self.udp_port_in, dport=20))
346 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
347 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
348 ICMPv6EchoRequest(id=self.icmp_id_in))
353 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
354 use_inside_ports=False):
356 Create packet stream for outside network
358 :param out_if: Outside interface
359 :param dst_ip: Destination IP address (Default use global NAT address)
360 :param ttl: TTL of generated packets
361 :param use_inside_ports: Use inside NAT ports as destination ports
362 instead of outside ports
365 dst_ip = self.nat_addr
366 if not use_inside_ports:
367 tcp_port = self.tcp_port_out
368 udp_port = self.udp_port_out
369 icmp_id = self.icmp_id_out
371 tcp_port = self.tcp_port_in
372 udp_port = self.udp_port_in
373 icmp_id = self.icmp_id_in
376 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
377 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
378 TCP(dport=tcp_port, sport=20))
382 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
383 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
384 UDP(dport=udp_port, sport=20))
388 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
389 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
390 ICMP(id=icmp_id, type='echo-reply'))
395 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
397 Create packet stream for outside network
399 :param out_if: Outside interface
400 :param dst_ip: Destination IP address (Default use global NAT address)
401 :param hl: HL of generated packets
405 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
406 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
407 TCP(dport=self.tcp_port_out, sport=20))
411 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
412 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
413 UDP(dport=self.udp_port_out, sport=20))
417 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
418 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
419 ICMPv6EchoReply(id=self.icmp_id_out))
424 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
425 dst_ip=None, is_ip6=False):
427 Verify captured packets on outside network
429 :param capture: Captured packets
430 :param nat_ip: Translated IP address (Default use global NAT address)
431 :param same_port: Sorce port number is not translated (Default False)
432 :param dst_ip: Destination IP address (Default do not verify)
433 :param is_ip6: If L3 protocol is IPv6 (Default False)
437 ICMP46 = ICMPv6EchoRequest
442 nat_ip = self.nat_addr
443 for packet in capture:
446 self.assert_packet_checksums_valid(packet)
447 self.assertEqual(packet[IP46].src, nat_ip)
448 if dst_ip is not None:
449 self.assertEqual(packet[IP46].dst, dst_ip)
450 if packet.haslayer(TCP):
452 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
455 packet[TCP].sport, self.tcp_port_in)
456 self.tcp_port_out = packet[TCP].sport
457 self.assert_packet_checksums_valid(packet)
458 elif packet.haslayer(UDP):
460 self.assertEqual(packet[UDP].sport, self.udp_port_in)
463 packet[UDP].sport, self.udp_port_in)
464 self.udp_port_out = packet[UDP].sport
467 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
469 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
470 self.icmp_id_out = packet[ICMP46].id
471 self.assert_packet_checksums_valid(packet)
473 self.logger.error(ppp("Unexpected or invalid packet "
474 "(outside network):", packet))
477 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
480 Verify captured packets on outside network
482 :param capture: Captured packets
483 :param nat_ip: Translated IP address
484 :param same_port: Sorce port number is not translated (Default False)
485 :param dst_ip: Destination IP address (Default do not verify)
487 return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
490 def verify_capture_in(self, capture, in_if):
492 Verify captured packets on inside network
494 :param capture: Captured packets
495 :param in_if: Inside interface
497 for packet in capture:
499 self.assert_packet_checksums_valid(packet)
500 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].dport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
514 Verify captured IPv6 packets on inside network
516 :param capture: Captured packets
517 :param src_ip: Source IP
518 :param dst_ip: Destination IP address
520 for packet in capture:
522 self.assertEqual(packet[IPv6].src, src_ip)
523 self.assertEqual(packet[IPv6].dst, dst_ip)
524 self.assert_packet_checksums_valid(packet)
525 if packet.haslayer(TCP):
526 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
527 elif packet.haslayer(UDP):
528 self.assertEqual(packet[UDP].dport, self.udp_port_in)
530 self.assertEqual(packet[ICMPv6EchoReply].id,
533 self.logger.error(ppp("Unexpected or invalid packet "
534 "(inside network):", packet))
537 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
539 Verify captured packet that don't have to be translated
541 :param capture: Captured packets
542 :param ingress_if: Ingress interface
543 :param egress_if: Egress interface
545 for packet in capture:
547 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
548 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
549 if packet.haslayer(TCP):
550 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
551 elif packet.haslayer(UDP):
552 self.assertEqual(packet[UDP].sport, self.udp_port_in)
554 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
556 self.logger.error(ppp("Unexpected or invalid packet "
557 "(inside network):", packet))
560 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
563 Verify captured packets with ICMP errors on outside network
565 :param capture: Captured packets
566 :param src_ip: Translated IP address or IP address of VPP
567 (Default use global NAT address)
568 :param icmp_type: Type of error ICMP packet
569 we are expecting (Default 11)
572 src_ip = self.nat_addr
573 for packet in capture:
575 self.assertEqual(packet[IP].src, src_ip)
576 self.assertTrue(packet.haslayer(ICMP))
578 self.assertEqual(icmp.type, icmp_type)
579 self.assertTrue(icmp.haslayer(IPerror))
580 inner_ip = icmp[IPerror]
581 if inner_ip.haslayer(TCPerror):
582 self.assertEqual(inner_ip[TCPerror].dport,
584 elif inner_ip.haslayer(UDPerror):
585 self.assertEqual(inner_ip[UDPerror].dport,
588 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
590 self.logger.error(ppp("Unexpected or invalid packet "
591 "(outside network):", packet))
594 def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
596 Verify captured packets with ICMP errors on inside network
598 :param capture: Captured packets
599 :param in_if: Inside interface
600 :param icmp_type: Type of error ICMP packet
601 we are expecting (Default 11)
603 for packet in capture:
605 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
606 self.assertTrue(packet.haslayer(ICMP))
608 self.assertEqual(icmp.type, icmp_type)
609 self.assertTrue(icmp.haslayer(IPerror))
610 inner_ip = icmp[IPerror]
611 if inner_ip.haslayer(TCPerror):
612 self.assertEqual(inner_ip[TCPerror].sport,
614 elif inner_ip.haslayer(UDPerror):
615 self.assertEqual(inner_ip[UDPerror].sport,
618 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
620 self.logger.error(ppp("Unexpected or invalid packet "
621 "(inside network):", packet))
624 def create_stream_frag(self, src_if, dst, sport, dport, data):
626 Create fragmented packet stream
628 :param src_if: Source interface
629 :param dst: Destination IPv4 address
630 :param sport: Source TCP port
631 :param dport: Destination TCP port
632 :param data: Payload data
635 id = random.randint(0, 65535)
636 p = (IP(src=src_if.remote_ip4, dst=dst) /
637 TCP(sport=sport, dport=dport) /
639 p = p.__class__(str(p))
640 chksum = p['TCP'].chksum
642 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
643 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
644 TCP(sport=sport, dport=dport, chksum=chksum) /
647 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
648 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
649 proto=IP_PROTOS.tcp) /
652 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
653 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
659 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
660 pref=None, plen=0, frag_size=128):
662 Create fragmented packet stream
664 :param src_if: Source interface
665 :param dst: Destination IPv4 address
666 :param sport: Source TCP port
667 :param dport: Destination TCP port
668 :param data: Payload data
669 :param pref: NAT64 prefix
670 :param plen: NAT64 prefix length
671 :param fragsize: size of fragments
675 dst_ip6 = ''.join(['64:ff9b::', dst])
677 dst_ip6 = self.compose_ip6(dst, pref, plen)
679 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
680 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
681 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
682 TCP(sport=sport, dport=dport) /
685 return fragment6(p, frag_size)
687 def reass_frags_and_verify(self, frags, src, dst):
689 Reassemble and verify fragmented packet
691 :param frags: Captured fragments
692 :param src: Source IPv4 address to verify
693 :param dst: Destination IPv4 address to verify
695 :returns: Reassembled IPv4 packet
697 buffer = StringIO.StringIO()
699 self.assertEqual(p[IP].src, src)
700 self.assertEqual(p[IP].dst, dst)
701 self.assert_ip_checksum_valid(p)
702 buffer.seek(p[IP].frag * 8)
703 buffer.write(p[IP].payload)
704 ip = frags[0].getlayer(IP)
705 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
706 proto=frags[0][IP].proto)
707 if ip.proto == IP_PROTOS.tcp:
708 p = (ip / TCP(buffer.getvalue()))
709 self.assert_tcp_checksum_valid(p)
710 elif ip.proto == IP_PROTOS.udp:
711 p = (ip / UDP(buffer.getvalue()))
714 def reass_frags_and_verify_ip6(self, frags, src, dst):
716 Reassemble and verify fragmented packet
718 :param frags: Captured fragments
719 :param src: Source IPv6 address to verify
720 :param dst: Destination IPv6 address to verify
722 :returns: Reassembled IPv6 packet
724 buffer = StringIO.StringIO()
726 self.assertEqual(p[IPv6].src, src)
727 self.assertEqual(p[IPv6].dst, dst)
728 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
729 buffer.write(p[IPv6ExtHdrFragment].payload)
730 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
731 nh=frags[0][IPv6ExtHdrFragment].nh)
732 if ip.nh == IP_PROTOS.tcp:
733 p = (ip / TCP(buffer.getvalue()))
734 elif ip.nh == IP_PROTOS.udp:
735 p = (ip / UDP(buffer.getvalue()))
736 self.assert_packet_checksums_valid(p)
739 def initiate_tcp_session(self, in_if, out_if):
741 Initiates TCP session
743 :param in_if: Inside interface
744 :param out_if: Outside interface
748 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
749 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
750 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
753 self.pg_enable_capture(self.pg_interfaces)
755 capture = out_if.get_capture(1)
757 self.tcp_port_out = p[TCP].sport
759 # SYN + ACK packet out->in
760 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
761 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
762 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
765 self.pg_enable_capture(self.pg_interfaces)
770 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
771 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
772 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
775 self.pg_enable_capture(self.pg_interfaces)
777 out_if.get_capture(1)
780 self.logger.error("TCP 3 way handshake failed")
783 def verify_ipfix_nat44_ses(self, data):
785 Verify IPFIX NAT44 session create/delete event
787 :param data: Decoded IPFIX data records
789 nat44_ses_create_num = 0
790 nat44_ses_delete_num = 0
791 self.assertEqual(6, len(data))
794 self.assertIn(ord(record[230]), [4, 5])
795 if ord(record[230]) == 4:
796 nat44_ses_create_num += 1
798 nat44_ses_delete_num += 1
800 self.assertEqual(self.pg0.remote_ip4n, record[8])
801 # postNATSourceIPv4Address
802 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
805 self.assertEqual(struct.pack("!I", 0), record[234])
806 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
807 if IP_PROTOS.icmp == ord(record[4]):
808 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
809 self.assertEqual(struct.pack("!H", self.icmp_id_out),
811 elif IP_PROTOS.tcp == ord(record[4]):
812 self.assertEqual(struct.pack("!H", self.tcp_port_in),
814 self.assertEqual(struct.pack("!H", self.tcp_port_out),
816 elif IP_PROTOS.udp == ord(record[4]):
817 self.assertEqual(struct.pack("!H", self.udp_port_in),
819 self.assertEqual(struct.pack("!H", self.udp_port_out),
822 self.fail("Invalid protocol")
823 self.assertEqual(3, nat44_ses_create_num)
824 self.assertEqual(3, nat44_ses_delete_num)
826 def verify_ipfix_addr_exhausted(self, data):
828 Verify IPFIX NAT addresses event
830 :param data: Decoded IPFIX data records
832 self.assertEqual(1, len(data))
835 self.assertEqual(ord(record[230]), 3)
837 self.assertEqual(struct.pack("!I", 0), record[283])
839 def verify_ipfix_max_sessions(self, data, limit):
841 Verify IPFIX maximum session entries exceeded event
843 :param data: Decoded IPFIX data records
844 :param limit: Number of maximum session entries that can be created.
846 self.assertEqual(1, len(data))
849 self.assertEqual(ord(record[230]), 13)
850 # natQuotaExceededEvent
851 self.assertEqual(struct.pack("I", 1), record[466])
853 self.assertEqual(struct.pack("I", limit), record[471])
855 def verify_ipfix_max_bibs(self, data, limit):
857 Verify IPFIX maximum BIB entries exceeded event
859 :param data: Decoded IPFIX data records
860 :param limit: Number of maximum BIB entries that can be created.
862 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 13)
866 # natQuotaExceededEvent
867 self.assertEqual(struct.pack("I", 2), record[466])
869 self.assertEqual(struct.pack("I", limit), record[472])
871 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
873 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
875 :param data: Decoded IPFIX data records
876 :param limit: Number of maximum fragments pending reassembly
877 :param src_addr: IPv6 source address
879 self.assertEqual(1, len(data))
882 self.assertEqual(ord(record[230]), 13)
883 # natQuotaExceededEvent
884 self.assertEqual(struct.pack("I", 5), record[466])
885 # maxFragmentsPendingReassembly
886 self.assertEqual(struct.pack("I", limit), record[475])
888 self.assertEqual(src_addr, record[27])
890 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
892 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
894 :param data: Decoded IPFIX data records
895 :param limit: Number of maximum fragments pending reassembly
896 :param src_addr: IPv4 source address
898 self.assertEqual(1, len(data))
901 self.assertEqual(ord(record[230]), 13)
902 # natQuotaExceededEvent
903 self.assertEqual(struct.pack("I", 5), record[466])
904 # maxFragmentsPendingReassembly
905 self.assertEqual(struct.pack("I", limit), record[475])
907 self.assertEqual(src_addr, record[8])
909 def verify_ipfix_bib(self, data, is_create, src_addr):
911 Verify IPFIX NAT64 BIB create and delete events
913 :param data: Decoded IPFIX data records
914 :param is_create: Create event if nonzero value otherwise delete event
915 :param src_addr: IPv6 source address
917 self.assertEqual(1, len(data))
921 self.assertEqual(ord(record[230]), 10)
923 self.assertEqual(ord(record[230]), 11)
925 self.assertEqual(src_addr, record[27])
926 # postNATSourceIPv4Address
927 self.assertEqual(self.nat_addr_n, record[225])
929 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
931 self.assertEqual(struct.pack("!I", 0), record[234])
932 # sourceTransportPort
933 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
934 # postNAPTSourceTransportPort
935 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
937 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
940 Verify IPFIX NAT64 session create and delete events
942 :param data: Decoded IPFIX data records
943 :param is_create: Create event if nonzero value otherwise delete event
944 :param src_addr: IPv6 source address
945 :param dst_addr: IPv4 destination address
946 :param dst_port: destination TCP port
948 self.assertEqual(1, len(data))
952 self.assertEqual(ord(record[230]), 6)
954 self.assertEqual(ord(record[230]), 7)
956 self.assertEqual(src_addr, record[27])
957 # destinationIPv6Address
958 self.assertEqual(socket.inet_pton(socket.AF_INET6,
959 self.compose_ip6(dst_addr,
963 # postNATSourceIPv4Address
964 self.assertEqual(self.nat_addr_n, record[225])
965 # postNATDestinationIPv4Address
966 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
969 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
971 self.assertEqual(struct.pack("!I", 0), record[234])
972 # sourceTransportPort
973 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
974 # postNAPTSourceTransportPort
975 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
976 # destinationTransportPort
977 self.assertEqual(struct.pack("!H", dst_port), record[11])
978 # postNAPTDestinationTransportPort
979 self.assertEqual(struct.pack("!H", dst_port), record[228])
981 def verify_no_nat44_user(self):
982 """ Verify that there is no NAT44 user """
983 users = self.vapi.nat44_user_dump()
984 self.assertEqual(len(users), 0)
986 def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
988 Verify IPFIX maximum entries per user exceeded event
990 :param data: Decoded IPFIX data records
991 :param limit: Number of maximum entries per user
992 :param src_addr: IPv4 source address
994 self.assertEqual(1, len(data))
997 self.assertEqual(ord(record[230]), 13)
998 # natQuotaExceededEvent
999 self.assertEqual(struct.pack("I", 3), record[466])
1001 self.assertEqual(struct.pack("I", limit), record[473])
1003 self.assertEqual(src_addr, record[8])
1006 class TestNAT44(MethodHolder):
1007 """ NAT44 Test Cases """
1010 def setUpClass(cls):
1011 super(TestNAT44, cls).setUpClass()
1012 cls.vapi.cli("set log class nat level debug")
1015 cls.tcp_port_in = 6303
1016 cls.tcp_port_out = 6303
1017 cls.udp_port_in = 6304
1018 cls.udp_port_out = 6304
1019 cls.icmp_id_in = 6305
1020 cls.icmp_id_out = 6305
1021 cls.nat_addr = '10.0.0.3'
1022 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1023 cls.ipfix_src_port = 4739
1024 cls.ipfix_domain_id = 1
1025 cls.tcp_external_port = 80
1027 cls.create_pg_interfaces(range(10))
1028 cls.interfaces = list(cls.pg_interfaces[0:4])
1030 for i in cls.interfaces:
1035 cls.pg0.generate_remote_hosts(3)
1036 cls.pg0.configure_ipv4_neighbors()
1038 cls.pg1.generate_remote_hosts(1)
1039 cls.pg1.configure_ipv4_neighbors()
1041 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1042 cls.vapi.ip_table_add_del(10, is_add=1)
1043 cls.vapi.ip_table_add_del(20, is_add=1)
1045 cls.pg4._local_ip4 = "172.16.255.1"
1046 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1047 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1048 cls.pg4.set_table_ip4(10)
1049 cls.pg5._local_ip4 = "172.17.255.3"
1050 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1051 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1052 cls.pg5.set_table_ip4(10)
1053 cls.pg6._local_ip4 = "172.16.255.1"
1054 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1055 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1056 cls.pg6.set_table_ip4(20)
1057 for i in cls.overlapping_interfaces:
1065 cls.pg9.generate_remote_hosts(2)
1066 cls.pg9.config_ip4()
1067 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1068 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1072 cls.pg9.resolve_arp()
1073 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1074 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1075 cls.pg9.resolve_arp()
1078 super(TestNAT44, cls).tearDownClass()
1081 def test_dynamic(self):
1082 """ NAT44 dynamic translation test """
1084 self.nat44_add_address(self.nat_addr)
1085 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1086 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1090 pkts = self.create_stream_in(self.pg0, self.pg1)
1091 self.pg0.add_stream(pkts)
1092 self.pg_enable_capture(self.pg_interfaces)
1094 capture = self.pg1.get_capture(len(pkts))
1095 self.verify_capture_out(capture)
1098 pkts = self.create_stream_out(self.pg1)
1099 self.pg1.add_stream(pkts)
1100 self.pg_enable_capture(self.pg_interfaces)
1102 capture = self.pg0.get_capture(len(pkts))
1103 self.verify_capture_in(capture, self.pg0)
1105 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1106 """ NAT44 handling of client packets with TTL=1 """
1108 self.nat44_add_address(self.nat_addr)
1109 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1110 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1113 # Client side - generate traffic
1114 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1115 self.pg0.add_stream(pkts)
1116 self.pg_enable_capture(self.pg_interfaces)
1119 # Client side - verify ICMP type 11 packets
1120 capture = self.pg0.get_capture(len(pkts))
1121 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1123 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1124 """ NAT44 handling of server packets with TTL=1 """
1126 self.nat44_add_address(self.nat_addr)
1127 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1128 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1131 # Client side - create sessions
1132 pkts = self.create_stream_in(self.pg0, self.pg1)
1133 self.pg0.add_stream(pkts)
1134 self.pg_enable_capture(self.pg_interfaces)
1137 # Server side - generate traffic
1138 capture = self.pg1.get_capture(len(pkts))
1139 self.verify_capture_out(capture)
1140 pkts = self.create_stream_out(self.pg1, ttl=1)
1141 self.pg1.add_stream(pkts)
1142 self.pg_enable_capture(self.pg_interfaces)
1145 # Server side - verify ICMP type 11 packets
1146 capture = self.pg1.get_capture(len(pkts))
1147 self.verify_capture_out_with_icmp_errors(capture,
1148 src_ip=self.pg1.local_ip4)
1150 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1151 """ NAT44 handling of error responses to client packets with TTL=2 """
1153 self.nat44_add_address(self.nat_addr)
1154 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1155 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1158 # Client side - generate traffic
1159 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1160 self.pg0.add_stream(pkts)
1161 self.pg_enable_capture(self.pg_interfaces)
1164 # Server side - simulate ICMP type 11 response
1165 capture = self.pg1.get_capture(len(pkts))
1166 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1167 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1168 ICMP(type=11) / packet[IP] for packet in capture]
1169 self.pg1.add_stream(pkts)
1170 self.pg_enable_capture(self.pg_interfaces)
1173 # Client side - verify ICMP type 11 packets
1174 capture = self.pg0.get_capture(len(pkts))
1175 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1177 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1178 """ NAT44 handling of error responses to server packets with TTL=2 """
1180 self.nat44_add_address(self.nat_addr)
1181 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1182 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1185 # Client side - create sessions
1186 pkts = self.create_stream_in(self.pg0, self.pg1)
1187 self.pg0.add_stream(pkts)
1188 self.pg_enable_capture(self.pg_interfaces)
1191 # Server side - generate traffic
1192 capture = self.pg1.get_capture(len(pkts))
1193 self.verify_capture_out(capture)
1194 pkts = self.create_stream_out(self.pg1, ttl=2)
1195 self.pg1.add_stream(pkts)
1196 self.pg_enable_capture(self.pg_interfaces)
1199 # Client side - simulate ICMP type 11 response
1200 capture = self.pg0.get_capture(len(pkts))
1201 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1202 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1203 ICMP(type=11) / packet[IP] for packet in capture]
1204 self.pg0.add_stream(pkts)
1205 self.pg_enable_capture(self.pg_interfaces)
1208 # Server side - verify ICMP type 11 packets
1209 capture = self.pg1.get_capture(len(pkts))
1210 self.verify_capture_out_with_icmp_errors(capture)
1212 def test_ping_out_interface_from_outside(self):
1213 """ Ping NAT44 out interface from outside network """
1215 self.nat44_add_address(self.nat_addr)
1216 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1217 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1220 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1221 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1222 ICMP(id=self.icmp_id_out, type='echo-request'))
1224 self.pg1.add_stream(pkts)
1225 self.pg_enable_capture(self.pg_interfaces)
1227 capture = self.pg1.get_capture(len(pkts))
1230 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1231 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1232 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1233 self.assertEqual(packet[ICMP].type, 0) # echo reply
1235 self.logger.error(ppp("Unexpected or invalid packet "
1236 "(outside network):", packet))
1239 def test_ping_internal_host_from_outside(self):
1240 """ Ping internal host from outside network """
1242 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1243 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1244 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1248 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1249 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1250 ICMP(id=self.icmp_id_out, type='echo-request'))
1251 self.pg1.add_stream(pkt)
1252 self.pg_enable_capture(self.pg_interfaces)
1254 capture = self.pg0.get_capture(1)
1255 self.verify_capture_in(capture, self.pg0)
1256 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1259 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1260 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1261 ICMP(id=self.icmp_id_in, type='echo-reply'))
1262 self.pg0.add_stream(pkt)
1263 self.pg_enable_capture(self.pg_interfaces)
1265 capture = self.pg1.get_capture(1)
1266 self.verify_capture_out(capture, same_port=True)
1267 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1269 def test_forwarding(self):
1270 """ NAT44 forwarding test """
1272 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1273 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1275 self.vapi.nat44_forwarding_enable_disable(1)
1277 real_ip = self.pg0.remote_ip4n
1278 alias_ip = self.nat_addr_n
1279 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1280 external_ip=alias_ip)
1283 # static mapping match
1285 pkts = self.create_stream_out(self.pg1)
1286 self.pg1.add_stream(pkts)
1287 self.pg_enable_capture(self.pg_interfaces)
1289 capture = self.pg0.get_capture(len(pkts))
1290 self.verify_capture_in(capture, self.pg0)
1292 pkts = self.create_stream_in(self.pg0, self.pg1)
1293 self.pg0.add_stream(pkts)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg1.get_capture(len(pkts))
1297 self.verify_capture_out(capture, same_port=True)
1299 # no static mapping match
1301 host0 = self.pg0.remote_hosts[0]
1302 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1304 pkts = self.create_stream_out(self.pg1,
1305 dst_ip=self.pg0.remote_ip4,
1306 use_inside_ports=True)
1307 self.pg1.add_stream(pkts)
1308 self.pg_enable_capture(self.pg_interfaces)
1310 capture = self.pg0.get_capture(len(pkts))
1311 self.verify_capture_in(capture, self.pg0)
1313 pkts = self.create_stream_in(self.pg0, self.pg1)
1314 self.pg0.add_stream(pkts)
1315 self.pg_enable_capture(self.pg_interfaces)
1317 capture = self.pg1.get_capture(len(pkts))
1318 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1321 self.pg0.remote_hosts[0] = host0
1324 self.vapi.nat44_forwarding_enable_disable(0)
1325 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1326 external_ip=alias_ip,
1329 def test_static_in(self):
1330 """ 1:1 NAT initialized from inside network """
1332 nat_ip = "10.0.0.10"
1333 self.tcp_port_out = 6303
1334 self.udp_port_out = 6304
1335 self.icmp_id_out = 6305
1337 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1338 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1339 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1341 sm = self.vapi.nat44_static_mapping_dump()
1342 self.assertEqual(len(sm), 1)
1343 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1344 self.assertEqual(sm[0].protocol, 0)
1345 self.assertEqual(sm[0].local_port, 0)
1346 self.assertEqual(sm[0].external_port, 0)
1349 pkts = self.create_stream_in(self.pg0, self.pg1)
1350 self.pg0.add_stream(pkts)
1351 self.pg_enable_capture(self.pg_interfaces)
1353 capture = self.pg1.get_capture(len(pkts))
1354 self.verify_capture_out(capture, nat_ip, True)
1357 pkts = self.create_stream_out(self.pg1, nat_ip)
1358 self.pg1.add_stream(pkts)
1359 self.pg_enable_capture(self.pg_interfaces)
1361 capture = self.pg0.get_capture(len(pkts))
1362 self.verify_capture_in(capture, self.pg0)
1364 def test_static_out(self):
1365 """ 1:1 NAT initialized from outside network """
1367 nat_ip = "10.0.0.20"
1368 self.tcp_port_out = 6303
1369 self.udp_port_out = 6304
1370 self.icmp_id_out = 6305
1373 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1374 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1375 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1377 sm = self.vapi.nat44_static_mapping_dump()
1378 self.assertEqual(len(sm), 1)
1379 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1382 pkts = self.create_stream_out(self.pg1, nat_ip)
1383 self.pg1.add_stream(pkts)
1384 self.pg_enable_capture(self.pg_interfaces)
1386 capture = self.pg0.get_capture(len(pkts))
1387 self.verify_capture_in(capture, self.pg0)
1390 pkts = self.create_stream_in(self.pg0, self.pg1)
1391 self.pg0.add_stream(pkts)
1392 self.pg_enable_capture(self.pg_interfaces)
1394 capture = self.pg1.get_capture(len(pkts))
1395 self.verify_capture_out(capture, nat_ip, True)
1397 def test_static_with_port_in(self):
1398 """ 1:1 NAPT initialized from inside network """
1400 self.tcp_port_out = 3606
1401 self.udp_port_out = 3607
1402 self.icmp_id_out = 3608
1404 self.nat44_add_address(self.nat_addr)
1405 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1406 self.tcp_port_in, self.tcp_port_out,
1407 proto=IP_PROTOS.tcp)
1408 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1409 self.udp_port_in, self.udp_port_out,
1410 proto=IP_PROTOS.udp)
1411 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1412 self.icmp_id_in, self.icmp_id_out,
1413 proto=IP_PROTOS.icmp)
1414 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1415 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1419 pkts = self.create_stream_in(self.pg0, self.pg1)
1420 self.pg0.add_stream(pkts)
1421 self.pg_enable_capture(self.pg_interfaces)
1423 capture = self.pg1.get_capture(len(pkts))
1424 self.verify_capture_out(capture)
1427 pkts = self.create_stream_out(self.pg1)
1428 self.pg1.add_stream(pkts)
1429 self.pg_enable_capture(self.pg_interfaces)
1431 capture = self.pg0.get_capture(len(pkts))
1432 self.verify_capture_in(capture, self.pg0)
1434 def test_static_with_port_out(self):
1435 """ 1:1 NAPT initialized from outside network """
1437 self.tcp_port_out = 30606
1438 self.udp_port_out = 30607
1439 self.icmp_id_out = 30608
1441 self.nat44_add_address(self.nat_addr)
1442 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1443 self.tcp_port_in, self.tcp_port_out,
1444 proto=IP_PROTOS.tcp)
1445 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1446 self.udp_port_in, self.udp_port_out,
1447 proto=IP_PROTOS.udp)
1448 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1449 self.icmp_id_in, self.icmp_id_out,
1450 proto=IP_PROTOS.icmp)
1451 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1452 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1456 pkts = self.create_stream_out(self.pg1)
1457 self.pg1.add_stream(pkts)
1458 self.pg_enable_capture(self.pg_interfaces)
1460 capture = self.pg0.get_capture(len(pkts))
1461 self.verify_capture_in(capture, self.pg0)
1464 pkts = self.create_stream_in(self.pg0, self.pg1)
1465 self.pg0.add_stream(pkts)
1466 self.pg_enable_capture(self.pg_interfaces)
1468 capture = self.pg1.get_capture(len(pkts))
1469 self.verify_capture_out(capture)
1471 def test_static_vrf_aware(self):
1472 """ 1:1 NAT VRF awareness """
1474 nat_ip1 = "10.0.0.30"
1475 nat_ip2 = "10.0.0.40"
1476 self.tcp_port_out = 6303
1477 self.udp_port_out = 6304
1478 self.icmp_id_out = 6305
1480 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1482 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1484 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1486 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1487 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1489 # inside interface VRF match NAT44 static mapping VRF
1490 pkts = self.create_stream_in(self.pg4, self.pg3)
1491 self.pg4.add_stream(pkts)
1492 self.pg_enable_capture(self.pg_interfaces)
1494 capture = self.pg3.get_capture(len(pkts))
1495 self.verify_capture_out(capture, nat_ip1, True)
1497 # inside interface VRF don't match NAT44 static mapping VRF (packets
1499 pkts = self.create_stream_in(self.pg0, self.pg3)
1500 self.pg0.add_stream(pkts)
1501 self.pg_enable_capture(self.pg_interfaces)
1503 self.pg3.assert_nothing_captured()
1505 def test_dynamic_to_static(self):
1506 """ Switch from dynamic translation to 1:1NAT """
1507 nat_ip = "10.0.0.10"
1508 self.tcp_port_out = 6303
1509 self.udp_port_out = 6304
1510 self.icmp_id_out = 6305
1512 self.nat44_add_address(self.nat_addr)
1513 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1514 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1518 pkts = self.create_stream_in(self.pg0, self.pg1)
1519 self.pg0.add_stream(pkts)
1520 self.pg_enable_capture(self.pg_interfaces)
1522 capture = self.pg1.get_capture(len(pkts))
1523 self.verify_capture_out(capture)
1526 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1527 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1528 self.assertEqual(len(sessions), 0)
1529 pkts = self.create_stream_in(self.pg0, self.pg1)
1530 self.pg0.add_stream(pkts)
1531 self.pg_enable_capture(self.pg_interfaces)
1533 capture = self.pg1.get_capture(len(pkts))
1534 self.verify_capture_out(capture, nat_ip, True)
1536 def test_identity_nat(self):
1537 """ Identity NAT """
1539 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1540 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1541 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1544 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1545 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1546 TCP(sport=12345, dport=56789))
1547 self.pg1.add_stream(p)
1548 self.pg_enable_capture(self.pg_interfaces)
1550 capture = self.pg0.get_capture(1)
1555 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1556 self.assertEqual(ip.src, self.pg1.remote_ip4)
1557 self.assertEqual(tcp.dport, 56789)
1558 self.assertEqual(tcp.sport, 12345)
1559 self.assert_packet_checksums_valid(p)
1561 self.logger.error(ppp("Unexpected or invalid packet:", p))
1564 def test_multiple_inside_interfaces(self):
1565 """ NAT44 multiple non-overlapping address space inside interfaces """
1567 self.nat44_add_address(self.nat_addr)
1568 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1569 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1570 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1573 # between two NAT44 inside interfaces (no translation)
1574 pkts = self.create_stream_in(self.pg0, self.pg1)
1575 self.pg0.add_stream(pkts)
1576 self.pg_enable_capture(self.pg_interfaces)
1578 capture = self.pg1.get_capture(len(pkts))
1579 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1581 # from NAT44 inside to interface without NAT44 feature (no translation)
1582 pkts = self.create_stream_in(self.pg0, self.pg2)
1583 self.pg0.add_stream(pkts)
1584 self.pg_enable_capture(self.pg_interfaces)
1586 capture = self.pg2.get_capture(len(pkts))
1587 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1589 # in2out 1st interface
1590 pkts = self.create_stream_in(self.pg0, self.pg3)
1591 self.pg0.add_stream(pkts)
1592 self.pg_enable_capture(self.pg_interfaces)
1594 capture = self.pg3.get_capture(len(pkts))
1595 self.verify_capture_out(capture)
1597 # out2in 1st interface
1598 pkts = self.create_stream_out(self.pg3)
1599 self.pg3.add_stream(pkts)
1600 self.pg_enable_capture(self.pg_interfaces)
1602 capture = self.pg0.get_capture(len(pkts))
1603 self.verify_capture_in(capture, self.pg0)
1605 # in2out 2nd interface
1606 pkts = self.create_stream_in(self.pg1, self.pg3)
1607 self.pg1.add_stream(pkts)
1608 self.pg_enable_capture(self.pg_interfaces)
1610 capture = self.pg3.get_capture(len(pkts))
1611 self.verify_capture_out(capture)
1613 # out2in 2nd interface
1614 pkts = self.create_stream_out(self.pg3)
1615 self.pg3.add_stream(pkts)
1616 self.pg_enable_capture(self.pg_interfaces)
1618 capture = self.pg1.get_capture(len(pkts))
1619 self.verify_capture_in(capture, self.pg1)
1621 def test_inside_overlapping_interfaces(self):
1622 """ NAT44 multiple inside interfaces with overlapping address space """
1624 static_nat_ip = "10.0.0.10"
1625 self.nat44_add_address(self.nat_addr)
1626 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1628 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1629 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1630 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1631 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1634 # between NAT44 inside interfaces with same VRF (no translation)
1635 pkts = self.create_stream_in(self.pg4, self.pg5)
1636 self.pg4.add_stream(pkts)
1637 self.pg_enable_capture(self.pg_interfaces)
1639 capture = self.pg5.get_capture(len(pkts))
1640 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1642 # between NAT44 inside interfaces with different VRF (hairpinning)
1643 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1644 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1645 TCP(sport=1234, dport=5678))
1646 self.pg4.add_stream(p)
1647 self.pg_enable_capture(self.pg_interfaces)
1649 capture = self.pg6.get_capture(1)
1654 self.assertEqual(ip.src, self.nat_addr)
1655 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1656 self.assertNotEqual(tcp.sport, 1234)
1657 self.assertEqual(tcp.dport, 5678)
1659 self.logger.error(ppp("Unexpected or invalid packet:", p))
1662 # in2out 1st interface
1663 pkts = self.create_stream_in(self.pg4, self.pg3)
1664 self.pg4.add_stream(pkts)
1665 self.pg_enable_capture(self.pg_interfaces)
1667 capture = self.pg3.get_capture(len(pkts))
1668 self.verify_capture_out(capture)
1670 # out2in 1st interface
1671 pkts = self.create_stream_out(self.pg3)
1672 self.pg3.add_stream(pkts)
1673 self.pg_enable_capture(self.pg_interfaces)
1675 capture = self.pg4.get_capture(len(pkts))
1676 self.verify_capture_in(capture, self.pg4)
1678 # in2out 2nd interface
1679 pkts = self.create_stream_in(self.pg5, self.pg3)
1680 self.pg5.add_stream(pkts)
1681 self.pg_enable_capture(self.pg_interfaces)
1683 capture = self.pg3.get_capture(len(pkts))
1684 self.verify_capture_out(capture)
1686 # out2in 2nd interface
1687 pkts = self.create_stream_out(self.pg3)
1688 self.pg3.add_stream(pkts)
1689 self.pg_enable_capture(self.pg_interfaces)
1691 capture = self.pg5.get_capture(len(pkts))
1692 self.verify_capture_in(capture, self.pg5)
1695 addresses = self.vapi.nat44_address_dump()
1696 self.assertEqual(len(addresses), 1)
1697 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1698 self.assertEqual(len(sessions), 3)
1699 for session in sessions:
1700 self.assertFalse(session.is_static)
1701 self.assertEqual(session.inside_ip_address[0:4],
1702 self.pg5.remote_ip4n)
1703 self.assertEqual(session.outside_ip_address,
1704 addresses[0].ip_address)
1705 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1706 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1707 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1708 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1709 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1710 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1711 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1712 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1713 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1715 # in2out 3rd interface
1716 pkts = self.create_stream_in(self.pg6, self.pg3)
1717 self.pg6.add_stream(pkts)
1718 self.pg_enable_capture(self.pg_interfaces)
1720 capture = self.pg3.get_capture(len(pkts))
1721 self.verify_capture_out(capture, static_nat_ip, True)
1723 # out2in 3rd interface
1724 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1725 self.pg3.add_stream(pkts)
1726 self.pg_enable_capture(self.pg_interfaces)
1728 capture = self.pg6.get_capture(len(pkts))
1729 self.verify_capture_in(capture, self.pg6)
1731 # general user and session dump verifications
1732 users = self.vapi.nat44_user_dump()
1733 self.assertTrue(len(users) >= 3)
1734 addresses = self.vapi.nat44_address_dump()
1735 self.assertEqual(len(addresses), 1)
1737 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1739 for session in sessions:
1740 self.assertEqual(user.ip_address, session.inside_ip_address)
1741 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1742 self.assertTrue(session.protocol in
1743 [IP_PROTOS.tcp, IP_PROTOS.udp,
1745 self.assertFalse(session.ext_host_valid)
1748 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1749 self.assertTrue(len(sessions) >= 4)
1750 for session in sessions:
1751 self.assertFalse(session.is_static)
1752 self.assertEqual(session.inside_ip_address[0:4],
1753 self.pg4.remote_ip4n)
1754 self.assertEqual(session.outside_ip_address,
1755 addresses[0].ip_address)
1758 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1759 self.assertTrue(len(sessions) >= 3)
1760 for session in sessions:
1761 self.assertTrue(session.is_static)
1762 self.assertEqual(session.inside_ip_address[0:4],
1763 self.pg6.remote_ip4n)
1764 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1765 map(int, static_nat_ip.split('.')))
1766 self.assertTrue(session.inside_port in
1767 [self.tcp_port_in, self.udp_port_in,
1770 def test_hairpinning(self):
1771 """ NAT44 hairpinning - 1:1 NAPT """
1773 host = self.pg0.remote_hosts[0]
1774 server = self.pg0.remote_hosts[1]
1777 server_in_port = 5678
1778 server_out_port = 8765
1780 self.nat44_add_address(self.nat_addr)
1781 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1782 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1784 # add static mapping for server
1785 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1786 server_in_port, server_out_port,
1787 proto=IP_PROTOS.tcp)
1789 # send packet from host to server
1790 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1791 IP(src=host.ip4, dst=self.nat_addr) /
1792 TCP(sport=host_in_port, dport=server_out_port))
1793 self.pg0.add_stream(p)
1794 self.pg_enable_capture(self.pg_interfaces)
1796 capture = self.pg0.get_capture(1)
1801 self.assertEqual(ip.src, self.nat_addr)
1802 self.assertEqual(ip.dst, server.ip4)
1803 self.assertNotEqual(tcp.sport, host_in_port)
1804 self.assertEqual(tcp.dport, server_in_port)
1805 self.assert_packet_checksums_valid(p)
1806 host_out_port = tcp.sport
1808 self.logger.error(ppp("Unexpected or invalid packet:", p))
1811 # send reply from server to host
1812 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1813 IP(src=server.ip4, dst=self.nat_addr) /
1814 TCP(sport=server_in_port, dport=host_out_port))
1815 self.pg0.add_stream(p)
1816 self.pg_enable_capture(self.pg_interfaces)
1818 capture = self.pg0.get_capture(1)
1823 self.assertEqual(ip.src, self.nat_addr)
1824 self.assertEqual(ip.dst, host.ip4)
1825 self.assertEqual(tcp.sport, server_out_port)
1826 self.assertEqual(tcp.dport, host_in_port)
1827 self.assert_packet_checksums_valid(p)
1829 self.logger.error(ppp("Unexpected or invalid packet:", p))
1832 def test_hairpinning2(self):
1833 """ NAT44 hairpinning - 1:1 NAT"""
1835 server1_nat_ip = "10.0.0.10"
1836 server2_nat_ip = "10.0.0.11"
1837 host = self.pg0.remote_hosts[0]
1838 server1 = self.pg0.remote_hosts[1]
1839 server2 = self.pg0.remote_hosts[2]
1840 server_tcp_port = 22
1841 server_udp_port = 20
1843 self.nat44_add_address(self.nat_addr)
1844 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1845 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1848 # add static mapping for servers
1849 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1850 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1854 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1855 IP(src=host.ip4, dst=server1_nat_ip) /
1856 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1858 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1859 IP(src=host.ip4, dst=server1_nat_ip) /
1860 UDP(sport=self.udp_port_in, dport=server_udp_port))
1862 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1863 IP(src=host.ip4, dst=server1_nat_ip) /
1864 ICMP(id=self.icmp_id_in, type='echo-request'))
1866 self.pg0.add_stream(pkts)
1867 self.pg_enable_capture(self.pg_interfaces)
1869 capture = self.pg0.get_capture(len(pkts))
1870 for packet in capture:
1872 self.assertEqual(packet[IP].src, self.nat_addr)
1873 self.assertEqual(packet[IP].dst, server1.ip4)
1874 if packet.haslayer(TCP):
1875 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1876 self.assertEqual(packet[TCP].dport, server_tcp_port)
1877 self.tcp_port_out = packet[TCP].sport
1878 self.assert_packet_checksums_valid(packet)
1879 elif packet.haslayer(UDP):
1880 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1881 self.assertEqual(packet[UDP].dport, server_udp_port)
1882 self.udp_port_out = packet[UDP].sport
1884 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1885 self.icmp_id_out = packet[ICMP].id
1887 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1892 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1893 IP(src=server1.ip4, dst=self.nat_addr) /
1894 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1896 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1897 IP(src=server1.ip4, dst=self.nat_addr) /
1898 UDP(sport=server_udp_port, dport=self.udp_port_out))
1900 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1901 IP(src=server1.ip4, dst=self.nat_addr) /
1902 ICMP(id=self.icmp_id_out, type='echo-reply'))
1904 self.pg0.add_stream(pkts)
1905 self.pg_enable_capture(self.pg_interfaces)
1907 capture = self.pg0.get_capture(len(pkts))
1908 for packet in capture:
1910 self.assertEqual(packet[IP].src, server1_nat_ip)
1911 self.assertEqual(packet[IP].dst, host.ip4)
1912 if packet.haslayer(TCP):
1913 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1914 self.assertEqual(packet[TCP].sport, server_tcp_port)
1915 self.assert_packet_checksums_valid(packet)
1916 elif packet.haslayer(UDP):
1917 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1918 self.assertEqual(packet[UDP].sport, server_udp_port)
1920 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1922 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1925 # server2 to server1
1927 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1928 IP(src=server2.ip4, dst=server1_nat_ip) /
1929 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1931 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1932 IP(src=server2.ip4, dst=server1_nat_ip) /
1933 UDP(sport=self.udp_port_in, dport=server_udp_port))
1935 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1936 IP(src=server2.ip4, dst=server1_nat_ip) /
1937 ICMP(id=self.icmp_id_in, type='echo-request'))
1939 self.pg0.add_stream(pkts)
1940 self.pg_enable_capture(self.pg_interfaces)
1942 capture = self.pg0.get_capture(len(pkts))
1943 for packet in capture:
1945 self.assertEqual(packet[IP].src, server2_nat_ip)
1946 self.assertEqual(packet[IP].dst, server1.ip4)
1947 if packet.haslayer(TCP):
1948 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1949 self.assertEqual(packet[TCP].dport, server_tcp_port)
1950 self.tcp_port_out = packet[TCP].sport
1951 self.assert_packet_checksums_valid(packet)
1952 elif packet.haslayer(UDP):
1953 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1954 self.assertEqual(packet[UDP].dport, server_udp_port)
1955 self.udp_port_out = packet[UDP].sport
1957 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1958 self.icmp_id_out = packet[ICMP].id
1960 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1963 # server1 to server2
1965 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1966 IP(src=server1.ip4, dst=server2_nat_ip) /
1967 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1969 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1970 IP(src=server1.ip4, dst=server2_nat_ip) /
1971 UDP(sport=server_udp_port, dport=self.udp_port_out))
1973 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1974 IP(src=server1.ip4, dst=server2_nat_ip) /
1975 ICMP(id=self.icmp_id_out, type='echo-reply'))
1977 self.pg0.add_stream(pkts)
1978 self.pg_enable_capture(self.pg_interfaces)
1980 capture = self.pg0.get_capture(len(pkts))
1981 for packet in capture:
1983 self.assertEqual(packet[IP].src, server1_nat_ip)
1984 self.assertEqual(packet[IP].dst, server2.ip4)
1985 if packet.haslayer(TCP):
1986 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1987 self.assertEqual(packet[TCP].sport, server_tcp_port)
1988 self.assert_packet_checksums_valid(packet)
1989 elif packet.haslayer(UDP):
1990 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1991 self.assertEqual(packet[UDP].sport, server_udp_port)
1993 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1995 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1998 def test_max_translations_per_user(self):
1999 """ MAX translations per user - recycle the least recently used """
2001 self.nat44_add_address(self.nat_addr)
2002 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2003 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2006 # get maximum number of translations per user
2007 nat44_config = self.vapi.nat_show_config()
2009 # send more than maximum number of translations per user packets
2010 pkts_num = nat44_config.max_translations_per_user + 5
2012 for port in range(0, pkts_num):
2013 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2014 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2015 TCP(sport=1025 + port))
2017 self.pg0.add_stream(pkts)
2018 self.pg_enable_capture(self.pg_interfaces)
2021 # verify number of translated packet
2022 self.pg1.get_capture(pkts_num)
2024 users = self.vapi.nat44_user_dump()
2026 if user.ip_address == self.pg0.remote_ip4n:
2027 self.assertEqual(user.nsessions,
2028 nat44_config.max_translations_per_user)
2029 self.assertEqual(user.nstaticsessions, 0)
2032 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2034 proto=IP_PROTOS.tcp)
2035 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2036 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2037 TCP(sport=tcp_port))
2038 self.pg0.add_stream(p)
2039 self.pg_enable_capture(self.pg_interfaces)
2041 self.pg1.get_capture(1)
2042 users = self.vapi.nat44_user_dump()
2044 if user.ip_address == self.pg0.remote_ip4n:
2045 self.assertEqual(user.nsessions,
2046 nat44_config.max_translations_per_user - 1)
2047 self.assertEqual(user.nstaticsessions, 1)
2049 def test_interface_addr(self):
2050 """ Acquire NAT44 addresses from interface """
2051 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2053 # no address in NAT pool
2054 adresses = self.vapi.nat44_address_dump()
2055 self.assertEqual(0, len(adresses))
2057 # configure interface address and check NAT address pool
2058 self.pg7.config_ip4()
2059 adresses = self.vapi.nat44_address_dump()
2060 self.assertEqual(1, len(adresses))
2061 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2063 # remove interface address and check NAT address pool
2064 self.pg7.unconfig_ip4()
2065 adresses = self.vapi.nat44_address_dump()
2066 self.assertEqual(0, len(adresses))
2068 def test_interface_addr_static_mapping(self):
2069 """ Static mapping with addresses from interface """
2072 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2073 self.nat44_add_static_mapping(
2075 external_sw_if_index=self.pg7.sw_if_index,
2078 # static mappings with external interface
2079 static_mappings = self.vapi.nat44_static_mapping_dump()
2080 self.assertEqual(1, len(static_mappings))
2081 self.assertEqual(self.pg7.sw_if_index,
2082 static_mappings[0].external_sw_if_index)
2083 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2085 # configure interface address and check static mappings
2086 self.pg7.config_ip4()
2087 static_mappings = self.vapi.nat44_static_mapping_dump()
2088 self.assertEqual(2, len(static_mappings))
2090 for sm in static_mappings:
2091 if sm.external_sw_if_index == 0xFFFFFFFF:
2092 self.assertEqual(sm.external_ip_address[0:4],
2093 self.pg7.local_ip4n)
2094 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2096 self.assertTrue(resolved)
2098 # remove interface address and check static mappings
2099 self.pg7.unconfig_ip4()
2100 static_mappings = self.vapi.nat44_static_mapping_dump()
2101 self.assertEqual(1, len(static_mappings))
2102 self.assertEqual(self.pg7.sw_if_index,
2103 static_mappings[0].external_sw_if_index)
2104 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2106 # configure interface address again and check static mappings
2107 self.pg7.config_ip4()
2108 static_mappings = self.vapi.nat44_static_mapping_dump()
2109 self.assertEqual(2, len(static_mappings))
2111 for sm in static_mappings:
2112 if sm.external_sw_if_index == 0xFFFFFFFF:
2113 self.assertEqual(sm.external_ip_address[0:4],
2114 self.pg7.local_ip4n)
2115 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2117 self.assertTrue(resolved)
2119 # remove static mapping
2120 self.nat44_add_static_mapping(
2122 external_sw_if_index=self.pg7.sw_if_index,
2125 static_mappings = self.vapi.nat44_static_mapping_dump()
2126 self.assertEqual(0, len(static_mappings))
2128 def test_interface_addr_identity_nat(self):
2129 """ Identity NAT with addresses from interface """
2132 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2133 self.vapi.nat44_add_del_identity_mapping(
2134 sw_if_index=self.pg7.sw_if_index,
2136 protocol=IP_PROTOS.tcp,
2139 # identity mappings with external interface
2140 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2141 self.assertEqual(1, len(identity_mappings))
2142 self.assertEqual(self.pg7.sw_if_index,
2143 identity_mappings[0].sw_if_index)
2145 # configure interface address and check identity mappings
2146 self.pg7.config_ip4()
2147 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2149 self.assertEqual(2, len(identity_mappings))
2150 for sm in identity_mappings:
2151 if sm.sw_if_index == 0xFFFFFFFF:
2152 self.assertEqual(identity_mappings[0].ip_address,
2153 self.pg7.local_ip4n)
2154 self.assertEqual(port, identity_mappings[0].port)
2155 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2157 self.assertTrue(resolved)
2159 # remove interface address and check identity mappings
2160 self.pg7.unconfig_ip4()
2161 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2162 self.assertEqual(1, len(identity_mappings))
2163 self.assertEqual(self.pg7.sw_if_index,
2164 identity_mappings[0].sw_if_index)
2166 def test_ipfix_nat44_sess(self):
2167 """ IPFIX logging NAT44 session created/delted """
2168 self.ipfix_domain_id = 10
2169 self.ipfix_src_port = 20202
2170 colector_port = 30303
2171 bind_layers(UDP, IPFIX, dport=30303)
2172 self.nat44_add_address(self.nat_addr)
2173 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2174 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2176 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2177 src_address=self.pg3.local_ip4n,
2179 template_interval=10,
2180 collector_port=colector_port)
2181 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2182 src_port=self.ipfix_src_port)
2184 pkts = self.create_stream_in(self.pg0, self.pg1)
2185 self.pg0.add_stream(pkts)
2186 self.pg_enable_capture(self.pg_interfaces)
2188 capture = self.pg1.get_capture(len(pkts))
2189 self.verify_capture_out(capture)
2190 self.nat44_add_address(self.nat_addr, is_add=0)
2191 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2192 capture = self.pg3.get_capture(9)
2193 ipfix = IPFIXDecoder()
2194 # first load template
2196 self.assertTrue(p.haslayer(IPFIX))
2197 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2198 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2199 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2200 self.assertEqual(p[UDP].dport, colector_port)
2201 self.assertEqual(p[IPFIX].observationDomainID,
2202 self.ipfix_domain_id)
2203 if p.haslayer(Template):
2204 ipfix.add_template(p.getlayer(Template))
2205 # verify events in data set
2207 if p.haslayer(Data):
2208 data = ipfix.decode_data_set(p.getlayer(Set))
2209 self.verify_ipfix_nat44_ses(data)
2211 def test_ipfix_addr_exhausted(self):
2212 """ IPFIX logging NAT addresses exhausted """
2213 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2214 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2216 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2217 src_address=self.pg3.local_ip4n,
2219 template_interval=10)
2220 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2221 src_port=self.ipfix_src_port)
2223 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2224 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2226 self.pg0.add_stream(p)
2227 self.pg_enable_capture(self.pg_interfaces)
2229 self.pg1.assert_nothing_captured()
2231 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2232 capture = self.pg3.get_capture(9)
2233 ipfix = IPFIXDecoder()
2234 # first load template
2236 self.assertTrue(p.haslayer(IPFIX))
2237 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2238 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2239 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2240 self.assertEqual(p[UDP].dport, 4739)
2241 self.assertEqual(p[IPFIX].observationDomainID,
2242 self.ipfix_domain_id)
2243 if p.haslayer(Template):
2244 ipfix.add_template(p.getlayer(Template))
2245 # verify events in data set
2247 if p.haslayer(Data):
2248 data = ipfix.decode_data_set(p.getlayer(Set))
2249 self.verify_ipfix_addr_exhausted(data)
2251 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2252 def test_ipfix_max_sessions(self):
2253 """ IPFIX logging maximum session entries exceeded """
2254 self.nat44_add_address(self.nat_addr)
2255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2256 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2259 nat44_config = self.vapi.nat_show_config()
2260 max_sessions = 10 * nat44_config.translation_buckets
2263 for i in range(0, max_sessions):
2264 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=src, dst=self.pg1.remote_ip4) /
2269 self.pg0.add_stream(pkts)
2270 self.pg_enable_capture(self.pg_interfaces)
2273 self.pg1.get_capture(max_sessions)
2274 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2275 src_address=self.pg3.local_ip4n,
2277 template_interval=10)
2278 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2279 src_port=self.ipfix_src_port)
2281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2284 self.pg0.add_stream(p)
2285 self.pg_enable_capture(self.pg_interfaces)
2287 self.pg1.assert_nothing_captured()
2289 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2290 capture = self.pg3.get_capture(9)
2291 ipfix = IPFIXDecoder()
2292 # first load template
2294 self.assertTrue(p.haslayer(IPFIX))
2295 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2296 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2297 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2298 self.assertEqual(p[UDP].dport, 4739)
2299 self.assertEqual(p[IPFIX].observationDomainID,
2300 self.ipfix_domain_id)
2301 if p.haslayer(Template):
2302 ipfix.add_template(p.getlayer(Template))
2303 # verify events in data set
2305 if p.haslayer(Data):
2306 data = ipfix.decode_data_set(p.getlayer(Set))
2307 self.verify_ipfix_max_sessions(data, max_sessions)
2309 def test_pool_addr_fib(self):
2310 """ NAT44 add pool addresses to FIB """
2311 static_addr = '10.0.0.10'
2312 self.nat44_add_address(self.nat_addr)
2313 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2314 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2316 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2319 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2320 ARP(op=ARP.who_has, pdst=self.nat_addr,
2321 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2322 self.pg1.add_stream(p)
2323 self.pg_enable_capture(self.pg_interfaces)
2325 capture = self.pg1.get_capture(1)
2326 self.assertTrue(capture[0].haslayer(ARP))
2327 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2330 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2331 ARP(op=ARP.who_has, pdst=static_addr,
2332 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2333 self.pg1.add_stream(p)
2334 self.pg_enable_capture(self.pg_interfaces)
2336 capture = self.pg1.get_capture(1)
2337 self.assertTrue(capture[0].haslayer(ARP))
2338 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2340 # send ARP to non-NAT44 interface
2341 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2342 ARP(op=ARP.who_has, pdst=self.nat_addr,
2343 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2344 self.pg2.add_stream(p)
2345 self.pg_enable_capture(self.pg_interfaces)
2347 self.pg1.assert_nothing_captured()
2349 # remove addresses and verify
2350 self.nat44_add_address(self.nat_addr, is_add=0)
2351 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2354 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2355 ARP(op=ARP.who_has, pdst=self.nat_addr,
2356 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2357 self.pg1.add_stream(p)
2358 self.pg_enable_capture(self.pg_interfaces)
2360 self.pg1.assert_nothing_captured()
2362 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2363 ARP(op=ARP.who_has, pdst=static_addr,
2364 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2365 self.pg1.add_stream(p)
2366 self.pg_enable_capture(self.pg_interfaces)
2368 self.pg1.assert_nothing_captured()
2370 def test_vrf_mode(self):
2371 """ NAT44 tenant VRF aware address pool mode """
2375 nat_ip1 = "10.0.0.10"
2376 nat_ip2 = "10.0.0.11"
2378 self.pg0.unconfig_ip4()
2379 self.pg1.unconfig_ip4()
2380 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2381 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2382 self.pg0.set_table_ip4(vrf_id1)
2383 self.pg1.set_table_ip4(vrf_id2)
2384 self.pg0.config_ip4()
2385 self.pg1.config_ip4()
2386 self.pg0.resolve_arp()
2387 self.pg1.resolve_arp()
2389 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2390 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2391 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2392 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2393 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2398 pkts = self.create_stream_in(self.pg0, self.pg2)
2399 self.pg0.add_stream(pkts)
2400 self.pg_enable_capture(self.pg_interfaces)
2402 capture = self.pg2.get_capture(len(pkts))
2403 self.verify_capture_out(capture, nat_ip1)
2406 pkts = self.create_stream_in(self.pg1, self.pg2)
2407 self.pg1.add_stream(pkts)
2408 self.pg_enable_capture(self.pg_interfaces)
2410 capture = self.pg2.get_capture(len(pkts))
2411 self.verify_capture_out(capture, nat_ip2)
2414 self.pg0.unconfig_ip4()
2415 self.pg1.unconfig_ip4()
2416 self.pg0.set_table_ip4(0)
2417 self.pg1.set_table_ip4(0)
2418 self.pg0.config_ip4()
2419 self.pg1.config_ip4()
2420 self.pg0.resolve_arp()
2421 self.pg1.resolve_arp()
2422 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2423 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2425 def test_vrf_feature_independent(self):
2426 """ NAT44 tenant VRF independent address pool mode """
2428 nat_ip1 = "10.0.0.10"
2429 nat_ip2 = "10.0.0.11"
2431 self.nat44_add_address(nat_ip1)
2432 self.nat44_add_address(nat_ip2, vrf_id=99)
2433 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2434 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2435 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2439 pkts = self.create_stream_in(self.pg0, self.pg2)
2440 self.pg0.add_stream(pkts)
2441 self.pg_enable_capture(self.pg_interfaces)
2443 capture = self.pg2.get_capture(len(pkts))
2444 self.verify_capture_out(capture, nat_ip1)
2447 pkts = self.create_stream_in(self.pg1, self.pg2)
2448 self.pg1.add_stream(pkts)
2449 self.pg_enable_capture(self.pg_interfaces)
2451 capture = self.pg2.get_capture(len(pkts))
2452 self.verify_capture_out(capture, nat_ip1)
2454 def test_dynamic_ipless_interfaces(self):
2455 """ NAT44 interfaces without configured IP address """
2457 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2458 mactobinary(self.pg7.remote_mac),
2459 self.pg7.remote_ip4n,
2461 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2462 mactobinary(self.pg8.remote_mac),
2463 self.pg8.remote_ip4n,
2466 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2467 dst_address_length=32,
2468 next_hop_address=self.pg7.remote_ip4n,
2469 next_hop_sw_if_index=self.pg7.sw_if_index)
2470 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2471 dst_address_length=32,
2472 next_hop_address=self.pg8.remote_ip4n,
2473 next_hop_sw_if_index=self.pg8.sw_if_index)
2475 self.nat44_add_address(self.nat_addr)
2476 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2477 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2481 pkts = self.create_stream_in(self.pg7, self.pg8)
2482 self.pg7.add_stream(pkts)
2483 self.pg_enable_capture(self.pg_interfaces)
2485 capture = self.pg8.get_capture(len(pkts))
2486 self.verify_capture_out(capture)
2489 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2490 self.pg8.add_stream(pkts)
2491 self.pg_enable_capture(self.pg_interfaces)
2493 capture = self.pg7.get_capture(len(pkts))
2494 self.verify_capture_in(capture, self.pg7)
2496 def test_static_ipless_interfaces(self):
2497 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2499 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2500 mactobinary(self.pg7.remote_mac),
2501 self.pg7.remote_ip4n,
2503 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2504 mactobinary(self.pg8.remote_mac),
2505 self.pg8.remote_ip4n,
2508 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2509 dst_address_length=32,
2510 next_hop_address=self.pg7.remote_ip4n,
2511 next_hop_sw_if_index=self.pg7.sw_if_index)
2512 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2513 dst_address_length=32,
2514 next_hop_address=self.pg8.remote_ip4n,
2515 next_hop_sw_if_index=self.pg8.sw_if_index)
2517 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2518 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2519 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2523 pkts = self.create_stream_out(self.pg8)
2524 self.pg8.add_stream(pkts)
2525 self.pg_enable_capture(self.pg_interfaces)
2527 capture = self.pg7.get_capture(len(pkts))
2528 self.verify_capture_in(capture, self.pg7)
2531 pkts = self.create_stream_in(self.pg7, self.pg8)
2532 self.pg7.add_stream(pkts)
2533 self.pg_enable_capture(self.pg_interfaces)
2535 capture = self.pg8.get_capture(len(pkts))
2536 self.verify_capture_out(capture, self.nat_addr, True)
2538 def test_static_with_port_ipless_interfaces(self):
2539 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2541 self.tcp_port_out = 30606
2542 self.udp_port_out = 30607
2543 self.icmp_id_out = 30608
2545 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2546 mactobinary(self.pg7.remote_mac),
2547 self.pg7.remote_ip4n,
2549 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2550 mactobinary(self.pg8.remote_mac),
2551 self.pg8.remote_ip4n,
2554 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2555 dst_address_length=32,
2556 next_hop_address=self.pg7.remote_ip4n,
2557 next_hop_sw_if_index=self.pg7.sw_if_index)
2558 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2559 dst_address_length=32,
2560 next_hop_address=self.pg8.remote_ip4n,
2561 next_hop_sw_if_index=self.pg8.sw_if_index)
2563 self.nat44_add_address(self.nat_addr)
2564 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2565 self.tcp_port_in, self.tcp_port_out,
2566 proto=IP_PROTOS.tcp)
2567 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2568 self.udp_port_in, self.udp_port_out,
2569 proto=IP_PROTOS.udp)
2570 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2571 self.icmp_id_in, self.icmp_id_out,
2572 proto=IP_PROTOS.icmp)
2573 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2574 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2578 pkts = self.create_stream_out(self.pg8)
2579 self.pg8.add_stream(pkts)
2580 self.pg_enable_capture(self.pg_interfaces)
2582 capture = self.pg7.get_capture(len(pkts))
2583 self.verify_capture_in(capture, self.pg7)
2586 pkts = self.create_stream_in(self.pg7, self.pg8)
2587 self.pg7.add_stream(pkts)
2588 self.pg_enable_capture(self.pg_interfaces)
2590 capture = self.pg8.get_capture(len(pkts))
2591 self.verify_capture_out(capture)
2593 def test_static_unknown_proto(self):
2594 """ 1:1 NAT translate packet with unknown protocol """
2595 nat_ip = "10.0.0.10"
2596 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2597 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2598 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2602 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2603 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2605 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2606 TCP(sport=1234, dport=1234))
2607 self.pg0.add_stream(p)
2608 self.pg_enable_capture(self.pg_interfaces)
2610 p = self.pg1.get_capture(1)
2613 self.assertEqual(packet[IP].src, nat_ip)
2614 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2615 self.assertTrue(packet.haslayer(GRE))
2616 self.assert_packet_checksums_valid(packet)
2618 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2622 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2623 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2625 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2626 TCP(sport=1234, dport=1234))
2627 self.pg1.add_stream(p)
2628 self.pg_enable_capture(self.pg_interfaces)
2630 p = self.pg0.get_capture(1)
2633 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2634 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2635 self.assertTrue(packet.haslayer(GRE))
2636 self.assert_packet_checksums_valid(packet)
2638 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2641 def test_hairpinning_static_unknown_proto(self):
2642 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2644 host = self.pg0.remote_hosts[0]
2645 server = self.pg0.remote_hosts[1]
2647 host_nat_ip = "10.0.0.10"
2648 server_nat_ip = "10.0.0.11"
2650 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2651 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2652 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2653 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2657 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2658 IP(src=host.ip4, dst=server_nat_ip) /
2660 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2661 TCP(sport=1234, dport=1234))
2662 self.pg0.add_stream(p)
2663 self.pg_enable_capture(self.pg_interfaces)
2665 p = self.pg0.get_capture(1)
2668 self.assertEqual(packet[IP].src, host_nat_ip)
2669 self.assertEqual(packet[IP].dst, server.ip4)
2670 self.assertTrue(packet.haslayer(GRE))
2671 self.assert_packet_checksums_valid(packet)
2673 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2677 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2678 IP(src=server.ip4, dst=host_nat_ip) /
2680 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2681 TCP(sport=1234, dport=1234))
2682 self.pg0.add_stream(p)
2683 self.pg_enable_capture(self.pg_interfaces)
2685 p = self.pg0.get_capture(1)
2688 self.assertEqual(packet[IP].src, server_nat_ip)
2689 self.assertEqual(packet[IP].dst, host.ip4)
2690 self.assertTrue(packet.haslayer(GRE))
2691 self.assert_packet_checksums_valid(packet)
2693 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2696 def test_output_feature(self):
2697 """ NAT44 interface output feature (in2out postrouting) """
2698 self.nat44_add_address(self.nat_addr)
2699 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2700 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2701 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2705 pkts = self.create_stream_in(self.pg0, self.pg3)
2706 self.pg0.add_stream(pkts)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg3.get_capture(len(pkts))
2710 self.verify_capture_out(capture)
2713 pkts = self.create_stream_out(self.pg3)
2714 self.pg3.add_stream(pkts)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 capture = self.pg0.get_capture(len(pkts))
2718 self.verify_capture_in(capture, self.pg0)
2720 # from non-NAT interface to NAT inside interface
2721 pkts = self.create_stream_in(self.pg2, self.pg0)
2722 self.pg2.add_stream(pkts)
2723 self.pg_enable_capture(self.pg_interfaces)
2725 capture = self.pg0.get_capture(len(pkts))
2726 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2728 def test_output_feature_vrf_aware(self):
2729 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2730 nat_ip_vrf10 = "10.0.0.10"
2731 nat_ip_vrf20 = "10.0.0.20"
2733 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2734 dst_address_length=32,
2735 next_hop_address=self.pg3.remote_ip4n,
2736 next_hop_sw_if_index=self.pg3.sw_if_index,
2738 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2739 dst_address_length=32,
2740 next_hop_address=self.pg3.remote_ip4n,
2741 next_hop_sw_if_index=self.pg3.sw_if_index,
2744 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2745 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2746 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2747 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2748 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2752 pkts = self.create_stream_in(self.pg4, self.pg3)
2753 self.pg4.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg3.get_capture(len(pkts))
2757 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2760 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2761 self.pg3.add_stream(pkts)
2762 self.pg_enable_capture(self.pg_interfaces)
2764 capture = self.pg4.get_capture(len(pkts))
2765 self.verify_capture_in(capture, self.pg4)
2768 pkts = self.create_stream_in(self.pg6, self.pg3)
2769 self.pg6.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg3.get_capture(len(pkts))
2773 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2776 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2777 self.pg3.add_stream(pkts)
2778 self.pg_enable_capture(self.pg_interfaces)
2780 capture = self.pg6.get_capture(len(pkts))
2781 self.verify_capture_in(capture, self.pg6)
2783 def test_output_feature_hairpinning(self):
2784 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2785 host = self.pg0.remote_hosts[0]
2786 server = self.pg0.remote_hosts[1]
2789 server_in_port = 5678
2790 server_out_port = 8765
2792 self.nat44_add_address(self.nat_addr)
2793 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2794 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2797 # add static mapping for server
2798 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2799 server_in_port, server_out_port,
2800 proto=IP_PROTOS.tcp)
2802 # send packet from host to server
2803 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2804 IP(src=host.ip4, dst=self.nat_addr) /
2805 TCP(sport=host_in_port, dport=server_out_port))
2806 self.pg0.add_stream(p)
2807 self.pg_enable_capture(self.pg_interfaces)
2809 capture = self.pg0.get_capture(1)
2814 self.assertEqual(ip.src, self.nat_addr)
2815 self.assertEqual(ip.dst, server.ip4)
2816 self.assertNotEqual(tcp.sport, host_in_port)
2817 self.assertEqual(tcp.dport, server_in_port)
2818 self.assert_packet_checksums_valid(p)
2819 host_out_port = tcp.sport
2821 self.logger.error(ppp("Unexpected or invalid packet:", p))
2824 # send reply from server to host
2825 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2826 IP(src=server.ip4, dst=self.nat_addr) /
2827 TCP(sport=server_in_port, dport=host_out_port))
2828 self.pg0.add_stream(p)
2829 self.pg_enable_capture(self.pg_interfaces)
2831 capture = self.pg0.get_capture(1)
2836 self.assertEqual(ip.src, self.nat_addr)
2837 self.assertEqual(ip.dst, host.ip4)
2838 self.assertEqual(tcp.sport, server_out_port)
2839 self.assertEqual(tcp.dport, host_in_port)
2840 self.assert_packet_checksums_valid(p)
2842 self.logger.error(ppp("Unexpected or invalid packet:", p))
2845 def test_one_armed_nat44(self):
2846 """ One armed NAT44 """
2847 remote_host = self.pg9.remote_hosts[0]
2848 local_host = self.pg9.remote_hosts[1]
2851 self.nat44_add_address(self.nat_addr)
2852 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2853 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2857 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2858 IP(src=local_host.ip4, dst=remote_host.ip4) /
2859 TCP(sport=12345, dport=80))
2860 self.pg9.add_stream(p)
2861 self.pg_enable_capture(self.pg_interfaces)
2863 capture = self.pg9.get_capture(1)
2868 self.assertEqual(ip.src, self.nat_addr)
2869 self.assertEqual(ip.dst, remote_host.ip4)
2870 self.assertNotEqual(tcp.sport, 12345)
2871 external_port = tcp.sport
2872 self.assertEqual(tcp.dport, 80)
2873 self.assert_packet_checksums_valid(p)
2875 self.logger.error(ppp("Unexpected or invalid packet:", p))
2879 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2880 IP(src=remote_host.ip4, dst=self.nat_addr) /
2881 TCP(sport=80, dport=external_port))
2882 self.pg9.add_stream(p)
2883 self.pg_enable_capture(self.pg_interfaces)
2885 capture = self.pg9.get_capture(1)
2890 self.assertEqual(ip.src, remote_host.ip4)
2891 self.assertEqual(ip.dst, local_host.ip4)
2892 self.assertEqual(tcp.sport, 80)
2893 self.assertEqual(tcp.dport, 12345)
2894 self.assert_packet_checksums_valid(p)
2896 self.logger.error(ppp("Unexpected or invalid packet:", p))
2899 def test_del_session(self):
2900 """ Delete NAT44 session """
2901 self.nat44_add_address(self.nat_addr)
2902 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2903 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2906 pkts = self.create_stream_in(self.pg0, self.pg1)
2907 self.pg0.add_stream(pkts)
2908 self.pg_enable_capture(self.pg_interfaces)
2910 self.pg1.get_capture(len(pkts))
2912 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2913 nsessions = len(sessions)
2915 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2916 sessions[0].inside_port,
2917 sessions[0].protocol)
2918 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2919 sessions[1].outside_port,
2920 sessions[1].protocol,
2923 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2924 self.assertEqual(nsessions - len(sessions), 2)
2926 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2927 sessions[0].inside_port,
2928 sessions[0].protocol)
2930 self.verify_no_nat44_user()
2932 def test_set_get_reass(self):
2933 """ NAT44 set/get virtual fragmentation reassembly """
2934 reas_cfg1 = self.vapi.nat_get_reass()
2936 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2937 max_reass=reas_cfg1.ip4_max_reass * 2,
2938 max_frag=reas_cfg1.ip4_max_frag * 2)
2940 reas_cfg2 = self.vapi.nat_get_reass()
2942 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2943 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2944 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2946 self.vapi.nat_set_reass(drop_frag=1)
2947 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2949 def test_frag_in_order(self):
2950 """ NAT44 translate fragments arriving in order """
2951 self.nat44_add_address(self.nat_addr)
2952 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2953 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2956 data = "A" * 4 + "B" * 16 + "C" * 3
2957 self.tcp_port_in = random.randint(1025, 65535)
2959 reass = self.vapi.nat_reass_dump()
2960 reass_n_start = len(reass)
2963 pkts = self.create_stream_frag(self.pg0,
2964 self.pg1.remote_ip4,
2968 self.pg0.add_stream(pkts)
2969 self.pg_enable_capture(self.pg_interfaces)
2971 frags = self.pg1.get_capture(len(pkts))
2972 p = self.reass_frags_and_verify(frags,
2974 self.pg1.remote_ip4)
2975 self.assertEqual(p[TCP].dport, 20)
2976 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2977 self.tcp_port_out = p[TCP].sport
2978 self.assertEqual(data, p[Raw].load)
2981 pkts = self.create_stream_frag(self.pg1,
2986 self.pg1.add_stream(pkts)
2987 self.pg_enable_capture(self.pg_interfaces)
2989 frags = self.pg0.get_capture(len(pkts))
2990 p = self.reass_frags_and_verify(frags,
2991 self.pg1.remote_ip4,
2992 self.pg0.remote_ip4)
2993 self.assertEqual(p[TCP].sport, 20)
2994 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2995 self.assertEqual(data, p[Raw].load)
2997 reass = self.vapi.nat_reass_dump()
2998 reass_n_end = len(reass)
3000 self.assertEqual(reass_n_end - reass_n_start, 2)
3002 def test_reass_hairpinning(self):
3003 """ NAT44 fragments hairpinning """
3004 server = self.pg0.remote_hosts[1]
3005 host_in_port = random.randint(1025, 65535)
3006 server_in_port = random.randint(1025, 65535)
3007 server_out_port = random.randint(1025, 65535)
3008 data = "A" * 4 + "B" * 16 + "C" * 3
3010 self.nat44_add_address(self.nat_addr)
3011 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3012 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3014 # add static mapping for server
3015 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3016 server_in_port, server_out_port,
3017 proto=IP_PROTOS.tcp)
3019 # send packet from host to server
3020 pkts = self.create_stream_frag(self.pg0,
3025 self.pg0.add_stream(pkts)
3026 self.pg_enable_capture(self.pg_interfaces)
3028 frags = self.pg0.get_capture(len(pkts))
3029 p = self.reass_frags_and_verify(frags,
3032 self.assertNotEqual(p[TCP].sport, host_in_port)
3033 self.assertEqual(p[TCP].dport, server_in_port)
3034 self.assertEqual(data, p[Raw].load)
3036 def test_frag_out_of_order(self):
3037 """ NAT44 translate fragments arriving out of order """
3038 self.nat44_add_address(self.nat_addr)
3039 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3040 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3043 data = "A" * 4 + "B" * 16 + "C" * 3
3044 random.randint(1025, 65535)
3047 pkts = self.create_stream_frag(self.pg0,
3048 self.pg1.remote_ip4,
3053 self.pg0.add_stream(pkts)
3054 self.pg_enable_capture(self.pg_interfaces)
3056 frags = self.pg1.get_capture(len(pkts))
3057 p = self.reass_frags_and_verify(frags,
3059 self.pg1.remote_ip4)
3060 self.assertEqual(p[TCP].dport, 20)
3061 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3062 self.tcp_port_out = p[TCP].sport
3063 self.assertEqual(data, p[Raw].load)
3066 pkts = self.create_stream_frag(self.pg1,
3072 self.pg1.add_stream(pkts)
3073 self.pg_enable_capture(self.pg_interfaces)
3075 frags = self.pg0.get_capture(len(pkts))
3076 p = self.reass_frags_and_verify(frags,
3077 self.pg1.remote_ip4,
3078 self.pg0.remote_ip4)
3079 self.assertEqual(p[TCP].sport, 20)
3080 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3081 self.assertEqual(data, p[Raw].load)
3083 def test_port_restricted(self):
3084 """ Port restricted NAT44 (MAP-E CE) """
3085 self.nat44_add_address(self.nat_addr)
3086 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3087 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3089 self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3094 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3095 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3096 TCP(sport=4567, dport=22))
3097 self.pg0.add_stream(p)
3098 self.pg_enable_capture(self.pg_interfaces)
3100 capture = self.pg1.get_capture(1)
3105 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3106 self.assertEqual(ip.src, self.nat_addr)
3107 self.assertEqual(tcp.dport, 22)
3108 self.assertNotEqual(tcp.sport, 4567)
3109 self.assertEqual((tcp.sport >> 6) & 63, 10)
3110 self.assert_packet_checksums_valid(p)
3112 self.logger.error(ppp("Unexpected or invalid packet:", p))
3115 def test_port_range(self):
3116 """ External address port range """
3117 self.nat44_add_address(self.nat_addr)
3118 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3119 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3121 self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
3126 for port in range(0, 5):
3127 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3128 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3129 TCP(sport=1125 + port))
3131 self.pg0.add_stream(pkts)
3132 self.pg_enable_capture(self.pg_interfaces)
3134 capture = self.pg1.get_capture(3)
3137 self.assertGreaterEqual(tcp.sport, 1025)
3138 self.assertLessEqual(tcp.sport, 1027)
3140 def test_ipfix_max_frags(self):
3141 """ IPFIX logging maximum fragments pending reassembly exceeded """
3142 self.nat44_add_address(self.nat_addr)
3143 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3144 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3146 self.vapi.nat_set_reass(max_frag=0)
3147 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3148 src_address=self.pg3.local_ip4n,
3150 template_interval=10)
3151 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3152 src_port=self.ipfix_src_port)
3154 data = "A" * 4 + "B" * 16 + "C" * 3
3155 self.tcp_port_in = random.randint(1025, 65535)
3156 pkts = self.create_stream_frag(self.pg0,
3157 self.pg1.remote_ip4,
3161 self.pg0.add_stream(pkts[-1])
3162 self.pg_enable_capture(self.pg_interfaces)
3164 self.pg1.assert_nothing_captured()
3166 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3167 capture = self.pg3.get_capture(9)
3168 ipfix = IPFIXDecoder()
3169 # first load template
3171 self.assertTrue(p.haslayer(IPFIX))
3172 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3173 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3174 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3175 self.assertEqual(p[UDP].dport, 4739)
3176 self.assertEqual(p[IPFIX].observationDomainID,
3177 self.ipfix_domain_id)
3178 if p.haslayer(Template):
3179 ipfix.add_template(p.getlayer(Template))
3180 # verify events in data set
3182 if p.haslayer(Data):
3183 data = ipfix.decode_data_set(p.getlayer(Set))
3184 self.verify_ipfix_max_fragments_ip4(data, 0,
3185 self.pg0.remote_ip4n)
3187 def test_multiple_outside_vrf(self):
3188 """ Multiple outside VRF """
3192 self.pg1.unconfig_ip4()
3193 self.pg2.unconfig_ip4()
3194 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3195 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3196 self.pg1.set_table_ip4(vrf_id1)
3197 self.pg2.set_table_ip4(vrf_id2)
3198 self.pg1.config_ip4()
3199 self.pg2.config_ip4()
3200 self.pg1.resolve_arp()
3201 self.pg2.resolve_arp()
3203 self.nat44_add_address(self.nat_addr)
3204 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3205 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3207 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
3212 pkts = self.create_stream_in(self.pg0, self.pg1)
3213 self.pg0.add_stream(pkts)
3214 self.pg_enable_capture(self.pg_interfaces)
3216 capture = self.pg1.get_capture(len(pkts))
3217 self.verify_capture_out(capture, self.nat_addr)
3219 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3220 self.pg1.add_stream(pkts)
3221 self.pg_enable_capture(self.pg_interfaces)
3223 capture = self.pg0.get_capture(len(pkts))
3224 self.verify_capture_in(capture, self.pg0)
3226 self.tcp_port_in = 60303
3227 self.udp_port_in = 60304
3228 self.icmp_id_in = 60305
3231 pkts = self.create_stream_in(self.pg0, self.pg2)
3232 self.pg0.add_stream(pkts)
3233 self.pg_enable_capture(self.pg_interfaces)
3235 capture = self.pg2.get_capture(len(pkts))
3236 self.verify_capture_out(capture, self.nat_addr)
3238 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3239 self.pg2.add_stream(pkts)
3240 self.pg_enable_capture(self.pg_interfaces)
3242 capture = self.pg0.get_capture(len(pkts))
3243 self.verify_capture_in(capture, self.pg0)
3246 self.pg1.unconfig_ip4()
3247 self.pg2.unconfig_ip4()
3248 self.pg1.set_table_ip4(0)
3249 self.pg2.set_table_ip4(0)
3250 self.pg1.config_ip4()
3251 self.pg2.config_ip4()
3252 self.pg1.resolve_arp()
3253 self.pg2.resolve_arp()
3255 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3256 def test_session_timeout(self):
3257 """ NAT44 session timeouts """
3258 self.nat44_add_address(self.nat_addr)
3259 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3260 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3262 self.vapi.nat_set_timeouts(udp=5)
3266 for i in range(0, max_sessions):
3267 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3268 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3269 IP(src=src, dst=self.pg1.remote_ip4) /
3270 UDP(sport=1025, dport=53))
3272 self.pg0.add_stream(pkts)
3273 self.pg_enable_capture(self.pg_interfaces)
3275 self.pg1.get_capture(max_sessions)
3280 for i in range(0, max_sessions):
3281 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3282 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3283 IP(src=src, dst=self.pg1.remote_ip4) /
3284 UDP(sport=1026, dport=53))
3286 self.pg0.add_stream(pkts)
3287 self.pg_enable_capture(self.pg_interfaces)
3289 self.pg1.get_capture(max_sessions)
3292 users = self.vapi.nat44_user_dump()
3294 nsessions = nsessions + user.nsessions
3295 self.assertLess(nsessions, 2 * max_sessions)
3298 super(TestNAT44, self).tearDown()
3299 if not self.vpp_dead:
3300 self.logger.info(self.vapi.cli("show nat44 addresses"))
3301 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3302 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3303 self.logger.info(self.vapi.cli("show nat44 interface address"))
3304 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3305 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3306 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3307 self.logger.info(self.vapi.cli("show nat timeouts"))
3309 self.vapi.cli("show nat addr-port-assignment-alg"))
3311 self.vapi.cli("clear logging")
3314 class TestNAT44EndpointDependent(MethodHolder):
3315 """ Endpoint-Dependent mapping and filtering test cases """
3318 def setUpConstants(cls):
3319 super(TestNAT44EndpointDependent, cls).setUpConstants()
3320 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3323 def setUpClass(cls):
3324 super(TestNAT44EndpointDependent, cls).setUpClass()
3325 cls.vapi.cli("set log class nat level debug")
3327 cls.tcp_port_in = 6303
3328 cls.tcp_port_out = 6303
3329 cls.udp_port_in = 6304
3330 cls.udp_port_out = 6304
3331 cls.icmp_id_in = 6305
3332 cls.icmp_id_out = 6305
3333 cls.nat_addr = '10.0.0.3'
3334 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3335 cls.ipfix_src_port = 4739
3336 cls.ipfix_domain_id = 1
3337 cls.tcp_external_port = 80
3339 cls.create_pg_interfaces(range(7))
3340 cls.interfaces = list(cls.pg_interfaces[0:3])
3342 for i in cls.interfaces:
3347 cls.pg0.generate_remote_hosts(3)
3348 cls.pg0.configure_ipv4_neighbors()
3352 cls.pg4.generate_remote_hosts(2)
3353 cls.pg4.config_ip4()
3354 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3355 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3359 cls.pg4.resolve_arp()
3360 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3361 cls.pg4.resolve_arp()
3363 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3364 cls.vapi.ip_table_add_del(1, is_add=1)
3366 cls.pg5._local_ip4 = "10.1.1.1"
3367 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3369 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3370 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3371 socket.AF_INET, cls.pg5.remote_ip4)
3372 cls.pg5.set_table_ip4(1)
3373 cls.pg5.config_ip4()
3375 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3376 dst_address_length=32,
3378 next_hop_sw_if_index=cls.pg5.sw_if_index,
3379 next_hop_address=zero_ip4n)
3381 cls.pg6._local_ip4 = "10.1.2.1"
3382 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3384 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3385 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3386 socket.AF_INET, cls.pg6.remote_ip4)
3387 cls.pg6.set_table_ip4(1)
3388 cls.pg6.config_ip4()
3390 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3391 dst_address_length=32,
3393 next_hop_sw_if_index=cls.pg6.sw_if_index,
3394 next_hop_address=zero_ip4n)
3396 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3397 dst_address_length=16,
3398 next_hop_address=zero_ip4n,
3400 next_hop_table_id=1)
3401 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3402 dst_address_length=0,
3403 next_hop_address=zero_ip4n,
3405 next_hop_table_id=0)
3406 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3407 dst_address_length=0,
3409 next_hop_sw_if_index=cls.pg1.sw_if_index,
3410 next_hop_address=cls.pg1.local_ip4n)
3412 cls.pg5.resolve_arp()
3413 cls.pg6.resolve_arp()
3416 super(TestNAT44EndpointDependent, cls).tearDownClass()
3419 def test_dynamic(self):
3420 """ NAT44 dynamic translation test """
3422 self.nat44_add_address(self.nat_addr)
3423 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3424 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3427 nat_config = self.vapi.nat_show_config()
3428 self.assertEqual(1, nat_config.endpoint_dependent)
3431 pkts = self.create_stream_in(self.pg0, self.pg1)
3432 self.pg0.add_stream(pkts)
3433 self.pg_enable_capture(self.pg_interfaces)
3435 capture = self.pg1.get_capture(len(pkts))
3436 self.verify_capture_out(capture)
3439 pkts = self.create_stream_out(self.pg1)
3440 self.pg1.add_stream(pkts)
3441 self.pg_enable_capture(self.pg_interfaces)
3443 capture = self.pg0.get_capture(len(pkts))
3444 self.verify_capture_in(capture, self.pg0)
3446 def test_forwarding(self):
3447 """ NAT44 forwarding test """
3449 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3450 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3452 self.vapi.nat44_forwarding_enable_disable(1)
3454 real_ip = self.pg0.remote_ip4n
3455 alias_ip = self.nat_addr_n
3456 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3457 external_ip=alias_ip)
3460 # in2out - static mapping match
3462 pkts = self.create_stream_out(self.pg1)
3463 self.pg1.add_stream(pkts)
3464 self.pg_enable_capture(self.pg_interfaces)
3466 capture = self.pg0.get_capture(len(pkts))
3467 self.verify_capture_in(capture, self.pg0)
3469 pkts = self.create_stream_in(self.pg0, self.pg1)
3470 self.pg0.add_stream(pkts)
3471 self.pg_enable_capture(self.pg_interfaces)
3473 capture = self.pg1.get_capture(len(pkts))
3474 self.verify_capture_out(capture, same_port=True)
3476 # in2out - no static mapping match
3478 host0 = self.pg0.remote_hosts[0]
3479 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3481 pkts = self.create_stream_out(self.pg1,
3482 dst_ip=self.pg0.remote_ip4,
3483 use_inside_ports=True)
3484 self.pg1.add_stream(pkts)
3485 self.pg_enable_capture(self.pg_interfaces)
3487 capture = self.pg0.get_capture(len(pkts))
3488 self.verify_capture_in(capture, self.pg0)
3490 pkts = self.create_stream_in(self.pg0, self.pg1)
3491 self.pg0.add_stream(pkts)
3492 self.pg_enable_capture(self.pg_interfaces)
3494 capture = self.pg1.get_capture(len(pkts))
3495 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3498 self.pg0.remote_hosts[0] = host0
3500 user = self.pg0.remote_hosts[1]
3501 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3502 self.assertEqual(len(sessions), 3)
3503 self.assertTrue(sessions[0].ext_host_valid)
3504 self.vapi.nat44_del_session(
3505 sessions[0].inside_ip_address,
3506 sessions[0].inside_port,
3507 sessions[0].protocol,
3508 ext_host_address=sessions[0].ext_host_address,
3509 ext_host_port=sessions[0].ext_host_port)
3510 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3511 self.assertEqual(len(sessions), 2)
3514 self.vapi.nat44_forwarding_enable_disable(0)
3515 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3516 external_ip=alias_ip,
3519 def test_static_lb(self):
3520 """ NAT44 local service load balancing """
3521 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3524 server1 = self.pg0.remote_hosts[0]
3525 server2 = self.pg0.remote_hosts[1]
3527 locals = [{'addr': server1.ip4n,
3531 {'addr': server2.ip4n,
3536 self.nat44_add_address(self.nat_addr)
3537 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3540 local_num=len(locals),
3542 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3543 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3546 # from client to service
3547 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3548 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3549 TCP(sport=12345, dport=external_port))
3550 self.pg1.add_stream(p)
3551 self.pg_enable_capture(self.pg_interfaces)
3553 capture = self.pg0.get_capture(1)
3559 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3560 if ip.dst == server1.ip4:
3564 self.assertEqual(tcp.dport, local_port)
3565 self.assert_packet_checksums_valid(p)
3567 self.logger.error(ppp("Unexpected or invalid packet:", p))
3570 # from service back to client
3571 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3572 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3573 TCP(sport=local_port, dport=12345))
3574 self.pg0.add_stream(p)
3575 self.pg_enable_capture(self.pg_interfaces)
3577 capture = self.pg1.get_capture(1)
3582 self.assertEqual(ip.src, self.nat_addr)
3583 self.assertEqual(tcp.sport, external_port)
3584 self.assert_packet_checksums_valid(p)
3586 self.logger.error(ppp("Unexpected or invalid packet:", p))
3589 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3590 self.assertEqual(len(sessions), 1)
3591 self.assertTrue(sessions[0].ext_host_valid)
3592 self.vapi.nat44_del_session(
3593 sessions[0].inside_ip_address,
3594 sessions[0].inside_port,
3595 sessions[0].protocol,
3596 ext_host_address=sessions[0].ext_host_address,
3597 ext_host_port=sessions[0].ext_host_port)
3598 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3599 self.assertEqual(len(sessions), 0)
3601 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3602 def test_static_lb_multi_clients(self):
3603 """ NAT44 local service load balancing - multiple clients"""
3605 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3608 server1 = self.pg0.remote_hosts[0]
3609 server2 = self.pg0.remote_hosts[1]
3611 locals = [{'addr': server1.ip4n,
3615 {'addr': server2.ip4n,
3620 self.nat44_add_address(self.nat_addr)
3621 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3624 local_num=len(locals),
3626 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3627 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3632 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3634 for client in clients:
3635 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3636 IP(src=client, dst=self.nat_addr) /
3637 TCP(sport=12345, dport=external_port))
3639 self.pg1.add_stream(pkts)
3640 self.pg_enable_capture(self.pg_interfaces)
3642 capture = self.pg0.get_capture(len(pkts))
3644 if p[IP].dst == server1.ip4:
3648 self.assertTrue(server1_n > server2_n)
3650 def test_static_lb_2(self):
3651 """ NAT44 local service load balancing (asymmetrical rule) """
3652 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3655 server1 = self.pg0.remote_hosts[0]
3656 server2 = self.pg0.remote_hosts[1]
3658 locals = [{'addr': server1.ip4n,
3662 {'addr': server2.ip4n,
3667 self.vapi.nat44_forwarding_enable_disable(1)
3668 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3672 local_num=len(locals),
3674 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3675 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3678 # from client to service
3679 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3680 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3681 TCP(sport=12345, dport=external_port))
3682 self.pg1.add_stream(p)
3683 self.pg_enable_capture(self.pg_interfaces)
3685 capture = self.pg0.get_capture(1)
3691 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3692 if ip.dst == server1.ip4:
3696 self.assertEqual(tcp.dport, local_port)
3697 self.assert_packet_checksums_valid(p)
3699 self.logger.error(ppp("Unexpected or invalid packet:", p))
3702 # from service back to client
3703 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3704 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3705 TCP(sport=local_port, dport=12345))
3706 self.pg0.add_stream(p)
3707 self.pg_enable_capture(self.pg_interfaces)
3709 capture = self.pg1.get_capture(1)
3714 self.assertEqual(ip.src, self.nat_addr)
3715 self.assertEqual(tcp.sport, external_port)
3716 self.assert_packet_checksums_valid(p)
3718 self.logger.error(ppp("Unexpected or invalid packet:", p))
3721 # from client to server (no translation)
3722 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3723 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3724 TCP(sport=12346, dport=local_port))
3725 self.pg1.add_stream(p)
3726 self.pg_enable_capture(self.pg_interfaces)
3728 capture = self.pg0.get_capture(1)
3734 self.assertEqual(ip.dst, server1.ip4)
3735 self.assertEqual(tcp.dport, local_port)
3736 self.assert_packet_checksums_valid(p)
3738 self.logger.error(ppp("Unexpected or invalid packet:", p))
3741 # from service back to client (no translation)
3742 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3743 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3744 TCP(sport=local_port, dport=12346))
3745 self.pg0.add_stream(p)
3746 self.pg_enable_capture(self.pg_interfaces)
3748 capture = self.pg1.get_capture(1)
3753 self.assertEqual(ip.src, server1.ip4)
3754 self.assertEqual(tcp.sport, local_port)
3755 self.assert_packet_checksums_valid(p)
3757 self.logger.error(ppp("Unexpected or invalid packet:", p))
3760 def test_lb_affinity(self):
3761 """ NAT44 local service load balancing affinity """
3762 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3765 server1 = self.pg0.remote_hosts[0]
3766 server2 = self.pg0.remote_hosts[1]
3768 locals = [{'addr': server1.ip4n,
3772 {'addr': server2.ip4n,
3777 self.nat44_add_address(self.nat_addr)
3778 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3782 local_num=len(locals),
3784 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3785 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3788 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3789 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3790 TCP(sport=1025, dport=external_port))
3791 self.pg1.add_stream(p)
3792 self.pg_enable_capture(self.pg_interfaces)
3794 capture = self.pg0.get_capture(1)
3795 backend = capture[0][IP].dst
3797 sessions = self.vapi.nat44_user_session_dump(
3798 socket.inet_pton(socket.AF_INET, backend), 0)
3799 self.assertEqual(len(sessions), 1)
3800 self.assertTrue(sessions[0].ext_host_valid)
3801 self.vapi.nat44_del_session(
3802 sessions[0].inside_ip_address,
3803 sessions[0].inside_port,
3804 sessions[0].protocol,
3805 ext_host_address=sessions[0].ext_host_address,
3806 ext_host_port=sessions[0].ext_host_port)
3809 for port in range(1030, 1100):
3810 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3811 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3812 TCP(sport=port, dport=external_port))
3814 self.pg1.add_stream(pkts)
3815 self.pg_enable_capture(self.pg_interfaces)
3817 capture = self.pg0.get_capture(len(pkts))
3819 self.assertEqual(p[IP].dst, backend)
3821 def test_unknown_proto(self):
3822 """ NAT44 translate packet with unknown protocol """
3823 self.nat44_add_address(self.nat_addr)
3824 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3825 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3829 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3830 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3831 TCP(sport=self.tcp_port_in, dport=20))
3832 self.pg0.add_stream(p)
3833 self.pg_enable_capture(self.pg_interfaces)
3835 p = self.pg1.get_capture(1)
3837 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3838 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3840 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3841 TCP(sport=1234, dport=1234))
3842 self.pg0.add_stream(p)
3843 self.pg_enable_capture(self.pg_interfaces)
3845 p = self.pg1.get_capture(1)
3848 self.assertEqual(packet[IP].src, self.nat_addr)
3849 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3850 self.assertTrue(packet.haslayer(GRE))
3851 self.assert_packet_checksums_valid(packet)
3853 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3857 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3858 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3860 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3861 TCP(sport=1234, dport=1234))
3862 self.pg1.add_stream(p)
3863 self.pg_enable_capture(self.pg_interfaces)
3865 p = self.pg0.get_capture(1)
3868 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3869 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3870 self.assertTrue(packet.haslayer(GRE))
3871 self.assert_packet_checksums_valid(packet)
3873 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3876 def test_hairpinning_unknown_proto(self):
3877 """ NAT44 translate packet with unknown protocol - hairpinning """
3878 host = self.pg0.remote_hosts[0]
3879 server = self.pg0.remote_hosts[1]
3881 server_out_port = 8765
3882 server_nat_ip = "10.0.0.11"
3884 self.nat44_add_address(self.nat_addr)
3885 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3886 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3889 # add static mapping for server
3890 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3893 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3894 IP(src=host.ip4, dst=server_nat_ip) /
3895 TCP(sport=host_in_port, dport=server_out_port))
3896 self.pg0.add_stream(p)
3897 self.pg_enable_capture(self.pg_interfaces)
3899 self.pg0.get_capture(1)
3901 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3902 IP(src=host.ip4, dst=server_nat_ip) /
3904 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3905 TCP(sport=1234, dport=1234))
3906 self.pg0.add_stream(p)
3907 self.pg_enable_capture(self.pg_interfaces)
3909 p = self.pg0.get_capture(1)
3912 self.assertEqual(packet[IP].src, self.nat_addr)
3913 self.assertEqual(packet[IP].dst, server.ip4)
3914 self.assertTrue(packet.haslayer(GRE))
3915 self.assert_packet_checksums_valid(packet)
3917 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3921 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3922 IP(src=server.ip4, dst=self.nat_addr) /
3924 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3925 TCP(sport=1234, dport=1234))
3926 self.pg0.add_stream(p)
3927 self.pg_enable_capture(self.pg_interfaces)
3929 p = self.pg0.get_capture(1)
3932 self.assertEqual(packet[IP].src, server_nat_ip)
3933 self.assertEqual(packet[IP].dst, host.ip4)
3934 self.assertTrue(packet.haslayer(GRE))
3935 self.assert_packet_checksums_valid(packet)
3937 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3940 def test_output_feature_and_service(self):
3941 """ NAT44 interface output feature and services """
3942 external_addr = '1.2.3.4'
3946 self.vapi.nat44_forwarding_enable_disable(1)
3947 self.nat44_add_address(self.nat_addr)
3948 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3949 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3950 local_port, external_port,
3951 proto=IP_PROTOS.tcp, out2in_only=1)
3952 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3953 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3955 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3958 # from client to service
3959 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3960 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3961 TCP(sport=12345, dport=external_port))
3962 self.pg1.add_stream(p)
3963 self.pg_enable_capture(self.pg_interfaces)
3965 capture = self.pg0.get_capture(1)
3970 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3971 self.assertEqual(tcp.dport, local_port)
3972 self.assert_packet_checksums_valid(p)
3974 self.logger.error(ppp("Unexpected or invalid packet:", p))
3977 # from service back to client
3978 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3979 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3980 TCP(sport=local_port, dport=12345))
3981 self.pg0.add_stream(p)
3982 self.pg_enable_capture(self.pg_interfaces)
3984 capture = self.pg1.get_capture(1)
3989 self.assertEqual(ip.src, external_addr)
3990 self.assertEqual(tcp.sport, external_port)
3991 self.assert_packet_checksums_valid(p)
3993 self.logger.error(ppp("Unexpected or invalid packet:", p))
3996 # from local network host to external network
3997 pkts = self.create_stream_in(self.pg0, self.pg1)
3998 self.pg0.add_stream(pkts)
3999 self.pg_enable_capture(self.pg_interfaces)
4001 capture = self.pg1.get_capture(len(pkts))
4002 self.verify_capture_out(capture)
4003 pkts = self.create_stream_in(self.pg0, self.pg1)
4004 self.pg0.add_stream(pkts)
4005 self.pg_enable_capture(self.pg_interfaces)
4007 capture = self.pg1.get_capture(len(pkts))
4008 self.verify_capture_out(capture)
4010 # from external network back to local network host
4011 pkts = self.create_stream_out(self.pg1)
4012 self.pg1.add_stream(pkts)
4013 self.pg_enable_capture(self.pg_interfaces)
4015 capture = self.pg0.get_capture(len(pkts))
4016 self.verify_capture_in(capture, self.pg0)
4018 def test_output_feature_and_service2(self):
4019 """ NAT44 interface output feature and service host direct access """
4020 self.vapi.nat44_forwarding_enable_disable(1)
4021 self.nat44_add_address(self.nat_addr)
4022 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4025 # session initiaded from service host - translate
4026 pkts = self.create_stream_in(self.pg0, self.pg1)
4027 self.pg0.add_stream(pkts)
4028 self.pg_enable_capture(self.pg_interfaces)
4030 capture = self.pg1.get_capture(len(pkts))
4031 self.verify_capture_out(capture)
4033 pkts = self.create_stream_out(self.pg1)
4034 self.pg1.add_stream(pkts)
4035 self.pg_enable_capture(self.pg_interfaces)
4037 capture = self.pg0.get_capture(len(pkts))
4038 self.verify_capture_in(capture, self.pg0)
4040 # session initiaded from remote host - do not translate
4041 self.tcp_port_in = 60303
4042 self.udp_port_in = 60304
4043 self.icmp_id_in = 60305
4044 pkts = self.create_stream_out(self.pg1,
4045 self.pg0.remote_ip4,
4046 use_inside_ports=True)
4047 self.pg1.add_stream(pkts)
4048 self.pg_enable_capture(self.pg_interfaces)
4050 capture = self.pg0.get_capture(len(pkts))
4051 self.verify_capture_in(capture, self.pg0)
4053 pkts = self.create_stream_in(self.pg0, self.pg1)
4054 self.pg0.add_stream(pkts)
4055 self.pg_enable_capture(self.pg_interfaces)
4057 capture = self.pg1.get_capture(len(pkts))
4058 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
4061 def test_output_feature_and_service3(self):
4062 """ NAT44 interface output feature and DST NAT """
4063 external_addr = '1.2.3.4'
4067 self.vapi.nat44_forwarding_enable_disable(1)
4068 self.nat44_add_address(self.nat_addr)
4069 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
4070 local_port, external_port,
4071 proto=IP_PROTOS.tcp, out2in_only=1)
4072 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4073 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4075 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4078 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4079 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4080 TCP(sport=12345, dport=external_port))
4081 self.pg0.add_stream(p)
4082 self.pg_enable_capture(self.pg_interfaces)
4084 capture = self.pg1.get_capture(1)
4089 self.assertEqual(ip.src, self.pg0.remote_ip4)
4090 self.assertEqual(tcp.sport, 12345)
4091 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4092 self.assertEqual(tcp.dport, local_port)
4093 self.assert_packet_checksums_valid(p)
4095 self.logger.error(ppp("Unexpected or invalid packet:", p))
4098 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4099 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4100 TCP(sport=local_port, dport=12345))
4101 self.pg1.add_stream(p)
4102 self.pg_enable_capture(self.pg_interfaces)
4104 capture = self.pg0.get_capture(1)
4109 self.assertEqual(ip.src, external_addr)
4110 self.assertEqual(tcp.sport, external_port)
4111 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4112 self.assertEqual(tcp.dport, 12345)
4113 self.assert_packet_checksums_valid(p)
4115 self.logger.error(ppp("Unexpected or invalid packet:", p))
4118 def test_next_src_nat(self):
4119 """ On way back forward packet to nat44-in2out node. """
4120 twice_nat_addr = '10.0.1.3'
4123 post_twice_nat_port = 0
4125 self.vapi.nat44_forwarding_enable_disable(1)
4126 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4127 self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
4128 local_port, external_port,
4129 proto=IP_PROTOS.tcp, out2in_only=1,
4130 self_twice_nat=1, vrf_id=1)
4131 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4134 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4135 IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
4136 TCP(sport=12345, dport=external_port))
4137 self.pg6.add_stream(p)
4138 self.pg_enable_capture(self.pg_interfaces)
4140 capture = self.pg6.get_capture(1)
4145 self.assertEqual(ip.src, twice_nat_addr)
4146 self.assertNotEqual(tcp.sport, 12345)
4147 post_twice_nat_port = tcp.sport
4148 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4149 self.assertEqual(tcp.dport, local_port)
4150 self.assert_packet_checksums_valid(p)
4152 self.logger.error(ppp("Unexpected or invalid packet:", p))
4155 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4156 IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
4157 TCP(sport=local_port, dport=post_twice_nat_port))
4158 self.pg6.add_stream(p)
4159 self.pg_enable_capture(self.pg_interfaces)
4161 capture = self.pg6.get_capture(1)
4166 self.assertEqual(ip.src, self.pg1.remote_ip4)
4167 self.assertEqual(tcp.sport, external_port)
4168 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4169 self.assertEqual(tcp.dport, 12345)
4170 self.assert_packet_checksums_valid(p)
4172 self.logger.error(ppp("Unexpected or invalid packet:", p))
4175 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
4177 twice_nat_addr = '10.0.1.3'
4185 port_in1 = port_in+1
4186 port_in2 = port_in+2
4191 server1 = self.pg0.remote_hosts[0]
4192 server2 = self.pg0.remote_hosts[1]
4204 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
4207 self.nat44_add_address(self.nat_addr)
4208 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4210 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4212 proto=IP_PROTOS.tcp,
4213 twice_nat=int(not self_twice_nat),
4214 self_twice_nat=int(self_twice_nat))
4216 locals = [{'addr': server1.ip4n,
4220 {'addr': server2.ip4n,
4224 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4225 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4229 not self_twice_nat),
4232 local_num=len(locals),
4234 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4235 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
4242 assert client_id is not None
4244 client = self.pg0.remote_hosts[0]
4245 elif client_id == 2:
4246 client = self.pg0.remote_hosts[1]
4248 client = pg1.remote_hosts[0]
4249 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4250 IP(src=client.ip4, dst=self.nat_addr) /
4251 TCP(sport=eh_port_out, dport=port_out))
4253 self.pg_enable_capture(self.pg_interfaces)
4255 capture = pg0.get_capture(1)
4261 if ip.dst == server1.ip4:
4267 self.assertEqual(ip.dst, server.ip4)
4269 self.assertIn(tcp.dport, [port_in1, port_in2])
4271 self.assertEqual(tcp.dport, port_in)
4273 self.assertEqual(ip.src, twice_nat_addr)
4274 self.assertNotEqual(tcp.sport, eh_port_out)
4276 self.assertEqual(ip.src, client.ip4)
4277 self.assertEqual(tcp.sport, eh_port_out)
4279 eh_port_in = tcp.sport
4280 saved_port_in = tcp.dport
4281 self.assert_packet_checksums_valid(p)
4283 self.logger.error(ppp("Unexpected or invalid packet:", p))
4286 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4287 IP(src=server.ip4, dst=eh_addr_in) /
4288 TCP(sport=saved_port_in, dport=eh_port_in))
4290 self.pg_enable_capture(self.pg_interfaces)
4292 capture = pg1.get_capture(1)
4297 self.assertEqual(ip.dst, client.ip4)
4298 self.assertEqual(ip.src, self.nat_addr)
4299 self.assertEqual(tcp.dport, eh_port_out)
4300 self.assertEqual(tcp.sport, port_out)
4301 self.assert_packet_checksums_valid(p)
4303 self.logger.error(ppp("Unexpected or invalid packet:", p))
4307 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4308 self.assertEqual(len(sessions), 1)
4309 self.assertTrue(sessions[0].ext_host_valid)
4310 self.assertTrue(sessions[0].is_twicenat)
4311 self.vapi.nat44_del_session(
4312 sessions[0].inside_ip_address,
4313 sessions[0].inside_port,
4314 sessions[0].protocol,
4315 ext_host_address=sessions[0].ext_host_nat_address,
4316 ext_host_port=sessions[0].ext_host_nat_port)
4317 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4318 self.assertEqual(len(sessions), 0)
4320 def test_twice_nat(self):
4322 self.twice_nat_common()
4324 def test_self_twice_nat_positive(self):
4325 """ Self Twice NAT44 (positive test) """
4326 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4328 def test_self_twice_nat_negative(self):
4329 """ Self Twice NAT44 (negative test) """
4330 self.twice_nat_common(self_twice_nat=True)
4332 def test_twice_nat_lb(self):
4333 """ Twice NAT44 local service load balancing """
4334 self.twice_nat_common(lb=True)
4336 def test_self_twice_nat_lb_positive(self):
4337 """ Self Twice NAT44 local service load balancing (positive test) """
4338 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4341 def test_self_twice_nat_lb_negative(self):
4342 """ Self Twice NAT44 local service load balancing (negative test) """
4343 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4346 def test_twice_nat_interface_addr(self):
4347 """ Acquire twice NAT44 addresses from interface """
4348 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4350 # no address in NAT pool
4351 adresses = self.vapi.nat44_address_dump()
4352 self.assertEqual(0, len(adresses))
4354 # configure interface address and check NAT address pool
4355 self.pg3.config_ip4()
4356 adresses = self.vapi.nat44_address_dump()
4357 self.assertEqual(1, len(adresses))
4358 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4359 self.assertEqual(adresses[0].twice_nat, 1)
4361 # remove interface address and check NAT address pool
4362 self.pg3.unconfig_ip4()
4363 adresses = self.vapi.nat44_address_dump()
4364 self.assertEqual(0, len(adresses))
4366 def test_tcp_session_close_in(self):
4367 """ Close TCP session from inside network """
4368 self.tcp_port_out = 10505
4369 self.nat44_add_address(self.nat_addr)
4370 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4374 proto=IP_PROTOS.tcp,
4376 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4377 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4380 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4381 start_sessnum = len(sessions)
4383 self.initiate_tcp_session(self.pg0, self.pg1)
4385 # FIN packet in -> out
4386 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4387 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4388 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4389 flags="FA", seq=100, ack=300))
4390 self.pg0.add_stream(p)
4391 self.pg_enable_capture(self.pg_interfaces)
4393 self.pg1.get_capture(1)
4397 # ACK packet out -> in
4398 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4399 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4400 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4401 flags="A", seq=300, ack=101))
4404 # FIN packet out -> in
4405 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4406 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4407 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4408 flags="FA", seq=300, ack=101))
4411 self.pg1.add_stream(pkts)
4412 self.pg_enable_capture(self.pg_interfaces)
4414 self.pg0.get_capture(2)
4416 # ACK packet in -> out
4417 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4418 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4419 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4420 flags="A", seq=101, ack=301))
4421 self.pg0.add_stream(p)
4422 self.pg_enable_capture(self.pg_interfaces)
4424 self.pg1.get_capture(1)
4426 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4428 self.assertEqual(len(sessions) - start_sessnum, 0)
4430 def test_tcp_session_close_out(self):
4431 """ Close TCP session from outside network """
4432 self.tcp_port_out = 10505
4433 self.nat44_add_address(self.nat_addr)
4434 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4438 proto=IP_PROTOS.tcp,
4440 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4441 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4444 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4445 start_sessnum = len(sessions)
4447 self.initiate_tcp_session(self.pg0, self.pg1)
4449 # FIN packet out -> in
4450 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4451 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4452 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4453 flags="FA", seq=100, ack=300))
4454 self.pg1.add_stream(p)
4455 self.pg_enable_capture(self.pg_interfaces)
4457 self.pg0.get_capture(1)
4459 # FIN+ACK packet in -> out
4460 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4461 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4462 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4463 flags="FA", seq=300, ack=101))
4465 self.pg0.add_stream(p)
4466 self.pg_enable_capture(self.pg_interfaces)
4468 self.pg1.get_capture(1)
4470 # ACK packet out -> in
4471 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4472 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4473 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4474 flags="A", seq=101, ack=301))
4475 self.pg1.add_stream(p)
4476 self.pg_enable_capture(self.pg_interfaces)
4478 self.pg0.get_capture(1)
4480 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4482 self.assertEqual(len(sessions) - start_sessnum, 0)
4484 def test_tcp_session_close_simultaneous(self):
4485 """ Close TCP session from inside network """
4486 self.tcp_port_out = 10505
4487 self.nat44_add_address(self.nat_addr)
4488 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4492 proto=IP_PROTOS.tcp,
4494 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4495 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4498 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4499 start_sessnum = len(sessions)
4501 self.initiate_tcp_session(self.pg0, self.pg1)
4503 # FIN packet in -> out
4504 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4505 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4506 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4507 flags="FA", seq=100, ack=300))
4508 self.pg0.add_stream(p)
4509 self.pg_enable_capture(self.pg_interfaces)
4511 self.pg1.get_capture(1)
4513 # FIN packet out -> in
4514 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4515 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4516 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4517 flags="FA", seq=300, ack=100))
4518 self.pg1.add_stream(p)
4519 self.pg_enable_capture(self.pg_interfaces)
4521 self.pg0.get_capture(1)
4523 # ACK packet in -> out
4524 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4525 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4526 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4527 flags="A", seq=101, ack=301))
4528 self.pg0.add_stream(p)
4529 self.pg_enable_capture(self.pg_interfaces)
4531 self.pg1.get_capture(1)
4533 # ACK packet out -> in
4534 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4535 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4536 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4537 flags="A", seq=301, ack=101))
4538 self.pg1.add_stream(p)
4539 self.pg_enable_capture(self.pg_interfaces)
4541 self.pg0.get_capture(1)
4543 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4545 self.assertEqual(len(sessions) - start_sessnum, 0)
4547 def test_one_armed_nat44_static(self):
4548 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
4549 remote_host = self.pg4.remote_hosts[0]
4550 local_host = self.pg4.remote_hosts[1]
4555 self.vapi.nat44_forwarding_enable_disable(1)
4556 self.nat44_add_address(self.nat_addr, twice_nat=1)
4557 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4558 local_port, external_port,
4559 proto=IP_PROTOS.tcp, out2in_only=1,
4561 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4562 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4565 # from client to service
4566 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4567 IP(src=remote_host.ip4, dst=self.nat_addr) /
4568 TCP(sport=12345, dport=external_port))
4569 self.pg4.add_stream(p)
4570 self.pg_enable_capture(self.pg_interfaces)
4572 capture = self.pg4.get_capture(1)
4577 self.assertEqual(ip.dst, local_host.ip4)
4578 self.assertEqual(ip.src, self.nat_addr)
4579 self.assertEqual(tcp.dport, local_port)
4580 self.assertNotEqual(tcp.sport, 12345)
4581 eh_port_in = tcp.sport
4582 self.assert_packet_checksums_valid(p)
4584 self.logger.error(ppp("Unexpected or invalid packet:", p))
4587 # from service back to client
4588 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4589 IP(src=local_host.ip4, dst=self.nat_addr) /
4590 TCP(sport=local_port, dport=eh_port_in))
4591 self.pg4.add_stream(p)
4592 self.pg_enable_capture(self.pg_interfaces)
4594 capture = self.pg4.get_capture(1)
4599 self.assertEqual(ip.src, self.nat_addr)
4600 self.assertEqual(ip.dst, remote_host.ip4)
4601 self.assertEqual(tcp.sport, external_port)
4602 self.assertEqual(tcp.dport, 12345)
4603 self.assert_packet_checksums_valid(p)
4605 self.logger.error(ppp("Unexpected or invalid packet:", p))
4608 def test_static_with_port_out2(self):
4609 """ 1:1 NAPT asymmetrical rule """
4614 self.vapi.nat44_forwarding_enable_disable(1)
4615 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4616 local_port, external_port,
4617 proto=IP_PROTOS.tcp, out2in_only=1)
4618 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4619 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4622 # from client to service
4623 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4624 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4625 TCP(sport=12345, dport=external_port))
4626 self.pg1.add_stream(p)
4627 self.pg_enable_capture(self.pg_interfaces)
4629 capture = self.pg0.get_capture(1)
4634 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4635 self.assertEqual(tcp.dport, local_port)
4636 self.assert_packet_checksums_valid(p)
4638 self.logger.error(ppp("Unexpected or invalid packet:", p))
4642 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4643 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4644 ICMP(type=11) / capture[0][IP])
4645 self.pg0.add_stream(p)
4646 self.pg_enable_capture(self.pg_interfaces)
4648 capture = self.pg1.get_capture(1)
4651 self.assertEqual(p[IP].src, self.nat_addr)
4653 self.assertEqual(inner.dst, self.nat_addr)
4654 self.assertEqual(inner[TCPerror].dport, external_port)
4656 self.logger.error(ppp("Unexpected or invalid packet:", p))
4659 # from service back to client
4660 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4661 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4662 TCP(sport=local_port, dport=12345))
4663 self.pg0.add_stream(p)
4664 self.pg_enable_capture(self.pg_interfaces)
4666 capture = self.pg1.get_capture(1)
4671 self.assertEqual(ip.src, self.nat_addr)
4672 self.assertEqual(tcp.sport, external_port)
4673 self.assert_packet_checksums_valid(p)
4675 self.logger.error(ppp("Unexpected or invalid packet:", p))
4679 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4680 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4681 ICMP(type=11) / capture[0][IP])
4682 self.pg1.add_stream(p)
4683 self.pg_enable_capture(self.pg_interfaces)
4685 capture = self.pg0.get_capture(1)
4688 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4690 self.assertEqual(inner.src, self.pg0.remote_ip4)
4691 self.assertEqual(inner[TCPerror].sport, local_port)
4693 self.logger.error(ppp("Unexpected or invalid packet:", p))
4696 # from client to server (no translation)
4697 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4698 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4699 TCP(sport=12346, dport=local_port))
4700 self.pg1.add_stream(p)
4701 self.pg_enable_capture(self.pg_interfaces)
4703 capture = self.pg0.get_capture(1)
4708 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4709 self.assertEqual(tcp.dport, local_port)
4710 self.assert_packet_checksums_valid(p)
4712 self.logger.error(ppp("Unexpected or invalid packet:", p))
4715 # from service back to client (no translation)
4716 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4717 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4718 TCP(sport=local_port, dport=12346))
4719 self.pg0.add_stream(p)
4720 self.pg_enable_capture(self.pg_interfaces)
4722 capture = self.pg1.get_capture(1)
4727 self.assertEqual(ip.src, self.pg0.remote_ip4)
4728 self.assertEqual(tcp.sport, local_port)
4729 self.assert_packet_checksums_valid(p)
4731 self.logger.error(ppp("Unexpected or invalid packet:", p))
4734 def test_output_feature(self):
4735 """ NAT44 interface output feature (in2out postrouting) """
4736 self.vapi.nat44_forwarding_enable_disable(1)
4737 self.nat44_add_address(self.nat_addr)
4738 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4740 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4744 pkts = self.create_stream_in(self.pg0, self.pg1)
4745 self.pg0.add_stream(pkts)
4746 self.pg_enable_capture(self.pg_interfaces)
4748 capture = self.pg1.get_capture(len(pkts))
4749 self.verify_capture_out(capture)
4752 pkts = self.create_stream_out(self.pg1)
4753 self.pg1.add_stream(pkts)
4754 self.pg_enable_capture(self.pg_interfaces)
4756 capture = self.pg0.get_capture(len(pkts))
4757 self.verify_capture_in(capture, self.pg0)
4759 def test_multiple_vrf(self):
4760 """ Multiple VRF setup """
4761 external_addr = '1.2.3.4'
4766 self.vapi.nat44_forwarding_enable_disable(1)
4767 self.nat44_add_address(self.nat_addr)
4768 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4769 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4771 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4773 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4774 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4776 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4778 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4779 local_port, external_port, vrf_id=1,
4780 proto=IP_PROTOS.tcp, out2in_only=1)
4781 self.nat44_add_static_mapping(
4782 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4783 local_port=local_port, vrf_id=0, external_port=external_port,
4784 proto=IP_PROTOS.tcp, out2in_only=1)
4786 # from client to service (both VRF1)
4787 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4788 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4789 TCP(sport=12345, dport=external_port))
4790 self.pg6.add_stream(p)
4791 self.pg_enable_capture(self.pg_interfaces)
4793 capture = self.pg5.get_capture(1)
4798 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4799 self.assertEqual(tcp.dport, local_port)
4800 self.assert_packet_checksums_valid(p)
4802 self.logger.error(ppp("Unexpected or invalid packet:", p))
4805 # from service back to client (both VRF1)
4806 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4807 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4808 TCP(sport=local_port, dport=12345))
4809 self.pg5.add_stream(p)
4810 self.pg_enable_capture(self.pg_interfaces)
4812 capture = self.pg6.get_capture(1)
4817 self.assertEqual(ip.src, external_addr)
4818 self.assertEqual(tcp.sport, external_port)
4819 self.assert_packet_checksums_valid(p)
4821 self.logger.error(ppp("Unexpected or invalid packet:", p))
4824 # dynamic NAT from VRF1 to VRF0 (output-feature)
4825 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4826 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4827 TCP(sport=2345, dport=22))
4828 self.pg5.add_stream(p)
4829 self.pg_enable_capture(self.pg_interfaces)
4831 capture = self.pg1.get_capture(1)
4836 self.assertEqual(ip.src, self.nat_addr)
4837 self.assertNotEqual(tcp.sport, 2345)
4838 self.assert_packet_checksums_valid(p)
4841 self.logger.error(ppp("Unexpected or invalid packet:", p))
4844 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4845 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4846 TCP(sport=22, dport=port))
4847 self.pg1.add_stream(p)
4848 self.pg_enable_capture(self.pg_interfaces)
4850 capture = self.pg5.get_capture(1)
4855 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4856 self.assertEqual(tcp.dport, 2345)
4857 self.assert_packet_checksums_valid(p)
4859 self.logger.error(ppp("Unexpected or invalid packet:", p))
4862 # from client VRF1 to service VRF0
4863 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4864 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4865 TCP(sport=12346, dport=external_port))
4866 self.pg6.add_stream(p)
4867 self.pg_enable_capture(self.pg_interfaces)
4869 capture = self.pg0.get_capture(1)
4874 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4875 self.assertEqual(tcp.dport, local_port)
4876 self.assert_packet_checksums_valid(p)
4878 self.logger.error(ppp("Unexpected or invalid packet:", p))
4881 # from service VRF0 back to client VRF1
4882 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4883 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4884 TCP(sport=local_port, dport=12346))
4885 self.pg0.add_stream(p)
4886 self.pg_enable_capture(self.pg_interfaces)
4888 capture = self.pg6.get_capture(1)
4893 self.assertEqual(ip.src, self.pg0.local_ip4)
4894 self.assertEqual(tcp.sport, external_port)
4895 self.assert_packet_checksums_valid(p)
4897 self.logger.error(ppp("Unexpected or invalid packet:", p))
4900 # from client VRF0 to service VRF1
4901 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4902 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4903 TCP(sport=12347, dport=external_port))
4904 self.pg0.add_stream(p)
4905 self.pg_enable_capture(self.pg_interfaces)
4907 capture = self.pg5.get_capture(1)
4912 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4913 self.assertEqual(tcp.dport, local_port)
4914 self.assert_packet_checksums_valid(p)
4916 self.logger.error(ppp("Unexpected or invalid packet:", p))
4919 # from service VRF1 back to client VRF0
4920 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4921 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4922 TCP(sport=local_port, dport=12347))
4923 self.pg5.add_stream(p)
4924 self.pg_enable_capture(self.pg_interfaces)
4926 capture = self.pg0.get_capture(1)
4931 self.assertEqual(ip.src, external_addr)
4932 self.assertEqual(tcp.sport, external_port)
4933 self.assert_packet_checksums_valid(p)
4935 self.logger.error(ppp("Unexpected or invalid packet:", p))
4938 # from client to server (both VRF1, no translation)
4939 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4940 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4941 TCP(sport=12348, dport=local_port))
4942 self.pg6.add_stream(p)
4943 self.pg_enable_capture(self.pg_interfaces)
4945 capture = self.pg5.get_capture(1)
4950 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4951 self.assertEqual(tcp.dport, local_port)
4952 self.assert_packet_checksums_valid(p)
4954 self.logger.error(ppp("Unexpected or invalid packet:", p))
4957 # from server back to client (both VRF1, no translation)
4958 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4959 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4960 TCP(sport=local_port, dport=12348))
4961 self.pg5.add_stream(p)
4962 self.pg_enable_capture(self.pg_interfaces)
4964 capture = self.pg6.get_capture(1)
4969 self.assertEqual(ip.src, self.pg5.remote_ip4)
4970 self.assertEqual(tcp.sport, local_port)
4971 self.assert_packet_checksums_valid(p)
4973 self.logger.error(ppp("Unexpected or invalid packet:", p))
4976 # from client VRF1 to server VRF0 (no translation)
4977 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4978 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4979 TCP(sport=local_port, dport=12349))
4980 self.pg0.add_stream(p)
4981 self.pg_enable_capture(self.pg_interfaces)
4983 capture = self.pg6.get_capture(1)
4988 self.assertEqual(ip.src, self.pg0.remote_ip4)
4989 self.assertEqual(tcp.sport, local_port)
4990 self.assert_packet_checksums_valid(p)
4992 self.logger.error(ppp("Unexpected or invalid packet:", p))
4995 # from server VRF0 back to client VRF1 (no translation)
4996 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4997 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4998 TCP(sport=local_port, dport=12349))
4999 self.pg0.add_stream(p)
5000 self.pg_enable_capture(self.pg_interfaces)
5002 capture = self.pg6.get_capture(1)
5007 self.assertEqual(ip.src, self.pg0.remote_ip4)
5008 self.assertEqual(tcp.sport, local_port)
5009 self.assert_packet_checksums_valid(p)
5011 self.logger.error(ppp("Unexpected or invalid packet:", p))
5014 # from client VRF0 to server VRF1 (no translation)
5015 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5016 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
5017 TCP(sport=12344, dport=local_port))
5018 self.pg0.add_stream(p)
5019 self.pg_enable_capture(self.pg_interfaces)
5021 capture = self.pg5.get_capture(1)
5026 self.assertEqual(ip.dst, self.pg5.remote_ip4)
5027 self.assertEqual(tcp.dport, local_port)
5028 self.assert_packet_checksums_valid(p)
5030 self.logger.error(ppp("Unexpected or invalid packet:", p))
5033 # from server VRF1 back to client VRF0 (no translation)
5034 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
5035 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
5036 TCP(sport=local_port, dport=12344))
5037 self.pg5.add_stream(p)
5038 self.pg_enable_capture(self.pg_interfaces)
5040 capture = self.pg0.get_capture(1)
5045 self.assertEqual(ip.src, self.pg5.remote_ip4)
5046 self.assertEqual(tcp.sport, local_port)
5047 self.assert_packet_checksums_valid(p)
5049 self.logger.error(ppp("Unexpected or invalid packet:", p))
5052 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5053 def test_session_timeout(self):
5054 """ NAT44 session timeouts """
5055 self.nat44_add_address(self.nat_addr)
5056 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5057 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5059 self.vapi.nat_set_timeouts(icmp=5)
5063 for i in range(0, max_sessions):
5064 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5065 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5066 IP(src=src, dst=self.pg1.remote_ip4) /
5067 ICMP(id=1025, type='echo-request'))
5069 self.pg0.add_stream(pkts)
5070 self.pg_enable_capture(self.pg_interfaces)
5072 self.pg1.get_capture(max_sessions)
5077 for i in range(0, max_sessions):
5078 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5079 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5080 IP(src=src, dst=self.pg1.remote_ip4) /
5081 ICMP(id=1026, type='echo-request'))
5083 self.pg0.add_stream(pkts)
5084 self.pg_enable_capture(self.pg_interfaces)
5086 self.pg1.get_capture(max_sessions)
5089 users = self.vapi.nat44_user_dump()
5091 nsessions = nsessions + user.nsessions
5092 self.assertLess(nsessions, 2 * max_sessions)
5094 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5095 def test_session_limit_per_user(self):
5096 """ Maximum sessions per user limit """
5097 self.nat44_add_address(self.nat_addr)
5098 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5099 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5101 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5102 src_address=self.pg2.local_ip4n,
5104 template_interval=10)
5106 # get maximum number of translations per user
5107 nat44_config = self.vapi.nat_show_config()
5110 for port in range(0, nat44_config.max_translations_per_user):
5111 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5112 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5113 UDP(sport=1025 + port, dport=1025 + port))
5116 self.pg0.add_stream(pkts)
5117 self.pg_enable_capture(self.pg_interfaces)
5119 capture = self.pg1.get_capture(len(pkts))
5121 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5122 src_port=self.ipfix_src_port)
5124 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5125 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5126 UDP(sport=3001, dport=3002))
5127 self.pg0.add_stream(p)
5128 self.pg_enable_capture(self.pg_interfaces)
5130 capture = self.pg1.assert_nothing_captured()
5132 # verify IPFIX logging
5133 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5135 capture = self.pg2.get_capture(10)
5136 ipfix = IPFIXDecoder()
5137 # first load template
5139 self.assertTrue(p.haslayer(IPFIX))
5140 if p.haslayer(Template):
5141 ipfix.add_template(p.getlayer(Template))
5142 # verify events in data set
5144 if p.haslayer(Data):
5145 data = ipfix.decode_data_set(p.getlayer(Set))
5146 self.verify_ipfix_max_entries_per_user(
5148 nat44_config.max_translations_per_user,
5149 self.pg0.remote_ip4n)
5152 super(TestNAT44EndpointDependent, self).tearDown()
5153 if not self.vpp_dead:
5154 self.logger.info(self.vapi.cli("show nat44 addresses"))
5155 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5156 self.logger.info(self.vapi.cli("show nat44 static mappings"))
5157 self.logger.info(self.vapi.cli("show nat44 interface address"))
5158 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
5159 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
5160 self.logger.info(self.vapi.cli("show nat timeouts"))
5162 self.vapi.cli("clear logging")
5165 class TestNAT44Out2InDPO(MethodHolder):
5166 """ NAT44 Test Cases using out2in DPO """
5169 def setUpConstants(cls):
5170 super(TestNAT44Out2InDPO, cls).setUpConstants()
5171 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
5174 def setUpClass(cls):
5175 super(TestNAT44Out2InDPO, cls).setUpClass()
5176 cls.vapi.cli("set log class nat level debug")
5179 cls.tcp_port_in = 6303
5180 cls.tcp_port_out = 6303
5181 cls.udp_port_in = 6304
5182 cls.udp_port_out = 6304
5183 cls.icmp_id_in = 6305
5184 cls.icmp_id_out = 6305
5185 cls.nat_addr = '10.0.0.3'
5186 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5187 cls.dst_ip4 = '192.168.70.1'
5189 cls.create_pg_interfaces(range(2))
5192 cls.pg0.config_ip4()
5193 cls.pg0.resolve_arp()
5196 cls.pg1.config_ip6()
5197 cls.pg1.resolve_ndp()
5199 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
5200 dst_address_length=0,
5201 next_hop_address=cls.pg1.remote_ip6n,
5202 next_hop_sw_if_index=cls.pg1.sw_if_index)
5205 super(TestNAT44Out2InDPO, cls).tearDownClass()
5208 def configure_xlat(self):
5209 self.dst_ip6_pfx = '1:2:3::'
5210 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5212 self.dst_ip6_pfx_len = 96
5213 self.src_ip6_pfx = '4:5:6::'
5214 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5216 self.src_ip6_pfx_len = 96
5217 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
5218 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
5219 '\x00\x00\x00\x00', 0, is_translation=1,
5222 def test_464xlat_ce(self):
5223 """ Test 464XLAT CE with NAT44 """
5225 nat_config = self.vapi.nat_show_config()
5226 self.assertEqual(1, nat_config.out2in_dpo)
5228 self.configure_xlat()
5230 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5231 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
5233 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5234 self.dst_ip6_pfx_len)
5235 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
5236 self.src_ip6_pfx_len)
5239 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5240 self.pg0.add_stream(pkts)
5241 self.pg_enable_capture(self.pg_interfaces)
5243 capture = self.pg1.get_capture(len(pkts))
5244 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
5247 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
5249 self.pg1.add_stream(pkts)
5250 self.pg_enable_capture(self.pg_interfaces)
5252 capture = self.pg0.get_capture(len(pkts))
5253 self.verify_capture_in(capture, self.pg0)
5255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
5257 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
5258 self.nat_addr_n, is_add=0)
5260 def test_464xlat_ce_no_nat(self):
5261 """ Test 464XLAT CE without NAT44 """
5263 self.configure_xlat()
5265 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5266 self.dst_ip6_pfx_len)
5267 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
5268 self.src_ip6_pfx_len)
5270 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5271 self.pg0.add_stream(pkts)
5272 self.pg_enable_capture(self.pg_interfaces)
5274 capture = self.pg1.get_capture(len(pkts))
5275 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
5276 nat_ip=out_dst_ip6, same_port=True)
5278 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
5279 self.pg1.add_stream(pkts)
5280 self.pg_enable_capture(self.pg_interfaces)
5282 capture = self.pg0.get_capture(len(pkts))
5283 self.verify_capture_in(capture, self.pg0)
5286 class TestDeterministicNAT(MethodHolder):
5287 """ Deterministic NAT Test Cases """
5290 def setUpConstants(cls):
5291 super(TestDeterministicNAT, cls).setUpConstants()
5292 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
5295 def setUpClass(cls):
5296 super(TestDeterministicNAT, cls).setUpClass()
5297 cls.vapi.cli("set log class nat level debug")
5300 cls.tcp_port_in = 6303
5301 cls.tcp_external_port = 6303
5302 cls.udp_port_in = 6304
5303 cls.udp_external_port = 6304
5304 cls.icmp_id_in = 6305
5305 cls.nat_addr = '10.0.0.3'
5307 cls.create_pg_interfaces(range(3))
5308 cls.interfaces = list(cls.pg_interfaces)
5310 for i in cls.interfaces:
5315 cls.pg0.generate_remote_hosts(2)
5316 cls.pg0.configure_ipv4_neighbors()
5319 super(TestDeterministicNAT, cls).tearDownClass()
5322 def create_stream_in(self, in_if, out_if, ttl=64):
5324 Create packet stream for inside network
5326 :param in_if: Inside interface
5327 :param out_if: Outside interface
5328 :param ttl: TTL of generated packets
5332 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5333 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5334 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
5338 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5339 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5340 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
5344 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5345 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5346 ICMP(id=self.icmp_id_in, type='echo-request'))
5351 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5353 Create packet stream for outside network
5355 :param out_if: Outside interface
5356 :param dst_ip: Destination IP address (Default use global NAT address)
5357 :param ttl: TTL of generated packets
5360 dst_ip = self.nat_addr
5363 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5364 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5365 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
5369 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5370 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5371 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
5375 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5376 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5377 ICMP(id=self.icmp_external_id, type='echo-reply'))
5382 def verify_capture_out(self, capture, nat_ip=None):
5384 Verify captured packets on outside network
5386 :param capture: Captured packets
5387 :param nat_ip: Translated IP address (Default use global NAT address)
5388 :param same_port: Sorce port number is not translated (Default False)
5391 nat_ip = self.nat_addr
5392 for packet in capture:
5394 self.assertEqual(packet[IP].src, nat_ip)
5395 if packet.haslayer(TCP):
5396 self.tcp_port_out = packet[TCP].sport
5397 elif packet.haslayer(UDP):
5398 self.udp_port_out = packet[UDP].sport
5400 self.icmp_external_id = packet[ICMP].id
5402 self.logger.error(ppp("Unexpected or invalid packet "
5403 "(outside network):", packet))
5406 def test_deterministic_mode(self):
5407 """ NAT plugin run deterministic mode """
5408 in_addr = '172.16.255.0'
5409 out_addr = '172.17.255.50'
5410 in_addr_t = '172.16.255.20'
5411 in_addr_n = socket.inet_aton(in_addr)
5412 out_addr_n = socket.inet_aton(out_addr)
5413 in_addr_t_n = socket.inet_aton(in_addr_t)
5417 nat_config = self.vapi.nat_show_config()
5418 self.assertEqual(1, nat_config.deterministic)
5420 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
5422 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5423 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5424 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5425 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
5427 deterministic_mappings = self.vapi.nat_det_map_dump()
5428 self.assertEqual(len(deterministic_mappings), 1)
5429 dsm = deterministic_mappings[0]
5430 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5431 self.assertEqual(in_plen, dsm.in_plen)
5432 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5433 self.assertEqual(out_plen, dsm.out_plen)
5435 self.clear_nat_det()
5436 deterministic_mappings = self.vapi.nat_det_map_dump()
5437 self.assertEqual(len(deterministic_mappings), 0)
5439 def test_set_timeouts(self):
5440 """ Set deterministic NAT timeouts """
5441 timeouts_before = self.vapi.nat_get_timeouts()
5443 self.vapi.nat_set_timeouts(timeouts_before.udp + 10,
5444 timeouts_before.tcp_established + 10,
5445 timeouts_before.tcp_transitory + 10,
5446 timeouts_before.icmp + 10)
5448 timeouts_after = self.vapi.nat_get_timeouts()
5450 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
5451 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
5452 self.assertNotEqual(timeouts_before.tcp_established,
5453 timeouts_after.tcp_established)
5454 self.assertNotEqual(timeouts_before.tcp_transitory,
5455 timeouts_after.tcp_transitory)
5457 def test_det_in(self):
5458 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
5460 nat_ip = "10.0.0.10"
5462 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5464 socket.inet_aton(nat_ip),
5466 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5467 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5471 pkts = self.create_stream_in(self.pg0, self.pg1)
5472 self.pg0.add_stream(pkts)
5473 self.pg_enable_capture(self.pg_interfaces)
5475 capture = self.pg1.get_capture(len(pkts))
5476 self.verify_capture_out(capture, nat_ip)
5479 pkts = self.create_stream_out(self.pg1, nat_ip)
5480 self.pg1.add_stream(pkts)
5481 self.pg_enable_capture(self.pg_interfaces)
5483 capture = self.pg0.get_capture(len(pkts))
5484 self.verify_capture_in(capture, self.pg0)
5487 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5488 self.assertEqual(len(sessions), 3)
5492 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5493 self.assertEqual(s.in_port, self.tcp_port_in)
5494 self.assertEqual(s.out_port, self.tcp_port_out)
5495 self.assertEqual(s.ext_port, self.tcp_external_port)
5499 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5500 self.assertEqual(s.in_port, self.udp_port_in)
5501 self.assertEqual(s.out_port, self.udp_port_out)
5502 self.assertEqual(s.ext_port, self.udp_external_port)
5506 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5507 self.assertEqual(s.in_port, self.icmp_id_in)
5508 self.assertEqual(s.out_port, self.icmp_external_id)
5510 def test_multiple_users(self):
5511 """ Deterministic NAT multiple users """
5513 nat_ip = "10.0.0.10"
5515 external_port = 6303
5517 host0 = self.pg0.remote_hosts[0]
5518 host1 = self.pg0.remote_hosts[1]
5520 self.vapi.nat_det_add_del_map(host0.ip4n,
5522 socket.inet_aton(nat_ip),
5524 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5525 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5529 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5530 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5531 TCP(sport=port_in, dport=external_port))
5532 self.pg0.add_stream(p)
5533 self.pg_enable_capture(self.pg_interfaces)
5535 capture = self.pg1.get_capture(1)
5540 self.assertEqual(ip.src, nat_ip)
5541 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5542 self.assertEqual(tcp.dport, external_port)
5543 port_out0 = tcp.sport
5545 self.logger.error(ppp("Unexpected or invalid packet:", p))
5549 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5550 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5551 TCP(sport=port_in, dport=external_port))
5552 self.pg0.add_stream(p)
5553 self.pg_enable_capture(self.pg_interfaces)
5555 capture = self.pg1.get_capture(1)
5560 self.assertEqual(ip.src, nat_ip)
5561 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5562 self.assertEqual(tcp.dport, external_port)
5563 port_out1 = tcp.sport
5565 self.logger.error(ppp("Unexpected or invalid packet:", p))
5568 dms = self.vapi.nat_det_map_dump()
5569 self.assertEqual(1, len(dms))
5570 self.assertEqual(2, dms[0].ses_num)
5573 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5574 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5575 TCP(sport=external_port, dport=port_out0))
5576 self.pg1.add_stream(p)
5577 self.pg_enable_capture(self.pg_interfaces)
5579 capture = self.pg0.get_capture(1)
5584 self.assertEqual(ip.src, self.pg1.remote_ip4)
5585 self.assertEqual(ip.dst, host0.ip4)
5586 self.assertEqual(tcp.dport, port_in)
5587 self.assertEqual(tcp.sport, external_port)
5589 self.logger.error(ppp("Unexpected or invalid packet:", p))
5593 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5594 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5595 TCP(sport=external_port, dport=port_out1))
5596 self.pg1.add_stream(p)
5597 self.pg_enable_capture(self.pg_interfaces)
5599 capture = self.pg0.get_capture(1)
5604 self.assertEqual(ip.src, self.pg1.remote_ip4)
5605 self.assertEqual(ip.dst, host1.ip4)
5606 self.assertEqual(tcp.dport, port_in)
5607 self.assertEqual(tcp.sport, external_port)
5609 self.logger.error(ppp("Unexpected or invalid packet", p))
5612 # session close api test
5613 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5615 self.pg1.remote_ip4n,
5617 dms = self.vapi.nat_det_map_dump()
5618 self.assertEqual(dms[0].ses_num, 1)
5620 self.vapi.nat_det_close_session_in(host0.ip4n,
5622 self.pg1.remote_ip4n,
5624 dms = self.vapi.nat_det_map_dump()
5625 self.assertEqual(dms[0].ses_num, 0)
5627 def test_tcp_session_close_detection_in(self):
5628 """ Deterministic NAT TCP session close from inside network """
5629 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5631 socket.inet_aton(self.nat_addr),
5633 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5634 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5637 self.initiate_tcp_session(self.pg0, self.pg1)
5639 # close the session from inside
5641 # FIN packet in -> out
5642 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5643 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5644 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5646 self.pg0.add_stream(p)
5647 self.pg_enable_capture(self.pg_interfaces)
5649 self.pg1.get_capture(1)
5653 # ACK packet out -> in
5654 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5655 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5656 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5660 # FIN packet out -> in
5661 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5662 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5663 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5667 self.pg1.add_stream(pkts)
5668 self.pg_enable_capture(self.pg_interfaces)
5670 self.pg0.get_capture(2)
5672 # ACK packet in -> out
5673 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5674 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5675 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5677 self.pg0.add_stream(p)
5678 self.pg_enable_capture(self.pg_interfaces)
5680 self.pg1.get_capture(1)
5682 # Check if deterministic NAT44 closed the session
5683 dms = self.vapi.nat_det_map_dump()
5684 self.assertEqual(0, dms[0].ses_num)
5686 self.logger.error("TCP session termination failed")
5689 def test_tcp_session_close_detection_out(self):
5690 """ Deterministic NAT TCP session close from outside network """
5691 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5693 socket.inet_aton(self.nat_addr),
5695 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5696 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5699 self.initiate_tcp_session(self.pg0, self.pg1)
5701 # close the session from outside
5703 # FIN packet out -> in
5704 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5705 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5706 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5708 self.pg1.add_stream(p)
5709 self.pg_enable_capture(self.pg_interfaces)
5711 self.pg0.get_capture(1)
5715 # ACK packet in -> out
5716 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5717 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5718 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5722 # ACK packet in -> out
5723 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5724 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5725 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5729 self.pg0.add_stream(pkts)
5730 self.pg_enable_capture(self.pg_interfaces)
5732 self.pg1.get_capture(2)
5734 # ACK packet out -> in
5735 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5736 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5737 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5739 self.pg1.add_stream(p)
5740 self.pg_enable_capture(self.pg_interfaces)
5742 self.pg0.get_capture(1)
5744 # Check if deterministic NAT44 closed the session
5745 dms = self.vapi.nat_det_map_dump()
5746 self.assertEqual(0, dms[0].ses_num)
5748 self.logger.error("TCP session termination failed")
5751 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5752 def test_session_timeout(self):
5753 """ Deterministic NAT session timeouts """
5754 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5756 socket.inet_aton(self.nat_addr),
5758 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5759 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5762 self.initiate_tcp_session(self.pg0, self.pg1)
5763 self.vapi.nat_set_timeouts(5, 5, 5, 5)
5764 pkts = self.create_stream_in(self.pg0, self.pg1)
5765 self.pg0.add_stream(pkts)
5766 self.pg_enable_capture(self.pg_interfaces)
5768 capture = self.pg1.get_capture(len(pkts))
5771 dms = self.vapi.nat_det_map_dump()
5772 self.assertEqual(0, dms[0].ses_num)
5774 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5775 def test_session_limit_per_user(self):
5776 """ Deterministic NAT maximum sessions per user limit """
5777 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5779 socket.inet_aton(self.nat_addr),
5781 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5782 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5784 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5785 src_address=self.pg2.local_ip4n,
5787 template_interval=10)
5788 self.vapi.nat_ipfix()
5791 for port in range(1025, 2025):
5792 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5793 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5794 UDP(sport=port, dport=port))
5797 self.pg0.add_stream(pkts)
5798 self.pg_enable_capture(self.pg_interfaces)
5800 capture = self.pg1.get_capture(len(pkts))
5802 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5803 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5804 UDP(sport=3001, dport=3002))
5805 self.pg0.add_stream(p)
5806 self.pg_enable_capture(self.pg_interfaces)
5808 capture = self.pg1.assert_nothing_captured()
5810 # verify ICMP error packet
5811 capture = self.pg0.get_capture(1)
5813 self.assertTrue(p.haslayer(ICMP))
5815 self.assertEqual(icmp.type, 3)
5816 self.assertEqual(icmp.code, 1)
5817 self.assertTrue(icmp.haslayer(IPerror))
5818 inner_ip = icmp[IPerror]
5819 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5820 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5822 dms = self.vapi.nat_det_map_dump()
5824 self.assertEqual(1000, dms[0].ses_num)
5826 # verify IPFIX logging
5827 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5829 capture = self.pg2.get_capture(2)
5830 ipfix = IPFIXDecoder()
5831 # first load template
5833 self.assertTrue(p.haslayer(IPFIX))
5834 if p.haslayer(Template):
5835 ipfix.add_template(p.getlayer(Template))
5836 # verify events in data set
5838 if p.haslayer(Data):
5839 data = ipfix.decode_data_set(p.getlayer(Set))
5840 self.verify_ipfix_max_entries_per_user(data,
5842 self.pg0.remote_ip4n)
5844 def clear_nat_det(self):
5846 Clear deterministic NAT configuration.
5848 self.vapi.nat_ipfix(enable=0)
5849 self.vapi.nat_set_timeouts()
5850 deterministic_mappings = self.vapi.nat_det_map_dump()
5851 for dsm in deterministic_mappings:
5852 self.vapi.nat_det_add_del_map(dsm.in_addr,
5858 interfaces = self.vapi.nat44_interface_dump()
5859 for intf in interfaces:
5860 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5865 super(TestDeterministicNAT, self).tearDown()
5866 if not self.vpp_dead:
5867 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5868 self.logger.info(self.vapi.cli("show nat timeouts"))
5870 self.vapi.cli("show nat44 deterministic mappings"))
5872 self.vapi.cli("show nat44 deterministic sessions"))
5873 self.clear_nat_det()
5876 class TestNAT64(MethodHolder):
5877 """ NAT64 Test Cases """
5880 def setUpConstants(cls):
5881 super(TestNAT64, cls).setUpConstants()
5882 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5883 "nat64 st hash buckets 256", "}"])
5886 def setUpClass(cls):
5887 super(TestNAT64, cls).setUpClass()
5890 cls.tcp_port_in = 6303
5891 cls.tcp_port_out = 6303
5892 cls.udp_port_in = 6304
5893 cls.udp_port_out = 6304
5894 cls.icmp_id_in = 6305
5895 cls.icmp_id_out = 6305
5896 cls.nat_addr = '10.0.0.3'
5897 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5899 cls.vrf1_nat_addr = '10.0.10.3'
5900 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5902 cls.ipfix_src_port = 4739
5903 cls.ipfix_domain_id = 1
5905 cls.create_pg_interfaces(range(6))
5906 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5907 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5908 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5910 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5912 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5914 cls.pg0.generate_remote_hosts(2)
5916 for i in cls.ip6_interfaces:
5919 i.configure_ipv6_neighbors()
5921 for i in cls.ip4_interfaces:
5927 cls.pg3.config_ip4()
5928 cls.pg3.resolve_arp()
5929 cls.pg3.config_ip6()
5930 cls.pg3.configure_ipv6_neighbors()
5933 cls.pg5.config_ip6()
5936 super(TestNAT64, cls).tearDownClass()
5939 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5940 """ NAT64 inside interface handles Neighbor Advertisement """
5942 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5945 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5946 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5947 ICMPv6EchoRequest())
5949 self.pg5.add_stream(pkts)
5950 self.pg_enable_capture(self.pg_interfaces)
5953 # Wait for Neighbor Solicitation
5954 capture = self.pg5.get_capture(len(pkts))
5957 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5958 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
5959 tgt = packet[ICMPv6ND_NS].tgt
5961 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5964 # Send Neighbor Advertisement
5965 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5966 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5967 ICMPv6ND_NA(tgt=tgt) /
5968 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5970 self.pg5.add_stream(pkts)
5971 self.pg_enable_capture(self.pg_interfaces)
5974 # Try to send ping again
5976 self.pg5.add_stream(pkts)
5977 self.pg_enable_capture(self.pg_interfaces)
5980 # Wait for ping reply
5981 capture = self.pg5.get_capture(len(pkts))
5984 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5985 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5986 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5988 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5991 def test_pool(self):
5992 """ Add/delete address to NAT64 pool """
5993 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5995 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5997 addresses = self.vapi.nat64_pool_addr_dump()
5998 self.assertEqual(len(addresses), 1)
5999 self.assertEqual(addresses[0].address, nat_addr)
6001 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
6003 addresses = self.vapi.nat64_pool_addr_dump()
6004 self.assertEqual(len(addresses), 0)
6006 def test_interface(self):
6007 """ Enable/disable NAT64 feature on the interface """
6008 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6009 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6011 interfaces = self.vapi.nat64_interface_dump()
6012 self.assertEqual(len(interfaces), 2)
6015 for intf in interfaces:
6016 if intf.sw_if_index == self.pg0.sw_if_index:
6017 self.assertEqual(intf.is_inside, 1)
6019 elif intf.sw_if_index == self.pg1.sw_if_index:
6020 self.assertEqual(intf.is_inside, 0)
6022 self.assertTrue(pg0_found)
6023 self.assertTrue(pg1_found)
6025 features = self.vapi.cli("show interface features pg0")
6026 self.assertNotEqual(features.find('nat64-in2out'), -1)
6027 features = self.vapi.cli("show interface features pg1")
6028 self.assertNotEqual(features.find('nat64-out2in'), -1)
6030 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
6031 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
6033 interfaces = self.vapi.nat64_interface_dump()
6034 self.assertEqual(len(interfaces), 0)
6036 def test_static_bib(self):
6037 """ Add/delete static BIB entry """
6038 in_addr = socket.inet_pton(socket.AF_INET6,
6039 '2001:db8:85a3::8a2e:370:7334')
6040 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
6043 proto = IP_PROTOS.tcp
6045 self.vapi.nat64_add_del_static_bib(in_addr,
6050 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
6055 self.assertEqual(bibe.i_addr, in_addr)
6056 self.assertEqual(bibe.o_addr, out_addr)
6057 self.assertEqual(bibe.i_port, in_port)
6058 self.assertEqual(bibe.o_port, out_port)
6059 self.assertEqual(static_bib_num, 1)
6061 self.vapi.nat64_add_del_static_bib(in_addr,
6067 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
6072 self.assertEqual(static_bib_num, 0)
6074 def test_set_timeouts(self):
6075 """ Set NAT64 timeouts """
6076 # verify default values
6077 timeouts = self.vapi.nat_get_timeouts()
6078 self.assertEqual(timeouts.udp, 300)
6079 self.assertEqual(timeouts.icmp, 60)
6080 self.assertEqual(timeouts.tcp_transitory, 240)
6081 self.assertEqual(timeouts.tcp_established, 7440)
6083 # set and verify custom values
6084 self.vapi.nat_set_timeouts(udp=200, icmp=30, tcp_transitory=250,
6085 tcp_established=7450)
6086 timeouts = self.vapi.nat_get_timeouts()
6087 self.assertEqual(timeouts.udp, 200)
6088 self.assertEqual(timeouts.icmp, 30)
6089 self.assertEqual(timeouts.tcp_transitory, 250)
6090 self.assertEqual(timeouts.tcp_established, 7450)
6092 def test_dynamic(self):
6093 """ NAT64 dynamic translation test """
6094 self.tcp_port_in = 6303
6095 self.udp_port_in = 6304
6096 self.icmp_id_in = 6305
6098 ses_num_start = self.nat64_get_ses_num()
6100 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6102 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6103 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6106 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6107 self.pg0.add_stream(pkts)
6108 self.pg_enable_capture(self.pg_interfaces)
6110 capture = self.pg1.get_capture(len(pkts))
6111 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6112 dst_ip=self.pg1.remote_ip4)
6115 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6116 self.pg1.add_stream(pkts)
6117 self.pg_enable_capture(self.pg_interfaces)
6119 capture = self.pg0.get_capture(len(pkts))
6120 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6121 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6124 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6125 self.pg0.add_stream(pkts)
6126 self.pg_enable_capture(self.pg_interfaces)
6128 capture = self.pg1.get_capture(len(pkts))
6129 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6130 dst_ip=self.pg1.remote_ip4)
6133 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6134 self.pg1.add_stream(pkts)
6135 self.pg_enable_capture(self.pg_interfaces)
6137 capture = self.pg0.get_capture(len(pkts))
6138 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6140 ses_num_end = self.nat64_get_ses_num()
6142 self.assertEqual(ses_num_end - ses_num_start, 3)
6144 # tenant with specific VRF
6145 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6146 self.vrf1_nat_addr_n,
6147 vrf_id=self.vrf1_id)
6148 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6150 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
6151 self.pg2.add_stream(pkts)
6152 self.pg_enable_capture(self.pg_interfaces)
6154 capture = self.pg1.get_capture(len(pkts))
6155 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6156 dst_ip=self.pg1.remote_ip4)
6158 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6159 self.pg1.add_stream(pkts)
6160 self.pg_enable_capture(self.pg_interfaces)
6162 capture = self.pg2.get_capture(len(pkts))
6163 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
6165 def test_static(self):
6166 """ NAT64 static translation test """
6167 self.tcp_port_in = 60303
6168 self.udp_port_in = 60304
6169 self.icmp_id_in = 60305
6170 self.tcp_port_out = 60303
6171 self.udp_port_out = 60304
6172 self.icmp_id_out = 60305
6174 ses_num_start = self.nat64_get_ses_num()
6176 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6178 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6179 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6181 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6186 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6191 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6198 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6199 self.pg0.add_stream(pkts)
6200 self.pg_enable_capture(self.pg_interfaces)
6202 capture = self.pg1.get_capture(len(pkts))
6203 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6204 dst_ip=self.pg1.remote_ip4, same_port=True)
6207 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6208 self.pg1.add_stream(pkts)
6209 self.pg_enable_capture(self.pg_interfaces)
6211 capture = self.pg0.get_capture(len(pkts))
6212 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6213 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6215 ses_num_end = self.nat64_get_ses_num()
6217 self.assertEqual(ses_num_end - ses_num_start, 3)
6219 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6220 def test_session_timeout(self):
6221 """ NAT64 session timeout """
6222 self.icmp_id_in = 1234
6223 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6225 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6226 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6227 self.vapi.nat_set_timeouts(icmp=5, tcp_transitory=5, tcp_established=5)
6229 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6230 self.pg0.add_stream(pkts)
6231 self.pg_enable_capture(self.pg_interfaces)
6233 capture = self.pg1.get_capture(len(pkts))
6235 ses_num_before_timeout = self.nat64_get_ses_num()
6239 # ICMP and TCP session after timeout
6240 ses_num_after_timeout = self.nat64_get_ses_num()
6241 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
6243 def test_icmp_error(self):
6244 """ NAT64 ICMP Error message translation """
6245 self.tcp_port_in = 6303
6246 self.udp_port_in = 6304
6247 self.icmp_id_in = 6305
6249 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6251 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6252 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6254 # send some packets to create sessions
6255 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6256 self.pg0.add_stream(pkts)
6257 self.pg_enable_capture(self.pg_interfaces)
6259 capture_ip4 = self.pg1.get_capture(len(pkts))
6260 self.verify_capture_out(capture_ip4,
6261 nat_ip=self.nat_addr,
6262 dst_ip=self.pg1.remote_ip4)
6264 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6265 self.pg1.add_stream(pkts)
6266 self.pg_enable_capture(self.pg_interfaces)
6268 capture_ip6 = self.pg0.get_capture(len(pkts))
6269 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6270 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
6271 self.pg0.remote_ip6)
6274 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6275 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
6276 ICMPv6DestUnreach(code=1) /
6277 packet[IPv6] for packet in capture_ip6]
6278 self.pg0.add_stream(pkts)
6279 self.pg_enable_capture(self.pg_interfaces)
6281 capture = self.pg1.get_capture(len(pkts))
6282 for packet in capture:
6284 self.assertEqual(packet[IP].src, self.nat_addr)
6285 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6286 self.assertEqual(packet[ICMP].type, 3)
6287 self.assertEqual(packet[ICMP].code, 13)
6288 inner = packet[IPerror]
6289 self.assertEqual(inner.src, self.pg1.remote_ip4)
6290 self.assertEqual(inner.dst, self.nat_addr)
6291 self.assert_packet_checksums_valid(packet)
6292 if inner.haslayer(TCPerror):
6293 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6294 elif inner.haslayer(UDPerror):
6295 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
6297 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6299 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6303 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6304 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6305 ICMP(type=3, code=13) /
6306 packet[IP] for packet in capture_ip4]
6307 self.pg1.add_stream(pkts)
6308 self.pg_enable_capture(self.pg_interfaces)
6310 capture = self.pg0.get_capture(len(pkts))
6311 for packet in capture:
6313 self.assertEqual(packet[IPv6].src, ip.src)
6314 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6315 icmp = packet[ICMPv6DestUnreach]
6316 self.assertEqual(icmp.code, 1)
6317 inner = icmp[IPerror6]
6318 self.assertEqual(inner.src, self.pg0.remote_ip6)
6319 self.assertEqual(inner.dst, ip.src)
6320 self.assert_icmpv6_checksum_valid(packet)
6321 if inner.haslayer(TCPerror):
6322 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6323 elif inner.haslayer(UDPerror):
6324 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6326 self.assertEqual(inner[ICMPv6EchoRequest].id,
6329 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6332 def test_hairpinning(self):
6333 """ NAT64 hairpinning """
6335 client = self.pg0.remote_hosts[0]
6336 server = self.pg0.remote_hosts[1]
6337 server_tcp_in_port = 22
6338 server_tcp_out_port = 4022
6339 server_udp_in_port = 23
6340 server_udp_out_port = 4023
6341 client_tcp_in_port = 1234
6342 client_udp_in_port = 1235
6343 client_tcp_out_port = 0
6344 client_udp_out_port = 0
6345 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6346 nat_addr_ip6 = ip.src
6348 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6350 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6351 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6353 self.vapi.nat64_add_del_static_bib(server.ip6n,
6356 server_tcp_out_port,
6358 self.vapi.nat64_add_del_static_bib(server.ip6n,
6361 server_udp_out_port,
6366 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6367 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6368 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6370 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6371 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6372 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6374 self.pg0.add_stream(pkts)
6375 self.pg_enable_capture(self.pg_interfaces)
6377 capture = self.pg0.get_capture(len(pkts))
6378 for packet in capture:
6380 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6381 self.assertEqual(packet[IPv6].dst, server.ip6)
6382 self.assert_packet_checksums_valid(packet)
6383 if packet.haslayer(TCP):
6384 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6385 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6386 client_tcp_out_port = packet[TCP].sport
6388 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6389 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6390 client_udp_out_port = packet[UDP].sport
6392 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6397 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6398 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6399 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6401 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6402 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6403 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6405 self.pg0.add_stream(pkts)
6406 self.pg_enable_capture(self.pg_interfaces)
6408 capture = self.pg0.get_capture(len(pkts))
6409 for packet in capture:
6411 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6412 self.assertEqual(packet[IPv6].dst, client.ip6)
6413 self.assert_packet_checksums_valid(packet)
6414 if packet.haslayer(TCP):
6415 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6416 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6418 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6419 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6421 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6426 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6427 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6428 ICMPv6DestUnreach(code=1) /
6429 packet[IPv6] for packet in capture]
6430 self.pg0.add_stream(pkts)
6431 self.pg_enable_capture(self.pg_interfaces)
6433 capture = self.pg0.get_capture(len(pkts))
6434 for packet in capture:
6436 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6437 self.assertEqual(packet[IPv6].dst, server.ip6)
6438 icmp = packet[ICMPv6DestUnreach]
6439 self.assertEqual(icmp.code, 1)
6440 inner = icmp[IPerror6]
6441 self.assertEqual(inner.src, server.ip6)
6442 self.assertEqual(inner.dst, nat_addr_ip6)
6443 self.assert_packet_checksums_valid(packet)
6444 if inner.haslayer(TCPerror):
6445 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6446 self.assertEqual(inner[TCPerror].dport,
6447 client_tcp_out_port)
6449 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6450 self.assertEqual(inner[UDPerror].dport,
6451 client_udp_out_port)
6453 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6456 def test_prefix(self):
6457 """ NAT64 Network-Specific Prefix """
6459 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6461 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6462 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6463 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6464 self.vrf1_nat_addr_n,
6465 vrf_id=self.vrf1_id)
6466 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6469 global_pref64 = "2001:db8::"
6470 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6471 global_pref64_len = 32
6472 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6474 prefix = self.vapi.nat64_prefix_dump()
6475 self.assertEqual(len(prefix), 1)
6476 self.assertEqual(prefix[0].prefix, global_pref64_n)
6477 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6478 self.assertEqual(prefix[0].vrf_id, 0)
6480 # Add tenant specific prefix
6481 vrf1_pref64 = "2001:db8:122:300::"
6482 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6483 vrf1_pref64_len = 56
6484 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6486 vrf_id=self.vrf1_id)
6487 prefix = self.vapi.nat64_prefix_dump()
6488 self.assertEqual(len(prefix), 2)
6491 pkts = self.create_stream_in_ip6(self.pg0,
6494 plen=global_pref64_len)
6495 self.pg0.add_stream(pkts)
6496 self.pg_enable_capture(self.pg_interfaces)
6498 capture = self.pg1.get_capture(len(pkts))
6499 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6500 dst_ip=self.pg1.remote_ip4)
6502 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6503 self.pg1.add_stream(pkts)
6504 self.pg_enable_capture(self.pg_interfaces)
6506 capture = self.pg0.get_capture(len(pkts))
6507 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6510 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6512 # Tenant specific prefix
6513 pkts = self.create_stream_in_ip6(self.pg2,
6516 plen=vrf1_pref64_len)
6517 self.pg2.add_stream(pkts)
6518 self.pg_enable_capture(self.pg_interfaces)
6520 capture = self.pg1.get_capture(len(pkts))
6521 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6522 dst_ip=self.pg1.remote_ip4)
6524 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6525 self.pg1.add_stream(pkts)
6526 self.pg_enable_capture(self.pg_interfaces)
6528 capture = self.pg2.get_capture(len(pkts))
6529 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6532 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
6534 def test_unknown_proto(self):
6535 """ NAT64 translate packet with unknown protocol """
6537 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6539 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6540 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6541 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6544 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6545 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6546 TCP(sport=self.tcp_port_in, dport=20))
6547 self.pg0.add_stream(p)
6548 self.pg_enable_capture(self.pg_interfaces)
6550 p = self.pg1.get_capture(1)
6552 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6553 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6555 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6556 TCP(sport=1234, dport=1234))
6557 self.pg0.add_stream(p)
6558 self.pg_enable_capture(self.pg_interfaces)
6560 p = self.pg1.get_capture(1)
6563 self.assertEqual(packet[IP].src, self.nat_addr)
6564 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6565 self.assertTrue(packet.haslayer(GRE))
6566 self.assert_packet_checksums_valid(packet)
6568 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6572 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6573 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6575 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6576 TCP(sport=1234, dport=1234))
6577 self.pg1.add_stream(p)
6578 self.pg_enable_capture(self.pg_interfaces)
6580 p = self.pg0.get_capture(1)
6583 self.assertEqual(packet[IPv6].src, remote_ip6)
6584 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6585 self.assertEqual(packet[IPv6].nh, 47)
6587 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6590 def test_hairpinning_unknown_proto(self):
6591 """ NAT64 translate packet with unknown protocol - hairpinning """
6593 client = self.pg0.remote_hosts[0]
6594 server = self.pg0.remote_hosts[1]
6595 server_tcp_in_port = 22
6596 server_tcp_out_port = 4022
6597 client_tcp_in_port = 1234
6598 client_tcp_out_port = 1235
6599 server_nat_ip = "10.0.0.100"
6600 client_nat_ip = "10.0.0.110"
6601 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6602 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6603 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6604 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6606 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6608 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6609 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6611 self.vapi.nat64_add_del_static_bib(server.ip6n,
6614 server_tcp_out_port,
6617 self.vapi.nat64_add_del_static_bib(server.ip6n,
6623 self.vapi.nat64_add_del_static_bib(client.ip6n,
6626 client_tcp_out_port,
6630 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6631 IPv6(src=client.ip6, dst=server_nat_ip6) /
6632 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6633 self.pg0.add_stream(p)
6634 self.pg_enable_capture(self.pg_interfaces)
6636 p = self.pg0.get_capture(1)
6638 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6639 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6641 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6642 TCP(sport=1234, dport=1234))
6643 self.pg0.add_stream(p)
6644 self.pg_enable_capture(self.pg_interfaces)
6646 p = self.pg0.get_capture(1)
6649 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6650 self.assertEqual(packet[IPv6].dst, server.ip6)
6651 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6653 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6657 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6658 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6660 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6661 TCP(sport=1234, dport=1234))
6662 self.pg0.add_stream(p)
6663 self.pg_enable_capture(self.pg_interfaces)
6665 p = self.pg0.get_capture(1)
6668 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6669 self.assertEqual(packet[IPv6].dst, client.ip6)
6670 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6672 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6675 def test_one_armed_nat64(self):
6676 """ One armed NAT64 """
6678 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6682 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6684 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6685 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6688 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6689 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6690 TCP(sport=12345, dport=80))
6691 self.pg3.add_stream(p)
6692 self.pg_enable_capture(self.pg_interfaces)
6694 capture = self.pg3.get_capture(1)
6699 self.assertEqual(ip.src, self.nat_addr)
6700 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6701 self.assertNotEqual(tcp.sport, 12345)
6702 external_port = tcp.sport
6703 self.assertEqual(tcp.dport, 80)
6704 self.assert_packet_checksums_valid(p)
6706 self.logger.error(ppp("Unexpected or invalid packet:", p))
6710 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6711 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6712 TCP(sport=80, dport=external_port))
6713 self.pg3.add_stream(p)
6714 self.pg_enable_capture(self.pg_interfaces)
6716 capture = self.pg3.get_capture(1)
6721 self.assertEqual(ip.src, remote_host_ip6)
6722 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6723 self.assertEqual(tcp.sport, 80)
6724 self.assertEqual(tcp.dport, 12345)
6725 self.assert_packet_checksums_valid(p)
6727 self.logger.error(ppp("Unexpected or invalid packet:", p))
6730 def test_frag_in_order(self):
6731 """ NAT64 translate fragments arriving in order """
6732 self.tcp_port_in = random.randint(1025, 65535)
6734 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6736 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6737 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6739 reass = self.vapi.nat_reass_dump()
6740 reass_n_start = len(reass)
6744 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6745 self.tcp_port_in, 20, data)
6746 self.pg0.add_stream(pkts)
6747 self.pg_enable_capture(self.pg_interfaces)
6749 frags = self.pg1.get_capture(len(pkts))
6750 p = self.reass_frags_and_verify(frags,
6752 self.pg1.remote_ip4)
6753 self.assertEqual(p[TCP].dport, 20)
6754 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6755 self.tcp_port_out = p[TCP].sport
6756 self.assertEqual(data, p[Raw].load)
6759 data = "A" * 4 + "b" * 16 + "C" * 3
6760 pkts = self.create_stream_frag(self.pg1,
6765 self.pg1.add_stream(pkts)
6766 self.pg_enable_capture(self.pg_interfaces)
6768 frags = self.pg0.get_capture(len(pkts))
6769 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6770 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6771 self.assertEqual(p[TCP].sport, 20)
6772 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6773 self.assertEqual(data, p[Raw].load)
6775 reass = self.vapi.nat_reass_dump()
6776 reass_n_end = len(reass)
6778 self.assertEqual(reass_n_end - reass_n_start, 2)
6780 def test_reass_hairpinning(self):
6781 """ NAT64 fragments hairpinning """
6783 server = self.pg0.remote_hosts[1]
6784 server_in_port = random.randint(1025, 65535)
6785 server_out_port = random.randint(1025, 65535)
6786 client_in_port = random.randint(1025, 65535)
6787 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6788 nat_addr_ip6 = ip.src
6790 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6792 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6793 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6795 # add static BIB entry for server
6796 self.vapi.nat64_add_del_static_bib(server.ip6n,
6802 # send packet from host to server
6803 pkts = self.create_stream_frag_ip6(self.pg0,
6808 self.pg0.add_stream(pkts)
6809 self.pg_enable_capture(self.pg_interfaces)
6811 frags = self.pg0.get_capture(len(pkts))
6812 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6813 self.assertNotEqual(p[TCP].sport, client_in_port)
6814 self.assertEqual(p[TCP].dport, server_in_port)
6815 self.assertEqual(data, p[Raw].load)
6817 def test_frag_out_of_order(self):
6818 """ NAT64 translate fragments arriving out of order """
6819 self.tcp_port_in = random.randint(1025, 65535)
6821 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6823 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6824 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6828 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6829 self.tcp_port_in, 20, data)
6831 self.pg0.add_stream(pkts)
6832 self.pg_enable_capture(self.pg_interfaces)
6834 frags = self.pg1.get_capture(len(pkts))
6835 p = self.reass_frags_and_verify(frags,
6837 self.pg1.remote_ip4)
6838 self.assertEqual(p[TCP].dport, 20)
6839 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6840 self.tcp_port_out = p[TCP].sport
6841 self.assertEqual(data, p[Raw].load)
6844 data = "A" * 4 + "B" * 16 + "C" * 3
6845 pkts = self.create_stream_frag(self.pg1,
6851 self.pg1.add_stream(pkts)
6852 self.pg_enable_capture(self.pg_interfaces)
6854 frags = self.pg0.get_capture(len(pkts))
6855 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6856 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6857 self.assertEqual(p[TCP].sport, 20)
6858 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6859 self.assertEqual(data, p[Raw].load)
6861 def test_interface_addr(self):
6862 """ Acquire NAT64 pool addresses from interface """
6863 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6865 # no address in NAT64 pool
6866 adresses = self.vapi.nat44_address_dump()
6867 self.assertEqual(0, len(adresses))
6869 # configure interface address and check NAT64 address pool
6870 self.pg4.config_ip4()
6871 addresses = self.vapi.nat64_pool_addr_dump()
6872 self.assertEqual(len(addresses), 1)
6873 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6875 # remove interface address and check NAT64 address pool
6876 self.pg4.unconfig_ip4()
6877 addresses = self.vapi.nat64_pool_addr_dump()
6878 self.assertEqual(0, len(adresses))
6880 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6881 def test_ipfix_max_bibs_sessions(self):
6882 """ IPFIX logging maximum session and BIB entries exceeded """
6885 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6889 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6891 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6892 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6896 for i in range(0, max_bibs):
6897 src = "fd01:aa::%x" % (i)
6898 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6899 IPv6(src=src, dst=remote_host_ip6) /
6900 TCP(sport=12345, dport=80))
6902 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6903 IPv6(src=src, dst=remote_host_ip6) /
6904 TCP(sport=12345, dport=22))
6906 self.pg0.add_stream(pkts)
6907 self.pg_enable_capture(self.pg_interfaces)
6909 self.pg1.get_capture(max_sessions)
6911 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6912 src_address=self.pg3.local_ip4n,
6914 template_interval=10)
6915 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6916 src_port=self.ipfix_src_port)
6918 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6919 IPv6(src=src, dst=remote_host_ip6) /
6920 TCP(sport=12345, dport=25))
6921 self.pg0.add_stream(p)
6922 self.pg_enable_capture(self.pg_interfaces)
6924 self.pg1.assert_nothing_captured()
6926 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6927 capture = self.pg3.get_capture(9)
6928 ipfix = IPFIXDecoder()
6929 # first load template
6931 self.assertTrue(p.haslayer(IPFIX))
6932 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6933 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6934 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6935 self.assertEqual(p[UDP].dport, 4739)
6936 self.assertEqual(p[IPFIX].observationDomainID,
6937 self.ipfix_domain_id)
6938 if p.haslayer(Template):
6939 ipfix.add_template(p.getlayer(Template))
6940 # verify events in data set
6942 if p.haslayer(Data):
6943 data = ipfix.decode_data_set(p.getlayer(Set))
6944 self.verify_ipfix_max_sessions(data, max_sessions)
6946 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6947 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6948 TCP(sport=12345, dport=80))
6949 self.pg0.add_stream(p)
6950 self.pg_enable_capture(self.pg_interfaces)
6952 self.pg1.assert_nothing_captured()
6954 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6955 capture = self.pg3.get_capture(1)
6956 # verify events in data set
6958 self.assertTrue(p.haslayer(IPFIX))
6959 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6960 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6961 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6962 self.assertEqual(p[UDP].dport, 4739)
6963 self.assertEqual(p[IPFIX].observationDomainID,
6964 self.ipfix_domain_id)
6965 if p.haslayer(Data):
6966 data = ipfix.decode_data_set(p.getlayer(Set))
6967 self.verify_ipfix_max_bibs(data, max_bibs)
6969 def test_ipfix_max_frags(self):
6970 """ IPFIX logging maximum fragments pending reassembly exceeded """
6971 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6973 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6974 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6975 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6976 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6977 src_address=self.pg3.local_ip4n,
6979 template_interval=10)
6980 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6981 src_port=self.ipfix_src_port)
6984 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6985 self.tcp_port_in, 20, data)
6986 self.pg0.add_stream(pkts[-1])
6987 self.pg_enable_capture(self.pg_interfaces)
6989 self.pg1.assert_nothing_captured()
6991 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6992 capture = self.pg3.get_capture(9)
6993 ipfix = IPFIXDecoder()
6994 # first load template
6996 self.assertTrue(p.haslayer(IPFIX))
6997 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6998 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6999 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7000 self.assertEqual(p[UDP].dport, 4739)
7001 self.assertEqual(p[IPFIX].observationDomainID,
7002 self.ipfix_domain_id)
7003 if p.haslayer(Template):
7004 ipfix.add_template(p.getlayer(Template))
7005 # verify events in data set
7007 if p.haslayer(Data):
7008 data = ipfix.decode_data_set(p.getlayer(Set))
7009 self.verify_ipfix_max_fragments_ip6(data, 0,
7010 self.pg0.remote_ip6n)
7012 def test_ipfix_bib_ses(self):
7013 """ IPFIX logging NAT64 BIB/session create and delete events """
7014 self.tcp_port_in = random.randint(1025, 65535)
7015 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
7019 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7021 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
7022 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7023 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
7024 src_address=self.pg3.local_ip4n,
7026 template_interval=10)
7027 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
7028 src_port=self.ipfix_src_port)
7031 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
7032 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
7033 TCP(sport=self.tcp_port_in, dport=25))
7034 self.pg0.add_stream(p)
7035 self.pg_enable_capture(self.pg_interfaces)
7037 p = self.pg1.get_capture(1)
7038 self.tcp_port_out = p[0][TCP].sport
7039 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7040 capture = self.pg3.get_capture(10)
7041 ipfix = IPFIXDecoder()
7042 # first load template
7044 self.assertTrue(p.haslayer(IPFIX))
7045 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7046 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7047 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7048 self.assertEqual(p[UDP].dport, 4739)
7049 self.assertEqual(p[IPFIX].observationDomainID,
7050 self.ipfix_domain_id)
7051 if p.haslayer(Template):
7052 ipfix.add_template(p.getlayer(Template))
7053 # verify events in data set
7055 if p.haslayer(Data):
7056 data = ipfix.decode_data_set(p.getlayer(Set))
7057 if ord(data[0][230]) == 10:
7058 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
7059 elif ord(data[0][230]) == 6:
7060 self.verify_ipfix_nat64_ses(data,
7062 self.pg0.remote_ip6n,
7063 self.pg1.remote_ip4,
7066 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7069 self.pg_enable_capture(self.pg_interfaces)
7070 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7073 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7074 capture = self.pg3.get_capture(2)
7075 # verify events in data set
7077 self.assertTrue(p.haslayer(IPFIX))
7078 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7079 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7080 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7081 self.assertEqual(p[UDP].dport, 4739)
7082 self.assertEqual(p[IPFIX].observationDomainID,
7083 self.ipfix_domain_id)
7084 if p.haslayer(Data):
7085 data = ipfix.decode_data_set(p.getlayer(Set))
7086 if ord(data[0][230]) == 11:
7087 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
7088 elif ord(data[0][230]) == 7:
7089 self.verify_ipfix_nat64_ses(data,
7091 self.pg0.remote_ip6n,
7092 self.pg1.remote_ip4,
7095 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7097 def nat64_get_ses_num(self):
7099 Return number of active NAT64 sessions.
7101 st = self.vapi.nat64_st_dump()
7104 def clear_nat64(self):
7106 Clear NAT64 configuration.
7108 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
7109 domain_id=self.ipfix_domain_id)
7110 self.ipfix_src_port = 4739
7111 self.ipfix_domain_id = 1
7113 self.vapi.nat_set_timeouts()
7115 interfaces = self.vapi.nat64_interface_dump()
7116 for intf in interfaces:
7117 if intf.is_inside > 1:
7118 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7121 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7125 bib = self.vapi.nat64_bib_dump(255)
7128 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
7136 adresses = self.vapi.nat64_pool_addr_dump()
7137 for addr in adresses:
7138 self.vapi.nat64_add_del_pool_addr_range(addr.address,
7143 prefixes = self.vapi.nat64_prefix_dump()
7144 for prefix in prefixes:
7145 self.vapi.nat64_add_del_prefix(prefix.prefix,
7147 vrf_id=prefix.vrf_id,
7151 super(TestNAT64, self).tearDown()
7152 if not self.vpp_dead:
7153 self.logger.info(self.vapi.cli("show nat64 pool"))
7154 self.logger.info(self.vapi.cli("show nat64 interfaces"))
7155 self.logger.info(self.vapi.cli("show nat64 prefix"))
7156 self.logger.info(self.vapi.cli("show nat64 bib all"))
7157 self.logger.info(self.vapi.cli("show nat64 session table all"))
7158 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
7162 class TestDSlite(MethodHolder):
7163 """ DS-Lite Test Cases """
7166 def setUpClass(cls):
7167 super(TestDSlite, cls).setUpClass()
7170 cls.nat_addr = '10.0.0.3'
7171 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
7173 cls.create_pg_interfaces(range(2))
7175 cls.pg0.config_ip4()
7176 cls.pg0.resolve_arp()
7178 cls.pg1.config_ip6()
7179 cls.pg1.generate_remote_hosts(2)
7180 cls.pg1.configure_ipv6_neighbors()
7183 super(TestDSlite, cls).tearDownClass()
7186 def test_dslite(self):
7187 """ Test DS-Lite """
7188 nat_config = self.vapi.nat_show_config()
7189 self.assertEqual(0, nat_config.dslite_ce)
7191 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
7193 aftr_ip4 = '192.0.0.1'
7194 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7195 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7196 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7197 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7200 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7201 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
7202 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7203 UDP(sport=20000, dport=10000))
7204 self.pg1.add_stream(p)
7205 self.pg_enable_capture(self.pg_interfaces)
7207 capture = self.pg0.get_capture(1)
7208 capture = capture[0]
7209 self.assertFalse(capture.haslayer(IPv6))
7210 self.assertEqual(capture[IP].src, self.nat_addr)
7211 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7212 self.assertNotEqual(capture[UDP].sport, 20000)
7213 self.assertEqual(capture[UDP].dport, 10000)
7214 self.assert_packet_checksums_valid(capture)
7215 out_port = capture[UDP].sport
7217 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7218 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7219 UDP(sport=10000, dport=out_port))
7220 self.pg0.add_stream(p)
7221 self.pg_enable_capture(self.pg_interfaces)
7223 capture = self.pg1.get_capture(1)
7224 capture = capture[0]
7225 self.assertEqual(capture[IPv6].src, aftr_ip6)
7226 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7227 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7228 self.assertEqual(capture[IP].dst, '192.168.1.1')
7229 self.assertEqual(capture[UDP].sport, 10000)
7230 self.assertEqual(capture[UDP].dport, 20000)
7231 self.assert_packet_checksums_valid(capture)
7234 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7235 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7236 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7237 TCP(sport=20001, dport=10001))
7238 self.pg1.add_stream(p)
7239 self.pg_enable_capture(self.pg_interfaces)
7241 capture = self.pg0.get_capture(1)
7242 capture = capture[0]
7243 self.assertFalse(capture.haslayer(IPv6))
7244 self.assertEqual(capture[IP].src, self.nat_addr)
7245 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7246 self.assertNotEqual(capture[TCP].sport, 20001)
7247 self.assertEqual(capture[TCP].dport, 10001)
7248 self.assert_packet_checksums_valid(capture)
7249 out_port = capture[TCP].sport
7251 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7252 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7253 TCP(sport=10001, dport=out_port))
7254 self.pg0.add_stream(p)
7255 self.pg_enable_capture(self.pg_interfaces)
7257 capture = self.pg1.get_capture(1)
7258 capture = capture[0]
7259 self.assertEqual(capture[IPv6].src, aftr_ip6)
7260 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7261 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7262 self.assertEqual(capture[IP].dst, '192.168.1.1')
7263 self.assertEqual(capture[TCP].sport, 10001)
7264 self.assertEqual(capture[TCP].dport, 20001)
7265 self.assert_packet_checksums_valid(capture)
7268 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7269 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7270 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7271 ICMP(id=4000, type='echo-request'))
7272 self.pg1.add_stream(p)
7273 self.pg_enable_capture(self.pg_interfaces)
7275 capture = self.pg0.get_capture(1)
7276 capture = capture[0]
7277 self.assertFalse(capture.haslayer(IPv6))
7278 self.assertEqual(capture[IP].src, self.nat_addr)
7279 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7280 self.assertNotEqual(capture[ICMP].id, 4000)
7281 self.assert_packet_checksums_valid(capture)
7282 out_id = capture[ICMP].id
7284 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7285 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7286 ICMP(id=out_id, type='echo-reply'))
7287 self.pg0.add_stream(p)
7288 self.pg_enable_capture(self.pg_interfaces)
7290 capture = self.pg1.get_capture(1)
7291 capture = capture[0]
7292 self.assertEqual(capture[IPv6].src, aftr_ip6)
7293 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7294 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7295 self.assertEqual(capture[IP].dst, '192.168.1.1')
7296 self.assertEqual(capture[ICMP].id, 4000)
7297 self.assert_packet_checksums_valid(capture)
7299 # ping DS-Lite AFTR tunnel endpoint address
7300 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7301 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7302 ICMPv6EchoRequest())
7303 self.pg1.add_stream(p)
7304 self.pg_enable_capture(self.pg_interfaces)
7306 capture = self.pg1.get_capture(1)
7307 capture = capture[0]
7308 self.assertEqual(capture[IPv6].src, aftr_ip6)
7309 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7310 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7313 super(TestDSlite, self).tearDown()
7314 if not self.vpp_dead:
7315 self.logger.info(self.vapi.cli("show dslite pool"))
7317 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7318 self.logger.info(self.vapi.cli("show dslite sessions"))
7321 class TestDSliteCE(MethodHolder):
7322 """ DS-Lite CE Test Cases """
7325 def setUpConstants(cls):
7326 super(TestDSliteCE, cls).setUpConstants()
7327 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
7330 def setUpClass(cls):
7331 super(TestDSliteCE, cls).setUpClass()
7334 cls.create_pg_interfaces(range(2))
7336 cls.pg0.config_ip4()
7337 cls.pg0.resolve_arp()
7339 cls.pg1.config_ip6()
7340 cls.pg1.generate_remote_hosts(1)
7341 cls.pg1.configure_ipv6_neighbors()
7344 super(TestDSliteCE, cls).tearDownClass()
7347 def test_dslite_ce(self):
7348 """ Test DS-Lite CE """
7350 nat_config = self.vapi.nat_show_config()
7351 self.assertEqual(1, nat_config.dslite_ce)
7353 b4_ip4 = '192.0.0.2'
7354 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7355 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7356 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7357 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7359 aftr_ip4 = '192.0.0.1'
7360 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7361 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7362 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7363 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7365 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7366 dst_address_length=128,
7367 next_hop_address=self.pg1.remote_ip6n,
7368 next_hop_sw_if_index=self.pg1.sw_if_index,
7372 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7373 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7374 UDP(sport=10000, dport=20000))
7375 self.pg0.add_stream(p)
7376 self.pg_enable_capture(self.pg_interfaces)
7378 capture = self.pg1.get_capture(1)
7379 capture = capture[0]
7380 self.assertEqual(capture[IPv6].src, b4_ip6)
7381 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7382 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7383 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7384 self.assertEqual(capture[UDP].sport, 10000)
7385 self.assertEqual(capture[UDP].dport, 20000)
7386 self.assert_packet_checksums_valid(capture)
7389 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7390 IPv6(dst=b4_ip6, src=aftr_ip6) /
7391 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7392 UDP(sport=20000, dport=10000))
7393 self.pg1.add_stream(p)
7394 self.pg_enable_capture(self.pg_interfaces)
7396 capture = self.pg0.get_capture(1)
7397 capture = capture[0]
7398 self.assertFalse(capture.haslayer(IPv6))
7399 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7400 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7401 self.assertEqual(capture[UDP].sport, 20000)
7402 self.assertEqual(capture[UDP].dport, 10000)
7403 self.assert_packet_checksums_valid(capture)
7405 # ping DS-Lite B4 tunnel endpoint address
7406 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7407 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7408 ICMPv6EchoRequest())
7409 self.pg1.add_stream(p)
7410 self.pg_enable_capture(self.pg_interfaces)
7412 capture = self.pg1.get_capture(1)
7413 capture = capture[0]
7414 self.assertEqual(capture[IPv6].src, b4_ip6)
7415 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7416 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7419 super(TestDSliteCE, self).tearDown()
7420 if not self.vpp_dead:
7422 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7424 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7427 class TestNAT66(MethodHolder):
7428 """ NAT66 Test Cases """
7431 def setUpClass(cls):
7432 super(TestNAT66, cls).setUpClass()
7435 cls.nat_addr = 'fd01:ff::2'
7436 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7438 cls.create_pg_interfaces(range(2))
7439 cls.interfaces = list(cls.pg_interfaces)
7441 for i in cls.interfaces:
7444 i.configure_ipv6_neighbors()
7447 super(TestNAT66, cls).tearDownClass()
7450 def test_static(self):
7451 """ 1:1 NAT66 test """
7452 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7453 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7454 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7459 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7460 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7463 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7464 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7467 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7468 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7469 ICMPv6EchoRequest())
7471 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7472 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7473 GRE() / IP() / TCP())
7475 self.pg0.add_stream(pkts)
7476 self.pg_enable_capture(self.pg_interfaces)
7478 capture = self.pg1.get_capture(len(pkts))
7479 for packet in capture:
7481 self.assertEqual(packet[IPv6].src, self.nat_addr)
7482 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7483 self.assert_packet_checksums_valid(packet)
7485 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7490 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7491 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7494 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7495 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7498 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7499 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7502 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7503 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7504 GRE() / IP() / TCP())
7506 self.pg1.add_stream(pkts)
7507 self.pg_enable_capture(self.pg_interfaces)
7509 capture = self.pg0.get_capture(len(pkts))
7510 for packet in capture:
7512 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7513 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7514 self.assert_packet_checksums_valid(packet)
7516 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7519 sm = self.vapi.nat66_static_mapping_dump()
7520 self.assertEqual(len(sm), 1)
7521 self.assertEqual(sm[0].total_pkts, 8)
7523 def test_check_no_translate(self):
7524 """ NAT66 translate only when egress interface is outside interface """
7525 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7526 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7527 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7531 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7532 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7534 self.pg0.add_stream([p])
7535 self.pg_enable_capture(self.pg_interfaces)
7537 capture = self.pg1.get_capture(1)
7540 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7541 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7543 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7546 def clear_nat66(self):
7548 Clear NAT66 configuration.
7550 interfaces = self.vapi.nat66_interface_dump()
7551 for intf in interfaces:
7552 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7556 static_mappings = self.vapi.nat66_static_mapping_dump()
7557 for sm in static_mappings:
7558 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7559 sm.external_ip_address,
7564 super(TestNAT66, self).tearDown()
7565 if not self.vpp_dead:
7566 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7567 self.logger.info(self.vapi.cli("show nat66 static mappings"))
7571 if __name__ == '__main__':
7572 unittest.main(testRunner=VppTestRunner)