9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
139 self.verify_no_nat44_user()
140 self.vapi.nat_set_timeouts()
142 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
143 local_port=0, external_port=0, vrf_id=0,
144 is_add=1, external_sw_if_index=0xFFFFFFFF,
145 proto=0, twice_nat=0, self_twice_nat=0,
146 out2in_only=0, tag=""):
148 Add/delete NAT44 static mapping
150 :param local_ip: Local IP address
151 :param external_ip: External IP address
152 :param local_port: Local port number (Optional)
153 :param external_port: External port number (Optional)
154 :param vrf_id: VRF ID (Default 0)
155 :param is_add: 1 if add, 0 if delete (Default add)
156 :param external_sw_if_index: External interface instead of IP address
157 :param proto: IP protocol (Mandatory if port specified)
158 :param twice_nat: 1 if translate external host address and port
159 :param self_twice_nat: 1 if translate external host address and port
160 whenever external host address equals
161 local address of internal host
162 :param out2in_only: if 1 rule is matching only out2in direction
163 :param tag: Opaque string tag
166 if local_port and external_port:
168 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
169 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
170 self.vapi.nat44_add_del_static_mapping(
173 external_sw_if_index,
185 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
187 Add/delete NAT44 address
189 :param ip: IP address
190 :param is_add: 1 if add, 0 if delete (Default add)
191 :param twice_nat: twice NAT address for external hosts
193 nat_addr = socket.inet_pton(socket.AF_INET, ip)
194 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
198 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
200 Create packet stream for inside network
202 :param in_if: Inside interface
203 :param out_if: Outside interface
204 :param dst_ip: Destination address
205 :param ttl: TTL of generated packets
208 dst_ip = out_if.remote_ip4
212 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
213 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
214 TCP(sport=self.tcp_port_in, dport=20))
218 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
219 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
220 UDP(sport=self.udp_port_in, dport=20))
224 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
225 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
226 ICMP(id=self.icmp_id_in, type='echo-request'))
231 def compose_ip6(self, ip4, pref, plen):
233 Compose IPv4-embedded IPv6 addresses
235 :param ip4: IPv4 address
236 :param pref: IPv6 prefix
237 :param plen: IPv6 prefix length
238 :returns: IPv4-embedded IPv6 addresses
240 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
241 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
256 pref_n[10] = ip4_n[3]
260 pref_n[10] = ip4_n[2]
261 pref_n[11] = ip4_n[3]
264 pref_n[10] = ip4_n[1]
265 pref_n[11] = ip4_n[2]
266 pref_n[12] = ip4_n[3]
268 pref_n[12] = ip4_n[0]
269 pref_n[13] = ip4_n[1]
270 pref_n[14] = ip4_n[2]
271 pref_n[15] = ip4_n[3]
272 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
274 def extract_ip4(self, ip6, plen):
276 Extract IPv4 address embedded in IPv6 addresses
278 :param ip6: IPv6 address
279 :param plen: IPv6 prefix length
280 :returns: extracted IPv4 address
282 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
314 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
316 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
318 Create IPv6 packet stream for inside network
320 :param in_if: Inside interface
321 :param out_if: Outside interface
322 :param hlim: Hop Limit of generated packets
323 :param pref: NAT64 prefix
324 :param plen: NAT64 prefix length
328 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
330 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
333 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
334 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
335 TCP(sport=self.tcp_port_in, dport=20))
339 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
340 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
341 UDP(sport=self.udp_port_in, dport=20))
345 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
346 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
347 ICMPv6EchoRequest(id=self.icmp_id_in))
352 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
353 use_inside_ports=False):
355 Create packet stream for outside network
357 :param out_if: Outside interface
358 :param dst_ip: Destination IP address (Default use global NAT address)
359 :param ttl: TTL of generated packets
360 :param use_inside_ports: Use inside NAT ports as destination ports
361 instead of outside ports
364 dst_ip = self.nat_addr
365 if not use_inside_ports:
366 tcp_port = self.tcp_port_out
367 udp_port = self.udp_port_out
368 icmp_id = self.icmp_id_out
370 tcp_port = self.tcp_port_in
371 udp_port = self.udp_port_in
372 icmp_id = self.icmp_id_in
375 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
376 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
377 TCP(dport=tcp_port, sport=20))
381 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
382 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
383 UDP(dport=udp_port, sport=20))
387 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
388 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
389 ICMP(id=icmp_id, type='echo-reply'))
394 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
396 Create packet stream for outside network
398 :param out_if: Outside interface
399 :param dst_ip: Destination IP address
400 :param hl: HL of generated packets
404 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
405 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
406 TCP(dport=self.tcp_port_out, sport=20))
410 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
411 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
412 UDP(dport=self.udp_port_out, sport=20))
416 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
417 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
418 ICMPv6EchoReply(id=self.icmp_id_out))
423 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
424 packet_num=3, dst_ip=None, is_ip6=False):
426 Verify captured packets on outside network
428 :param capture: Captured packets
429 :param nat_ip: Translated IP address (Default use global NAT address)
430 :param same_port: Source port number is not translated (Default False)
431 :param packet_num: Expected number of packets (Default 3)
432 :param dst_ip: Destination IP address (Default do not verify)
433 :param is_ip6: If L3 protocol is IPv6 (Default False)
437 ICMP46 = ICMPv6EchoRequest
442 nat_ip = self.nat_addr
443 self.assertEqual(packet_num, len(capture))
444 for packet in capture:
447 self.assert_packet_checksums_valid(packet)
448 self.assertEqual(packet[IP46].src, nat_ip)
449 if dst_ip is not None:
450 self.assertEqual(packet[IP46].dst, dst_ip)
451 if packet.haslayer(TCP):
453 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
456 packet[TCP].sport, self.tcp_port_in)
457 self.tcp_port_out = packet[TCP].sport
458 self.assert_packet_checksums_valid(packet)
459 elif packet.haslayer(UDP):
461 self.assertEqual(packet[UDP].sport, self.udp_port_in)
464 packet[UDP].sport, self.udp_port_in)
465 self.udp_port_out = packet[UDP].sport
468 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
470 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
471 self.icmp_id_out = packet[ICMP46].id
472 self.assert_packet_checksums_valid(packet)
474 self.logger.error(ppp("Unexpected or invalid packet "
475 "(outside network):", packet))
478 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
479 packet_num=3, dst_ip=None):
481 Verify captured packets on outside network
483 :param capture: Captured packets
484 :param nat_ip: Translated IP address
485 :param same_port: Source port number is not translated (Default False)
486 :param packet_num: Expected number of packets (Default 3)
487 :param dst_ip: Destination IP address (Default do not verify)
489 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
492 def verify_capture_in(self, capture, in_if, packet_num=3):
494 Verify captured packets on inside network
496 :param capture: Captured packets
497 :param in_if: Inside interface
498 :param packet_num: Expected number of packets (Default 3)
500 self.assertEqual(packet_num, len(capture))
501 for packet in capture:
503 self.assert_packet_checksums_valid(packet)
504 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
505 if packet.haslayer(TCP):
506 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
507 elif packet.haslayer(UDP):
508 self.assertEqual(packet[UDP].dport, self.udp_port_in)
510 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
512 self.logger.error(ppp("Unexpected or invalid packet "
513 "(inside network):", packet))
516 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
518 Verify captured IPv6 packets on inside network
520 :param capture: Captured packets
521 :param src_ip: Source IP
522 :param dst_ip: Destination IP address
523 :param packet_num: Expected number of packets (Default 3)
525 self.assertEqual(packet_num, len(capture))
526 for packet in capture:
528 self.assertEqual(packet[IPv6].src, src_ip)
529 self.assertEqual(packet[IPv6].dst, dst_ip)
530 self.assert_packet_checksums_valid(packet)
531 if packet.haslayer(TCP):
532 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
533 elif packet.haslayer(UDP):
534 self.assertEqual(packet[UDP].dport, self.udp_port_in)
536 self.assertEqual(packet[ICMPv6EchoReply].id,
539 self.logger.error(ppp("Unexpected or invalid packet "
540 "(inside network):", packet))
543 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
545 Verify captured packets that don't have to be translated
547 :param capture: Captured packets
548 :param ingress_if: Ingress interface
549 :param egress_if: Egress interface
551 for packet in capture:
553 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
554 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
555 if packet.haslayer(TCP):
556 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
557 elif packet.haslayer(UDP):
558 self.assertEqual(packet[UDP].sport, self.udp_port_in)
560 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
562 self.logger.error(ppp("Unexpected or invalid packet "
563 "(inside network):", packet))
566 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
567 packet_num=3, icmp_type=11):
569 Verify captured packets with ICMP errors on outside network
571 :param capture: Captured packets
572 :param src_ip: Translated IP address or IP address of VPP
573 (Default use global NAT address)
574 :param packet_num: Expected number of packets (Default 3)
575 :param icmp_type: Type of error ICMP packet
576 we are expecting (Default 11)
579 src_ip = self.nat_addr
580 self.assertEqual(packet_num, len(capture))
581 for packet in capture:
583 self.assertEqual(packet[IP].src, src_ip)
584 self.assertTrue(packet.haslayer(ICMP))
586 self.assertEqual(icmp.type, icmp_type)
587 self.assertTrue(icmp.haslayer(IPerror))
588 inner_ip = icmp[IPerror]
589 if inner_ip.haslayer(TCPerror):
590 self.assertEqual(inner_ip[TCPerror].dport,
592 elif inner_ip.haslayer(UDPerror):
593 self.assertEqual(inner_ip[UDPerror].dport,
596 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
598 self.logger.error(ppp("Unexpected or invalid packet "
599 "(outside network):", packet))
602 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
605 Verify captured packets with ICMP errors on inside network
607 :param capture: Captured packets
608 :param in_if: Inside interface
609 :param packet_num: Expected number of packets (Default 3)
610 :param icmp_type: Type of error ICMP packet
611 we are expecting (Default 11)
613 self.assertEqual(packet_num, len(capture))
614 for packet in capture:
616 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
617 self.assertTrue(packet.haslayer(ICMP))
619 self.assertEqual(icmp.type, icmp_type)
620 self.assertTrue(icmp.haslayer(IPerror))
621 inner_ip = icmp[IPerror]
622 if inner_ip.haslayer(TCPerror):
623 self.assertEqual(inner_ip[TCPerror].sport,
625 elif inner_ip.haslayer(UDPerror):
626 self.assertEqual(inner_ip[UDPerror].sport,
629 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
631 self.logger.error(ppp("Unexpected or invalid packet "
632 "(inside network):", packet))
635 def create_stream_frag(self, src_if, dst, sport, dport, data):
637 Create fragmented packet stream
639 :param src_if: Source interface
640 :param dst: Destination IPv4 address
641 :param sport: Source TCP port
642 :param dport: Destination TCP port
643 :param data: Payload data
646 id = random.randint(0, 65535)
647 p = (IP(src=src_if.remote_ip4, dst=dst) /
648 TCP(sport=sport, dport=dport) /
650 p = p.__class__(str(p))
651 chksum = p['TCP'].chksum
653 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
654 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
655 TCP(sport=sport, dport=dport, chksum=chksum) /
658 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
659 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
660 proto=IP_PROTOS.tcp) /
663 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
664 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
670 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
671 pref=None, plen=0, frag_size=128):
673 Create fragmented packet stream
675 :param src_if: Source interface
676 :param dst: Destination IPv4 address
677 :param sport: Source TCP port
678 :param dport: Destination TCP port
679 :param data: Payload data
680 :param pref: NAT64 prefix
681 :param plen: NAT64 prefix length
682 :param frag_size: size of fragments
686 dst_ip6 = ''.join(['64:ff9b::', dst])
688 dst_ip6 = self.compose_ip6(dst, pref, plen)
690 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
691 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
692 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
693 TCP(sport=sport, dport=dport) /
696 return fragment6(p, frag_size)
698 def reass_frags_and_verify(self, frags, src, dst):
700 Reassemble and verify fragmented packet
702 :param frags: Captured fragments
703 :param src: Source IPv4 address to verify
704 :param dst: Destination IPv4 address to verify
706 :returns: Reassembled IPv4 packet
708 buffer = StringIO.StringIO()
710 self.assertEqual(p[IP].src, src)
711 self.assertEqual(p[IP].dst, dst)
712 self.assert_ip_checksum_valid(p)
713 buffer.seek(p[IP].frag * 8)
714 buffer.write(p[IP].payload)
715 ip = frags[0].getlayer(IP)
716 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
717 proto=frags[0][IP].proto)
718 if ip.proto == IP_PROTOS.tcp:
719 p = (ip / TCP(buffer.getvalue()))
720 self.assert_tcp_checksum_valid(p)
721 elif ip.proto == IP_PROTOS.udp:
722 p = (ip / UDP(buffer.getvalue()))
725 def reass_frags_and_verify_ip6(self, frags, src, dst):
727 Reassemble and verify fragmented packet
729 :param frags: Captured fragments
730 :param src: Source IPv6 address to verify
731 :param dst: Destination IPv6 address to verify
733 :returns: Reassembled IPv6 packet
735 buffer = StringIO.StringIO()
737 self.assertEqual(p[IPv6].src, src)
738 self.assertEqual(p[IPv6].dst, dst)
739 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
740 buffer.write(p[IPv6ExtHdrFragment].payload)
741 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
742 nh=frags[0][IPv6ExtHdrFragment].nh)
743 if ip.nh == IP_PROTOS.tcp:
744 p = (ip / TCP(buffer.getvalue()))
745 elif ip.nh == IP_PROTOS.udp:
746 p = (ip / UDP(buffer.getvalue()))
747 self.assert_packet_checksums_valid(p)
750 def initiate_tcp_session(self, in_if, out_if):
752 Initiates TCP session
754 :param in_if: Inside interface
755 :param out_if: Outside interface
759 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
760 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
761 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
764 self.pg_enable_capture(self.pg_interfaces)
766 capture = out_if.get_capture(1)
768 self.tcp_port_out = p[TCP].sport
770 # SYN + ACK packet out->in
771 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
772 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
773 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
776 self.pg_enable_capture(self.pg_interfaces)
781 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
782 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
783 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
786 self.pg_enable_capture(self.pg_interfaces)
788 out_if.get_capture(1)
791 self.logger.error("TCP 3 way handshake failed")
794 def verify_ipfix_nat44_ses(self, data):
796 Verify IPFIX NAT44 session create/delete event
798 :param data: Decoded IPFIX data records
800 nat44_ses_create_num = 0
801 nat44_ses_delete_num = 0
802 self.assertEqual(6, len(data))
805 self.assertIn(ord(record[230]), [4, 5])
806 if ord(record[230]) == 4:
807 nat44_ses_create_num += 1
809 nat44_ses_delete_num += 1
811 self.assertEqual(self.pg0.remote_ip4n, record[8])
812 # postNATSourceIPv4Address
813 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
816 self.assertEqual(struct.pack("!I", 0), record[234])
817 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
818 if IP_PROTOS.icmp == ord(record[4]):
819 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
820 self.assertEqual(struct.pack("!H", self.icmp_id_out),
822 elif IP_PROTOS.tcp == ord(record[4]):
823 self.assertEqual(struct.pack("!H", self.tcp_port_in),
825 self.assertEqual(struct.pack("!H", self.tcp_port_out),
827 elif IP_PROTOS.udp == ord(record[4]):
828 self.assertEqual(struct.pack("!H", self.udp_port_in),
830 self.assertEqual(struct.pack("!H", self.udp_port_out),
833 self.fail("Invalid protocol")
834 self.assertEqual(3, nat44_ses_create_num)
835 self.assertEqual(3, nat44_ses_delete_num)
837 def verify_ipfix_addr_exhausted(self, data):
839 Verify IPFIX NAT addresses exhausted event
841 :param data: Decoded IPFIX data records
843 self.assertEqual(1, len(data))
846 self.assertEqual(ord(record[230]), 3)
848 self.assertEqual(struct.pack("!I", 0), record[283])
850 def verify_ipfix_max_sessions(self, data, limit):
852 Verify IPFIX maximum session entries exceeded event
854 :param data: Decoded IPFIX data records
855 :param limit: Number of maximum session entries that can be created.
857 self.assertEqual(1, len(data))
860 self.assertEqual(ord(record[230]), 13)
861 # natQuotaExceededEvent
862 self.assertEqual(struct.pack("I", 1), record[466])
864 self.assertEqual(struct.pack("I", limit), record[471])
866 def verify_ipfix_max_bibs(self, data, limit):
868 Verify IPFIX maximum BIB entries exceeded event
870 :param data: Decoded IPFIX data records
871 :param limit: Number of maximum BIB entries that can be created.
873 self.assertEqual(1, len(data))
876 self.assertEqual(ord(record[230]), 13)
877 # natQuotaExceededEvent
878 self.assertEqual(struct.pack("I", 2), record[466])
880 self.assertEqual(struct.pack("I", limit), record[472])
882 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
884 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
886 :param data: Decoded IPFIX data records
887 :param limit: Number of maximum fragments pending reassembly
888 :param src_addr: IPv6 source address
890 self.assertEqual(1, len(data))
893 self.assertEqual(ord(record[230]), 13)
894 # natQuotaExceededEvent
895 self.assertEqual(struct.pack("I", 5), record[466])
896 # maxFragmentsPendingReassembly
897 self.assertEqual(struct.pack("I", limit), record[475])
899 self.assertEqual(src_addr, record[27])
901 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
903 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
905 :param data: Decoded IPFIX data records
906 :param limit: Number of maximum fragments pending reassembly
907 :param src_addr: IPv4 source address
909 self.assertEqual(1, len(data))
912 self.assertEqual(ord(record[230]), 13)
913 # natQuotaExceededEvent
914 self.assertEqual(struct.pack("I", 5), record[466])
915 # maxFragmentsPendingReassembly
916 self.assertEqual(struct.pack("I", limit), record[475])
918 self.assertEqual(src_addr, record[8])
920 def verify_ipfix_bib(self, data, is_create, src_addr):
922 Verify IPFIX NAT64 BIB create and delete events
924 :param data: Decoded IPFIX data records
925 :param is_create: Create event if nonzero value otherwise delete event
926 :param src_addr: IPv6 source address
928 self.assertEqual(1, len(data))
932 self.assertEqual(ord(record[230]), 10)
934 self.assertEqual(ord(record[230]), 11)
936 self.assertEqual(src_addr, record[27])
937 # postNATSourceIPv4Address
938 self.assertEqual(self.nat_addr_n, record[225])
940 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
942 self.assertEqual(struct.pack("!I", 0), record[234])
943 # sourceTransportPort
944 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
945 # postNAPTSourceTransportPort
946 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
948 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
951 Verify IPFIX NAT64 session create and delete events
953 :param data: Decoded IPFIX data records
954 :param is_create: Create event if nonzero value otherwise delete event
955 :param src_addr: IPv6 source address
956 :param dst_addr: IPv4 destination address
957 :param dst_port: destination TCP port
959 self.assertEqual(1, len(data))
963 self.assertEqual(ord(record[230]), 6)
965 self.assertEqual(ord(record[230]), 7)
967 self.assertEqual(src_addr, record[27])
968 # destinationIPv6Address
969 self.assertEqual(socket.inet_pton(socket.AF_INET6,
970 self.compose_ip6(dst_addr,
974 # postNATSourceIPv4Address
975 self.assertEqual(self.nat_addr_n, record[225])
976 # postNATDestinationIPv4Address
977 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
980 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
982 self.assertEqual(struct.pack("!I", 0), record[234])
983 # sourceTransportPort
984 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
985 # postNAPTSourceTransportPort
986 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
987 # destinationTransportPort
988 self.assertEqual(struct.pack("!H", dst_port), record[11])
989 # postNAPTDestinationTransportPort
990 self.assertEqual(struct.pack("!H", dst_port), record[228])
def verify_no_nat44_user(self):
    """
    Verify that the NAT44 user dump is empty

    Fetches the user list through ``self.vapi.nat44_user_dump()`` and
    asserts that it contains zero entries.
    """
    # Dump once and compare its length against zero in a single assertion.
    self.assertEqual(len(self.vapi.nat44_user_dump()), 0)
997 def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
999 Verify IPFIX maximum entries per user exceeded event
1001 :param data: Decoded IPFIX data records
1002 :param limit: Number of maximum entries per user
1003 :param src_addr: IPv4 source address
1005 self.assertEqual(1, len(data))
1008 self.assertEqual(ord(record[230]), 13)
1009 # natQuotaExceededEvent
1010 self.assertEqual(struct.pack("I", 3), record[466])
1012 self.assertEqual(struct.pack("I", limit), record[473])
1014 self.assertEqual(src_addr, record[8])
1017 class TestNAT44(MethodHolder):
1018 """ NAT44 Test Cases """
1021 def setUpClass(cls):
1022 super(TestNAT44, cls).setUpClass()
1023 cls.vapi.cli("set log class nat level debug")
1026 cls.tcp_port_in = 6303
1027 cls.tcp_port_out = 6303
1028 cls.udp_port_in = 6304
1029 cls.udp_port_out = 6304
1030 cls.icmp_id_in = 6305
1031 cls.icmp_id_out = 6305
1032 cls.nat_addr = '10.0.0.3'
1033 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1034 cls.ipfix_src_port = 4739
1035 cls.ipfix_domain_id = 1
1036 cls.tcp_external_port = 80
1038 cls.create_pg_interfaces(range(10))
1039 cls.interfaces = list(cls.pg_interfaces[0:4])
1041 for i in cls.interfaces:
1046 cls.pg0.generate_remote_hosts(3)
1047 cls.pg0.configure_ipv4_neighbors()
1049 cls.pg1.generate_remote_hosts(1)
1050 cls.pg1.configure_ipv4_neighbors()
1052 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1053 cls.vapi.ip_table_add_del(10, is_add=1)
1054 cls.vapi.ip_table_add_del(20, is_add=1)
1056 cls.pg4._local_ip4 = "172.16.255.1"
1057 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1058 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1059 cls.pg4.set_table_ip4(10)
1060 cls.pg5._local_ip4 = "172.17.255.3"
1061 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1062 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1063 cls.pg5.set_table_ip4(10)
1064 cls.pg6._local_ip4 = "172.16.255.1"
1065 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1066 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1067 cls.pg6.set_table_ip4(20)
1068 for i in cls.overlapping_interfaces:
1076 cls.pg9.generate_remote_hosts(2)
1077 cls.pg9.config_ip4()
1078 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1079 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1083 cls.pg9.resolve_arp()
1084 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1085 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1086 cls.pg9.resolve_arp()
1089 super(TestNAT44, cls).tearDownClass()
1092 def test_dynamic(self):
1093 """ NAT44 dynamic translation test """
1095 self.nat44_add_address(self.nat_addr)
1096 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1097 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1101 pkts = self.create_stream_in(self.pg0, self.pg1)
1102 self.pg0.add_stream(pkts)
1103 self.pg_enable_capture(self.pg_interfaces)
1105 capture = self.pg1.get_capture(len(pkts))
1106 self.verify_capture_out(capture)
1109 pkts = self.create_stream_out(self.pg1)
1110 self.pg1.add_stream(pkts)
1111 self.pg_enable_capture(self.pg_interfaces)
1113 capture = self.pg0.get_capture(len(pkts))
1114 self.verify_capture_in(capture, self.pg0)
1116 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1117 """ NAT44 handling of client packets with TTL=1 """
1119 self.nat44_add_address(self.nat_addr)
1120 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1121 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1124 # Client side - generate traffic
1125 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1126 self.pg0.add_stream(pkts)
1127 self.pg_enable_capture(self.pg_interfaces)
1130 # Client side - verify ICMP type 11 packets
1131 capture = self.pg0.get_capture(len(pkts))
1132 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1134 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1135 """ NAT44 handling of server packets with TTL=1 """
1137 self.nat44_add_address(self.nat_addr)
1138 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1139 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1142 # Client side - create sessions
1143 pkts = self.create_stream_in(self.pg0, self.pg1)
1144 self.pg0.add_stream(pkts)
1145 self.pg_enable_capture(self.pg_interfaces)
1148 # Server side - generate traffic
1149 capture = self.pg1.get_capture(len(pkts))
1150 self.verify_capture_out(capture)
1151 pkts = self.create_stream_out(self.pg1, ttl=1)
1152 self.pg1.add_stream(pkts)
1153 self.pg_enable_capture(self.pg_interfaces)
1156 # Server side - verify ICMP type 11 packets
1157 capture = self.pg1.get_capture(len(pkts))
1158 self.verify_capture_out_with_icmp_errors(capture,
1159 src_ip=self.pg1.local_ip4)
1161 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1162 """ NAT44 handling of error responses to client packets with TTL=2 """
1164 self.nat44_add_address(self.nat_addr)
1165 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1166 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1169 # Client side - generate traffic
1170 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1171 self.pg0.add_stream(pkts)
1172 self.pg_enable_capture(self.pg_interfaces)
1175 # Server side - simulate ICMP type 11 response
1176 capture = self.pg1.get_capture(len(pkts))
1177 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1178 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1179 ICMP(type=11) / packet[IP] for packet in capture]
1180 self.pg1.add_stream(pkts)
1181 self.pg_enable_capture(self.pg_interfaces)
1184 # Client side - verify ICMP type 11 packets
1185 capture = self.pg0.get_capture(len(pkts))
1186 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1188 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1189 """ NAT44 handling of error responses to server packets with TTL=2 """
# Flow: an in2out stream from pg0 is sent first and verified on pg1, then
# pg1 sends an out2in stream built with ttl=2. Every packet captured on pg0
# is answered with an ICMP type 11 error addressed to pg1.remote_ip4, and
# the packets reaching pg1 are checked with
# verify_capture_out_with_icmp_errors.
1191 self.nat44_add_address(self.nat_addr)
1192 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1193 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1196 # Client side - create sessions
1197 pkts = self.create_stream_in(self.pg0, self.pg1)
1198 self.pg0.add_stream(pkts)
1199 self.pg_enable_capture(self.pg_interfaces)
1202 # Server side - generate traffic
1203 capture = self.pg1.get_capture(len(pkts))
1204 self.verify_capture_out(capture)
1205 pkts = self.create_stream_out(self.pg1, ttl=2)
1206 self.pg1.add_stream(pkts)
1207 self.pg_enable_capture(self.pg_interfaces)
1210 # Client side - simulate ICMP type 11 response
1211 capture = self.pg0.get_capture(len(pkts))
# One error per packet captured on pg0; the captured IP layer is embedded
# as the error payload.
1212 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1213 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1214 ICMP(type=11) / packet[IP] for packet in capture]
1215 self.pg0.add_stream(pkts)
1216 self.pg_enable_capture(self.pg_interfaces)
1219 # Server side - verify ICMP type 11 packets
1220 capture = self.pg1.get_capture(len(pkts))
1221 self.verify_capture_out_with_icmp_errors(capture)
1223 def test_ping_out_interface_from_outside(self):
1224 """ Ping NAT44 out interface from outside network """
# An ICMP echo-request is built from pg1's remote host to pg1.local_ip4.
# The test expects exactly one packet back on pg1, with source and
# destination addresses swapped and ICMP type 0.
1226 self.nat44_add_address(self.nat_addr)
1227 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1228 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1231 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1232 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1233 ICMP(id=self.icmp_id_out, type='echo-request'))
1235 self.pg1.add_stream(pkts)
1236 self.pg_enable_capture(self.pg_interfaces)
1238 capture = self.pg1.get_capture(len(pkts))
1239 self.assertEqual(1, len(capture))
1242 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1243 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
# NOTE(review): the request above is built with id=self.icmp_id_out while
# the reply id is compared with self.icmp_id_in; this only holds when both
# attributes carry the same value - confirm that is intended.
1244 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1245 self.assertEqual(packet[ICMP].type, 0) # echo reply
1247 self.logger.error(ppp("Unexpected or invalid packet "
1248 "(outside network):", packet))
1251 def test_ping_internal_host_from_outside(self):
1252 """ Ping internal host from outside network """
# A static mapping pg0.remote_ip4 <-> self.nat_addr is added through the
# nat44_add_static_mapping helper. An echo-request sent from pg1 to
# self.nat_addr is expected on pg0, and the echo-reply sent from pg0 is
# expected on pg1; both captures are additionally checked to be ICMP.
1254 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1256 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside: echo-request addressed to the NAT address.
1260 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1261 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1262 ICMP(id=self.icmp_id_out, type='echo-request'))
1263 self.pg1.add_stream(pkt)
1264 self.pg_enable_capture(self.pg_interfaces)
1266 capture = self.pg0.get_capture(1)
1267 self.verify_capture_in(capture, self.pg0, packet_num=1)
1268 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Inside -> outside: echo-reply from the internal host.
1271 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1272 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1273 ICMP(id=self.icmp_id_in, type='echo-reply'))
1274 self.pg0.add_stream(pkt)
1275 self.pg_enable_capture(self.pg_interfaces)
1277 capture = self.pg1.get_capture(1)
1278 self.verify_capture_out(capture, same_port=True, packet_num=1)
1279 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1281 def test_forwarding(self):
1282 """ NAT44 forwarding test """
# Forwarding is switched on with nat44_forwarding_enable_disable(1) and a
# static mapping real_ip <-> alias_ip is created. Traffic is then checked
# in both directions twice: once for the mapped host and once after
# temporarily replacing pg0.remote_hosts[0] with pg0.remote_hosts[1].
1284 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1285 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1287 self.vapi.nat44_forwarding_enable_disable(1)
1289 real_ip = self.pg0.remote_ip4n
1290 alias_ip = self.nat_addr_n
1291 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1292 external_ip=alias_ip)
1295 # static mapping match
1297 pkts = self.create_stream_out(self.pg1)
1298 self.pg1.add_stream(pkts)
1299 self.pg_enable_capture(self.pg_interfaces)
1301 capture = self.pg0.get_capture(len(pkts))
1302 self.verify_capture_in(capture, self.pg0)
1304 pkts = self.create_stream_in(self.pg0, self.pg1)
1305 self.pg0.add_stream(pkts)
1306 self.pg_enable_capture(self.pg_interfaces)
1308 capture = self.pg1.get_capture(len(pkts))
1309 self.verify_capture_out(capture, same_port=True)
1311 # no static mapping match
# Keep the original first host so it can be put back once this part is done.
1313 host0 = self.pg0.remote_hosts[0]
1314 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1316 pkts = self.create_stream_out(self.pg1,
1317 dst_ip=self.pg0.remote_ip4,
1318 use_inside_ports=True)
1319 self.pg1.add_stream(pkts)
1320 self.pg_enable_capture(self.pg_interfaces)
1322 capture = self.pg0.get_capture(len(pkts))
1323 self.verify_capture_in(capture, self.pg0)
1325 pkts = self.create_stream_in(self.pg0, self.pg1)
1326 self.pg0.add_stream(pkts)
1327 self.pg_enable_capture(self.pg_interfaces)
1329 capture = self.pg1.get_capture(len(pkts))
1330 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1333 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is switched off again and
# nat44_add_del_static_mapping is called a second time for the same
# address pair.
1336 self.vapi.nat44_forwarding_enable_disable(0)
1337 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1338 external_ip=alias_ip,
1341 def test_static_in(self):
1342 """ 1:1 NAT initialized from inside network """
# A static mapping pg0.remote_ip4 <-> 10.0.0.10 is added, the dumped
# mapping is checked (empty tag, protocol and both ports equal to 0), then
# traffic is verified first in2out (pg0 -> pg1) and afterwards out2in
# (pg1 -> pg0).
1344 nat_ip = "10.0.0.10"
1345 self.tcp_port_out = 6303
1346 self.udp_port_out = 6304
1347 self.icmp_id_out = 6305
1349 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1350 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1351 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1353 sm = self.vapi.nat44_static_mapping_dump()
1354 self.assertEqual(len(sm), 1)
# Only the part of the tag before the first NUL character is compared.
1355 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1356 self.assertEqual(sm[0].protocol, 0)
1357 self.assertEqual(sm[0].local_port, 0)
1358 self.assertEqual(sm[0].external_port, 0)
1361 pkts = self.create_stream_in(self.pg0, self.pg1)
1362 self.pg0.add_stream(pkts)
1363 self.pg_enable_capture(self.pg_interfaces)
1365 capture = self.pg1.get_capture(len(pkts))
1366 self.verify_capture_out(capture, nat_ip, True)
1369 pkts = self.create_stream_out(self.pg1, nat_ip)
1370 self.pg1.add_stream(pkts)
1371 self.pg_enable_capture(self.pg_interfaces)
1373 capture = self.pg0.get_capture(len(pkts))
1374 self.verify_capture_in(capture, self.pg0)
1376 def test_static_out(self):
1377 """ 1:1 NAT initialized from outside network """
# A tagged static mapping pg0.remote_ip4 <-> 10.0.0.20 is added and the
# dumped mapping's tag is compared with the one that was passed in. Traffic
# is then verified out2in first (pg1 -> pg0) and in2out second
# (pg0 -> pg1).
1379 nat_ip = "10.0.0.20"
1380 self.tcp_port_out = 6303
1381 self.udp_port_out = 6304
1382 self.icmp_id_out = 6305
1385 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1386 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1387 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1389 sm = self.vapi.nat44_static_mapping_dump()
1390 self.assertEqual(len(sm), 1)
# Only the part of the tag before the first NUL character is compared.
1391 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1394 pkts = self.create_stream_out(self.pg1, nat_ip)
1395 self.pg1.add_stream(pkts)
1396 self.pg_enable_capture(self.pg_interfaces)
1398 capture = self.pg0.get_capture(len(pkts))
1399 self.verify_capture_in(capture, self.pg0)
1402 pkts = self.create_stream_in(self.pg0, self.pg1)
1403 self.pg0.add_stream(pkts)
1404 self.pg_enable_capture(self.pg_interfaces)
1406 capture = self.pg1.get_capture(len(pkts))
1407 self.verify_capture_out(capture, nat_ip, True)
1409 def test_static_with_port_in(self):
1410 """ 1:1 NAPT initialized from inside network """
# Three port-level static mappings (TCP, UDP and ICMP) are added between
# pg0.remote_ip4 and self.nat_addr, pairing each *_in port/id with the
# *_out value set below. Traffic is verified in2out first, then out2in.
1412 self.tcp_port_out = 3606
1413 self.udp_port_out = 3607
1414 self.icmp_id_out = 3608
1416 self.nat44_add_address(self.nat_addr)
1417 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1418 self.tcp_port_in, self.tcp_port_out,
1419 proto=IP_PROTOS.tcp)
1420 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1421 self.udp_port_in, self.udp_port_out,
1422 proto=IP_PROTOS.udp)
1423 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1424 self.icmp_id_in, self.icmp_id_out,
1425 proto=IP_PROTOS.icmp)
1426 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1427 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1431 pkts = self.create_stream_in(self.pg0, self.pg1)
1432 self.pg0.add_stream(pkts)
1433 self.pg_enable_capture(self.pg_interfaces)
1435 capture = self.pg1.get_capture(len(pkts))
1436 self.verify_capture_out(capture)
1439 pkts = self.create_stream_out(self.pg1)
1440 self.pg1.add_stream(pkts)
1441 self.pg_enable_capture(self.pg_interfaces)
1443 capture = self.pg0.get_capture(len(pkts))
1444 self.verify_capture_in(capture, self.pg0)
1446 def test_static_with_port_out(self):
1447 """ 1:1 NAPT initialized from outside network """
# Same three port-level static mappings (TCP, UDP, ICMP) as the
# inside-initiated variant, but with different outside ports and with the
# out2in direction exercised before the in2out direction.
1449 self.tcp_port_out = 30606
1450 self.udp_port_out = 30607
1451 self.icmp_id_out = 30608
1453 self.nat44_add_address(self.nat_addr)
1454 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1455 self.tcp_port_in, self.tcp_port_out,
1456 proto=IP_PROTOS.tcp)
1457 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1458 self.udp_port_in, self.udp_port_out,
1459 proto=IP_PROTOS.udp)
1460 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1461 self.icmp_id_in, self.icmp_id_out,
1462 proto=IP_PROTOS.icmp)
1463 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1464 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1468 pkts = self.create_stream_out(self.pg1)
1469 self.pg1.add_stream(pkts)
1470 self.pg_enable_capture(self.pg_interfaces)
1472 capture = self.pg0.get_capture(len(pkts))
1473 self.verify_capture_in(capture, self.pg0)
1476 pkts = self.create_stream_in(self.pg0, self.pg1)
1477 self.pg0.add_stream(pkts)
1478 self.pg_enable_capture(self.pg_interfaces)
1480 capture = self.pg1.get_capture(len(pkts))
1481 self.verify_capture_out(capture)
1483 def test_static_vrf_aware(self):
1484 """ 1:1 NAT VRF awareness """
# Two static mappings are added: pg4.remote_ip4 <-> 10.0.0.30 and
# pg0.remote_ip4 <-> 10.0.0.40. Traffic from pg4 towards pg3 must come out
# translated to nat_ip1, while traffic from pg0 towards pg3 must not show
# up on pg3 at all.
1486 nat_ip1 = "10.0.0.30"
1487 nat_ip2 = "10.0.0.40"
1488 self.tcp_port_out = 6303
1489 self.udp_port_out = 6304
1490 self.icmp_id_out = 6305
1492 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1494 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1496 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1498 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1499 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1501 # inside interface VRF match NAT44 static mapping VRF
1502 pkts = self.create_stream_in(self.pg4, self.pg3)
1503 self.pg4.add_stream(pkts)
1504 self.pg_enable_capture(self.pg_interfaces)
1506 capture = self.pg3.get_capture(len(pkts))
1507 self.verify_capture_out(capture, nat_ip1, True)
1509 # inside interface VRF don't match NAT44 static mapping VRF (packets
1511 pkts = self.create_stream_in(self.pg0, self.pg3)
1512 self.pg0.add_stream(pkts)
1513 self.pg_enable_capture(self.pg_interfaces)
1515 self.pg3.assert_nothing_captured()
1517 def test_dynamic_to_static(self):
1518 """ Switch from dynamic translation to 1:1NAT """
# Phase 1: with only self.nat_addr configured, in2out traffic from pg0 is
# verified on pg1 using the default checks of verify_capture_out.
# Phase 2: a static mapping pg0.remote_ip4 <-> 10.0.0.10 is added, the
# session dump for pg0's remote host must then be empty, and the same
# in2out traffic must come out translated to nat_ip.
1519 nat_ip = "10.0.0.10"
1520 self.tcp_port_out = 6303
1521 self.udp_port_out = 6304
1522 self.icmp_id_out = 6305
1524 self.nat44_add_address(self.nat_addr)
1525 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1526 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1530 pkts = self.create_stream_in(self.pg0, self.pg1)
1531 self.pg0.add_stream(pkts)
1532 self.pg_enable_capture(self.pg_interfaces)
1534 capture = self.pg1.get_capture(len(pkts))
1535 self.verify_capture_out(capture)
1538 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1539 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1540 self.assertEqual(len(sessions), 0)
1541 pkts = self.create_stream_in(self.pg0, self.pg1)
1542 self.pg0.add_stream(pkts)
1543 self.pg_enable_capture(self.pg_interfaces)
1545 capture = self.pg1.get_capture(len(pkts))
1546 self.verify_capture_out(capture, nat_ip, True)
1548 def test_identity_nat(self):
1549 """ Identity NAT """
# An identity mapping is requested for pg0.remote_ip4n. A TCP packet sent
# from pg1's remote host to pg0's remote host must arrive on pg0 with its
# addresses and ports unchanged.
1551 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1552 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1553 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1556 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1557 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1558 TCP(sport=12345, dport=56789))
1559 self.pg1.add_stream(p)
1560 self.pg_enable_capture(self.pg_interfaces)
1562 capture = self.pg0.get_capture(1)
# The expected values below are exactly the ones used to build the packet.
1567 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568 self.assertEqual(ip.src, self.pg1.remote_ip4)
1569 self.assertEqual(tcp.dport, 56789)
1570 self.assertEqual(tcp.sport, 12345)
1571 self.assert_packet_checksums_valid(p)
1573 self.logger.error(ppp("Unexpected or invalid packet:", p))
1576 def test_multiple_inside_interfaces(self):
1577 """ NAT44 multiple non-overlapping address space inside interfaces """
# nat44_interface_add_del_feature is called for pg0, pg1 and pg3; pg2 gets
# no such call. The test then checks, in order: pg0 -> pg1 and pg0 -> pg2
# without translation, followed by in2out/out2in round trips through pg3
# for pg0 and for pg1.
1579 self.nat44_add_address(self.nat_addr)
1580 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1581 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1582 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1585 # between two NAT44 inside interfaces (no translation)
1586 pkts = self.create_stream_in(self.pg0, self.pg1)
1587 self.pg0.add_stream(pkts)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 capture = self.pg1.get_capture(len(pkts))
1591 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1593 # from NAT44 inside to interface without NAT44 feature (no translation)
1594 pkts = self.create_stream_in(self.pg0, self.pg2)
1595 self.pg0.add_stream(pkts)
1596 self.pg_enable_capture(self.pg_interfaces)
1598 capture = self.pg2.get_capture(len(pkts))
1599 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1601 # in2out 1st interface
1602 pkts = self.create_stream_in(self.pg0, self.pg3)
1603 self.pg0.add_stream(pkts)
1604 self.pg_enable_capture(self.pg_interfaces)
1606 capture = self.pg3.get_capture(len(pkts))
1607 self.verify_capture_out(capture)
1609 # out2in 1st interface
1610 pkts = self.create_stream_out(self.pg3)
1611 self.pg3.add_stream(pkts)
1612 self.pg_enable_capture(self.pg_interfaces)
1614 capture = self.pg0.get_capture(len(pkts))
1615 self.verify_capture_in(capture, self.pg0)
1617 # in2out 2nd interface
1618 pkts = self.create_stream_in(self.pg1, self.pg3)
1619 self.pg1.add_stream(pkts)
1620 self.pg_enable_capture(self.pg_interfaces)
1622 capture = self.pg3.get_capture(len(pkts))
1623 self.verify_capture_out(capture)
1625 # out2in 2nd interface
1626 pkts = self.create_stream_out(self.pg3)
1627 self.pg3.add_stream(pkts)
1628 self.pg_enable_capture(self.pg_interfaces)
1630 capture = self.pg1.get_capture(len(pkts))
1631 self.verify_capture_in(capture, self.pg1)
1633 def test_inside_overlapping_interfaces(self):
1634 """ NAT44 multiple inside interfaces with overlapping address space """
# nat44_interface_add_del_feature is called for pg3, pg4, pg5 and pg6, and
# a static mapping pg6.remote_ip4 <-> 10.0.0.10 is added. The test then
# covers: pg4 -> pg5 without translation, a hairpinned TCP packet from pg4
# to the static address (expected on pg6), in2out/out2in round trips
# through pg3 for pg4, pg5 and pg6, and finally a series of checks on the
# user and session dumps.
1636 static_nat_ip = "10.0.0.10"
1637 self.nat44_add_address(self.nat_addr)
1638 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1640 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1641 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1642 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1643 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1646 # between NAT44 inside interfaces with same VRF (no translation)
1647 pkts = self.create_stream_in(self.pg4, self.pg5)
1648 self.pg4.add_stream(pkts)
1649 self.pg_enable_capture(self.pg_interfaces)
1651 capture = self.pg5.get_capture(len(pkts))
1652 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1654 # between NAT44 inside interfaces with different VRF (hairpinning)
1655 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1656 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1657 TCP(sport=1234, dport=5678))
1658 self.pg4.add_stream(p)
1659 self.pg_enable_capture(self.pg_interfaces)
1661 capture = self.pg6.get_capture(1)
# Expected on pg6: source rewritten to self.nat_addr with a source port
# other than 1234, destination rewritten to pg6's remote host, destination
# port untouched.
1666 self.assertEqual(ip.src, self.nat_addr)
1667 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1668 self.assertNotEqual(tcp.sport, 1234)
1669 self.assertEqual(tcp.dport, 5678)
1671 self.logger.error(ppp("Unexpected or invalid packet:", p))
1674 # in2out 1st interface
1675 pkts = self.create_stream_in(self.pg4, self.pg3)
1676 self.pg4.add_stream(pkts)
1677 self.pg_enable_capture(self.pg_interfaces)
1679 capture = self.pg3.get_capture(len(pkts))
1680 self.verify_capture_out(capture)
1682 # out2in 1st interface
1683 pkts = self.create_stream_out(self.pg3)
1684 self.pg3.add_stream(pkts)
1685 self.pg_enable_capture(self.pg_interfaces)
1687 capture = self.pg4.get_capture(len(pkts))
1688 self.verify_capture_in(capture, self.pg4)
1690 # in2out 2nd interface
1691 pkts = self.create_stream_in(self.pg5, self.pg3)
1692 self.pg5.add_stream(pkts)
1693 self.pg_enable_capture(self.pg_interfaces)
1695 capture = self.pg3.get_capture(len(pkts))
1696 self.verify_capture_out(capture)
1698 # out2in 2nd interface
1699 pkts = self.create_stream_out(self.pg3)
1700 self.pg3.add_stream(pkts)
1701 self.pg_enable_capture(self.pg_interfaces)
1703 capture = self.pg5.get_capture(len(pkts))
1704 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's remote host: exactly three non-static sessions,
# all using the single address returned by nat44_address_dump.
1707 addresses = self.vapi.nat44_address_dump()
1708 self.assertEqual(len(addresses), 1)
1709 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1710 self.assertEqual(len(sessions), 3)
1711 for session in sessions:
1712 self.assertFalse(session.is_static)
1713 self.assertEqual(session.inside_ip_address[0:4],
1714 self.pg5.remote_ip4n)
1715 self.assertEqual(session.outside_ip_address,
1716 addresses[0].ip_address)
# NOTE(review): the indexed checks below rely on the dump returning the
# sessions in TCP, UDP, ICMP order - confirm that this order is guaranteed.
1717 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1718 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1719 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1720 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1721 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1722 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1723 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1724 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1725 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1727 # in2out 3rd interface
1728 pkts = self.create_stream_in(self.pg6, self.pg3)
1729 self.pg6.add_stream(pkts)
1730 self.pg_enable_capture(self.pg_interfaces)
1732 capture = self.pg3.get_capture(len(pkts))
1733 self.verify_capture_out(capture, static_nat_ip, True)
1735 # out2in 3rd interface
1736 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1737 self.pg3.add_stream(pkts)
1738 self.pg_enable_capture(self.pg_interfaces)
1740 capture = self.pg6.get_capture(len(pkts))
1741 self.verify_capture_in(capture, self.pg6)
1743 # general user and session dump verifications
1744 users = self.vapi.nat44_user_dump()
1745 self.assertTrue(len(users) >= 3)
1746 addresses = self.vapi.nat44_address_dump()
1747 self.assertEqual(len(addresses), 1)
1749 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1751 for session in sessions:
1752 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: byte count above packet count, packet count above 0.
1753 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1754 self.assertTrue(session.protocol in
1755 [IP_PROTOS.tcp, IP_PROTOS.udp,
1757 self.assertFalse(session.ext_host_valid)
# Session dump for pg4's remote host: at least four sessions, none static.
1760 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1761 self.assertTrue(len(sessions) >= 4)
1762 for session in sessions:
1763 self.assertFalse(session.is_static)
1764 self.assertEqual(session.inside_ip_address[0:4],
1765 self.pg4.remote_ip4n)
1766 self.assertEqual(session.outside_ip_address,
1767 addresses[0].ip_address)
# Session dump for pg6's remote host: at least three sessions, all static.
1770 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1771 self.assertTrue(len(sessions) >= 3)
1772 for session in sessions:
1773 self.assertTrue(session.is_static)
1774 self.assertEqual(session.inside_ip_address[0:4],
1775 self.pg6.remote_ip4n)
# NOTE(review): comparing two map(...) results for equality only works when
# map returns lists (Python 2); Python 3 map objects never compare equal -
# confirm the interpreter this suite targets.
1776 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1777 map(int, static_nat_ip.split('.')))
1778 self.assertTrue(session.inside_port in
1779 [self.tcp_port_in, self.udp_port_in,
1782 def test_hairpinning(self):
1783 """ NAT44 hairpinning - 1:1 NAPT """
# Host and server are both remote hosts of pg0. The server is published
# through a TCP static mapping on self.nat_addr
# (server_in_port <-> server_out_port). The host sends to the NAT address
# and the translated packet is expected back on pg0; the server's reply to
# the NAT address is then expected on pg0 as well.
1785 host = self.pg0.remote_hosts[0]
1786 server = self.pg0.remote_hosts[1]
1789 server_in_port = 5678
1790 server_out_port = 8765
1792 self.nat44_add_address(self.nat_addr)
1793 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1794 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1796 # add static mapping for server
1797 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1798 server_in_port, server_out_port,
1799 proto=IP_PROTOS.tcp)
1801 # send packet from host to server
1802 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1803 IP(src=host.ip4, dst=self.nat_addr) /
1804 TCP(sport=host_in_port, dport=server_out_port))
1805 self.pg0.add_stream(p)
1806 self.pg_enable_capture(self.pg_interfaces)
1808 capture = self.pg0.get_capture(1)
# Expected: source rewritten to the NAT address with a new source port,
# destination rewritten to the server's real address and inside port.
1813 self.assertEqual(ip.src, self.nat_addr)
1814 self.assertEqual(ip.dst, server.ip4)
1815 self.assertNotEqual(tcp.sport, host_in_port)
1816 self.assertEqual(tcp.dport, server_in_port)
1817 self.assert_packet_checksums_valid(p)
# The translated source port is kept; the server's reply is sent to it.
1818 host_out_port = tcp.sport
1820 self.logger.error(ppp("Unexpected or invalid packet:", p))
1823 # send reply from server to host
1824 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1825 IP(src=server.ip4, dst=self.nat_addr) /
1826 TCP(sport=server_in_port, dport=host_out_port))
1827 self.pg0.add_stream(p)
1828 self.pg_enable_capture(self.pg_interfaces)
1830 capture = self.pg0.get_capture(1)
# Expected: the reply appears to come from the NAT address and the server's
# outside port, and is addressed to the host's original address and port.
1835 self.assertEqual(ip.src, self.nat_addr)
1836 self.assertEqual(ip.dst, host.ip4)
1837 self.assertEqual(tcp.sport, server_out_port)
1838 self.assertEqual(tcp.dport, host_in_port)
1839 self.assert_packet_checksums_valid(p)
1841 self.logger.error(ppp("Unexpected or invalid packet:", p))
1844 def test_hairpinning2(self):
1845 """ NAT44 hairpinning - 1:1 NAT"""
# Host, server1 and server2 are all remote hosts of pg0; each server gets
# an address-only static mapping (10.0.0.10 and 10.0.0.11). Four exchanges
# are checked, each with TCP, UDP and ICMP packets sent on pg0 and captured
# on pg0: host -> server1, server1 -> host, server2 -> server1 and
# server1 -> server2.
1847 server1_nat_ip = "10.0.0.10"
1848 server2_nat_ip = "10.0.0.11"
1849 host = self.pg0.remote_hosts[0]
1850 server1 = self.pg0.remote_hosts[1]
1851 server2 = self.pg0.remote_hosts[2]
1852 server_tcp_port = 22
1853 server_udp_port = 20
1855 self.nat44_add_address(self.nat_addr)
1856 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1857 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1860 # add static mapping for servers
1861 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1862 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host -> server1: packets are addressed to server1's NAT address.
1866 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1867 IP(src=host.ip4, dst=server1_nat_ip) /
1868 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1870 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1871 IP(src=host.ip4, dst=server1_nat_ip) /
1872 UDP(sport=self.udp_port_in, dport=server_udp_port))
1874 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1875 IP(src=host.ip4, dst=server1_nat_ip) /
1876 ICMP(id=self.icmp_id_in, type='echo-request'))
1878 self.pg0.add_stream(pkts)
1879 self.pg_enable_capture(self.pg_interfaces)
1881 capture = self.pg0.get_capture(len(pkts))
# Expected: source is the NAT address with a changed port/id, destination
# is server1's real address. The translated ports/id are stored on self
# and used as destinations for server1's replies below.
1882 for packet in capture:
1884 self.assertEqual(packet[IP].src, self.nat_addr)
1885 self.assertEqual(packet[IP].dst, server1.ip4)
1886 if packet.haslayer(TCP):
1887 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1888 self.assertEqual(packet[TCP].dport, server_tcp_port)
1889 self.tcp_port_out = packet[TCP].sport
1890 self.assert_packet_checksums_valid(packet)
1891 elif packet.haslayer(UDP):
1892 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1893 self.assertEqual(packet[UDP].dport, server_udp_port)
1894 self.udp_port_out = packet[UDP].sport
1896 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1897 self.icmp_id_out = packet[ICMP].id
1899 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 -> host: replies go to the NAT address and the ports/id recorded
# in the previous step.
1904 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1905 IP(src=server1.ip4, dst=self.nat_addr) /
1906 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1908 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1909 IP(src=server1.ip4, dst=self.nat_addr) /
1910 UDP(sport=server_udp_port, dport=self.udp_port_out))
1912 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1913 IP(src=server1.ip4, dst=self.nat_addr) /
1914 ICMP(id=self.icmp_id_out, type='echo-reply'))
1916 self.pg0.add_stream(pkts)
1917 self.pg_enable_capture(self.pg_interfaces)
1919 capture = self.pg0.get_capture(len(pkts))
# Expected: source is server1's NAT address, destination is the host with
# its original ports/id.
1920 for packet in capture:
1922 self.assertEqual(packet[IP].src, server1_nat_ip)
1923 self.assertEqual(packet[IP].dst, host.ip4)
1924 if packet.haslayer(TCP):
1925 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1926 self.assertEqual(packet[TCP].sport, server_tcp_port)
1927 self.assert_packet_checksums_valid(packet)
1928 elif packet.haslayer(UDP):
1929 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1930 self.assertEqual(packet[UDP].sport, server_udp_port)
1932 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1934 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1937 # server2 to server1
1939 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1940 IP(src=server2.ip4, dst=server1_nat_ip) /
1941 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1943 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1944 IP(src=server2.ip4, dst=server1_nat_ip) /
1945 UDP(sport=self.udp_port_in, dport=server_udp_port))
1947 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1948 IP(src=server2.ip4, dst=server1_nat_ip) /
1949 ICMP(id=self.icmp_id_in, type='echo-request'))
1951 self.pg0.add_stream(pkts)
1952 self.pg_enable_capture(self.pg_interfaces)
1954 capture = self.pg0.get_capture(len(pkts))
# Unlike the host -> server1 case, the source here is expected to be
# server2's own NAT address and the source ports/id are expected to stay
# unchanged.
1955 for packet in capture:
1957 self.assertEqual(packet[IP].src, server2_nat_ip)
1958 self.assertEqual(packet[IP].dst, server1.ip4)
1959 if packet.haslayer(TCP):
1960 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1961 self.assertEqual(packet[TCP].dport, server_tcp_port)
1962 self.tcp_port_out = packet[TCP].sport
1963 self.assert_packet_checksums_valid(packet)
1964 elif packet.haslayer(UDP):
1965 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1966 self.assertEqual(packet[UDP].dport, server_udp_port)
1967 self.udp_port_out = packet[UDP].sport
1969 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1970 self.icmp_id_out = packet[ICMP].id
1972 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1975 # server1 to server2
1977 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1978 IP(src=server1.ip4, dst=server2_nat_ip) /
1979 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1981 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1982 IP(src=server1.ip4, dst=server2_nat_ip) /
1983 UDP(sport=server_udp_port, dport=self.udp_port_out))
1985 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1986 IP(src=server1.ip4, dst=server2_nat_ip) /
1987 ICMP(id=self.icmp_id_out, type='echo-reply'))
1989 self.pg0.add_stream(pkts)
1990 self.pg_enable_capture(self.pg_interfaces)
1992 capture = self.pg0.get_capture(len(pkts))
# Expected: source is server1's NAT address, destination is server2's real
# address with the original ports/id.
1993 for packet in capture:
1995 self.assertEqual(packet[IP].src, server1_nat_ip)
1996 self.assertEqual(packet[IP].dst, server2.ip4)
1997 if packet.haslayer(TCP):
1998 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1999 self.assertEqual(packet[TCP].sport, server_tcp_port)
2000 self.assert_packet_checksums_valid(packet)
2001 elif packet.haslayer(UDP):
2002 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2003 self.assertEqual(packet[UDP].sport, server_udp_port)
2005 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2007 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2010 def test_max_translations_per_user(self):
2011 """ MAX translations per user - recycle the least recently used """
# The per-user limit is read from nat_show_config(). The test sends five
# more TCP packets (distinct source ports starting at 1025) than that
# limit, expects all of them on pg1, and expects the user's session count
# to equal the limit. After adding a TCP static mapping and sending one
# more packet, it expects one static session and limit - 1 other sessions.
2013 self.nat44_add_address(self.nat_addr)
2014 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2015 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2018 # get maximum number of translations per user
2019 nat44_config = self.vapi.nat_show_config()
2021 # send more than maximum number of translations per user packets
2022 pkts_num = nat44_config.max_translations_per_user + 5
2024 for port in range(0, pkts_num):
2025 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2026 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2027 TCP(sport=1025 + port))
2029 self.pg0.add_stream(pkts)
2030 self.pg_enable_capture(self.pg_interfaces)
2033 # verify number of translated packet
2034 self.pg1.get_capture(pkts_num)
2036 users = self.vapi.nat44_user_dump()
# Only the dump entry for pg0's remote host is checked.
2038 if user.ip_address == self.pg0.remote_ip4n:
2039 self.assertEqual(user.nsessions,
2040 nat44_config.max_translations_per_user)
2041 self.assertEqual(user.nstaticsessions, 0)
2044 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2046 proto=IP_PROTOS.tcp)
2047 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2048 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2049 TCP(sport=tcp_port))
2050 self.pg0.add_stream(p)
2051 self.pg_enable_capture(self.pg_interfaces)
2053 self.pg1.get_capture(1)
2054 users = self.vapi.nat44_user_dump()
2056 if user.ip_address == self.pg0.remote_ip4n:
2057 self.assertEqual(user.nsessions,
2058 nat44_config.max_translations_per_user - 1)
2059 self.assertEqual(user.nstaticsessions, 1)
2061 def test_interface_addr(self):
2062 """ Acquire NAT44 addresses from interface """
# pg7 is registered through nat44_add_interface_addr. nat44_address_dump is
# expected to be empty before pg7 has an IPv4 configuration, to hold
# exactly pg7.local_ip4n once config_ip4() has run, and to be empty again
# after unconfig_ip4().
2063 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2065 # no address in NAT pool
2066 adresses = self.vapi.nat44_address_dump()
2067 self.assertEqual(0, len(adresses))
2069 # configure interface address and check NAT address pool
2070 self.pg7.config_ip4()
2071 adresses = self.vapi.nat44_address_dump()
2072 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address are compared.
2073 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2075 # remove interface address and check NAT address pool
2076 self.pg7.unconfig_ip4()
2077 adresses = self.vapi.nat44_address_dump()
2078 self.assertEqual(0, len(adresses))
2080 def test_interface_addr_static_mapping(self):
2081 """ Static mapping with addresses from interface """
# A static mapping is created with external_sw_if_index set to pg7. The
# dump is expected to hold one entry while pg7 has no IPv4 address and two
# entries once it has one; the additional entry is the one whose
# external_sw_if_index is 0xFFFFFFFF and whose external address equals
# pg7.local_ip4n. The configure/unconfigure cycle is run twice, then the
# helper is called again and the dump must be empty.
2084 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2085 self.nat44_add_static_mapping(
2087 external_sw_if_index=self.pg7.sw_if_index,
2090 # static mappings with external interface
2091 static_mappings = self.vapi.nat44_static_mapping_dump()
2092 self.assertEqual(1, len(static_mappings))
2093 self.assertEqual(self.pg7.sw_if_index,
2094 static_mappings[0].external_sw_if_index)
2095 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2097 # configure interface address and check static mappings
2098 self.pg7.config_ip4()
2099 static_mappings = self.vapi.nat44_static_mapping_dump()
2100 self.assertEqual(2, len(static_mappings))
# Look for the entry that carries a concrete external address.
2102 for sm in static_mappings:
2103 if sm.external_sw_if_index == 0xFFFFFFFF:
2104 self.assertEqual(sm.external_ip_address[0:4],
2105 self.pg7.local_ip4n)
2106 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2108 self.assertTrue(resolved)
2110 # remove interface address and check static mappings
2111 self.pg7.unconfig_ip4()
2112 static_mappings = self.vapi.nat44_static_mapping_dump()
2113 self.assertEqual(1, len(static_mappings))
2114 self.assertEqual(self.pg7.sw_if_index,
2115 static_mappings[0].external_sw_if_index)
2116 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2118 # configure interface address again and check static mappings
2119 self.pg7.config_ip4()
2120 static_mappings = self.vapi.nat44_static_mapping_dump()
2121 self.assertEqual(2, len(static_mappings))
2123 for sm in static_mappings:
2124 if sm.external_sw_if_index == 0xFFFFFFFF:
2125 self.assertEqual(sm.external_ip_address[0:4],
2126 self.pg7.local_ip4n)
2127 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2129 self.assertTrue(resolved)
2131 # remove static mapping
2132 self.nat44_add_static_mapping(
2134 external_sw_if_index=self.pg7.sw_if_index,
2137 static_mappings = self.vapi.nat44_static_mapping_dump()
2138 self.assertEqual(0, len(static_mappings))
2140 def test_interface_addr_identity_nat(self):
2141 """ Identity NAT with addresses from interface """
# A TCP identity mapping is created with sw_if_index set to pg7. The dump
# is expected to hold one entry while pg7 has no IPv4 address, two entries
# once config_ip4() has run, and one entry again after unconfig_ip4().
2144 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2145 self.vapi.nat44_add_del_identity_mapping(
2146 sw_if_index=self.pg7.sw_if_index,
2148 protocol=IP_PROTOS.tcp,
2151 # identity mappings with external interface
2152 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2153 self.assertEqual(1, len(identity_mappings))
2154 self.assertEqual(self.pg7.sw_if_index,
2155 identity_mappings[0].sw_if_index)
2157 # configure interface address and check identity mappings
2158 self.pg7.config_ip4()
2159 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2161 self.assertEqual(2, len(identity_mappings))
# NOTE(review): inside this loop the assertions index identity_mappings[0]
# rather than the loop variable sm, so they only check the entry selected
# by the if-condition when that entry happens to be first in the dump -
# they look intended to use sm; confirm.
2162 for sm in identity_mappings:
2163 if sm.sw_if_index == 0xFFFFFFFF:
2164 self.assertEqual(identity_mappings[0].ip_address,
2165 self.pg7.local_ip4n)
2166 self.assertEqual(port, identity_mappings[0].port)
2167 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2169 self.assertTrue(resolved)
2171 # remove interface address and check identity mappings
2172 self.pg7.unconfig_ip4()
2173 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2174 self.assertEqual(1, len(identity_mappings))
2175 self.assertEqual(self.pg7.sw_if_index,
2176 identity_mappings[0].sw_if_index)
2178 def test_ipfix_nat44_sess(self):
2179 """ IPFIX logging NAT44 session created/delted """
# Overrides the IPFIX observation domain and source port (clear_nat44
# resets them to 1 and 4739) and uses a custom collector port, so the
# exported packets can be checked for all three values.
2180 self.ipfix_domain_id = 10
2181 self.ipfix_src_port = 20202
2182 colector_port = 30303
# Teach scapy to dissect UDP destined to the custom collector port as IPFIX.
2183 bind_layers(UDP, IPFIX, dport=30303)
2184 self.nat44_add_address(self.nat_addr)
2185 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2186 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Export towards the host behind pg3.
2188 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2189 src_address=self.pg3.local_ip4n,
2191 template_interval=10,
2192 collector_port=colector_port)
2193 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2194 src_port=self.ipfix_src_port)
# Send the pg0 -> pg1 stream and verify its translation on pg1.
2196 pkts = self.create_stream_in(self.pg0, self.pg1)
2197 self.pg0.add_stream(pkts)
2198 self.pg_enable_capture(self.pg_interfaces)
2200 capture = self.pg1.get_capture(len(pkts))
2201 self.verify_capture_out(capture)
# Remove the pool address, flush the exporter and expect exactly nine
# packets on the collector side.
# presumably removing the address is what deletes the sessions - verify
2202 self.nat44_add_address(self.nat_addr, is_add=0)
2203 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2204 capture = self.pg3.get_capture(9)
2205 ipfix = IPFIXDecoder()
2206 # first load template
# NOTE(review): the loop header over `capture` is not visible in this
# listing; the checks below apply to each captured packet `p`.
2208 self.assertTrue(p.haslayer(IPFIX))
2209 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2210 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2211 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2212 self.assertEqual(p[UDP].dport, colector_port)
2213 self.assertEqual(p[IPFIX].observationDomainID,
2214 self.ipfix_domain_id)
2215 if p.haslayer(Template):
2216 ipfix.add_template(p.getlayer(Template))
2217 # verify events in data set
# Data sets are decoded with the templates collected above and handed to
# verify_ipfix_nat44_ses.
2219 if p.haslayer(Data):
2220 data = ipfix.decode_data_set(p.getlayer(Set))
2221 self.verify_ipfix_nat44_ses(data)
2223 def test_ipfix_addr_exhausted(self):
2224 """ IPFIX logging NAT addresses exhausted """
# The NAT44 feature is enabled on pg0 and pg1, but no pool address is added
# in the visible code, so the packet sent from pg0 is expected to be
# dropped and reported through IPFIX instead.
2225 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2226 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Export towards the host behind pg3 using the instance's current domain id
# and source port.
2228 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2229 src_address=self.pg3.local_ip4n,
2231 template_interval=10)
2232 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2233 src_port=self.ipfix_src_port)
# NOTE(review): the transport layer that completes this packet is on a line
# not visible in this listing.
2235 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2236 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2238 self.pg0.add_stream(p)
2239 self.pg_enable_capture(self.pg_interfaces)
# Nothing may be forwarded to pg1.
2241 self.pg1.assert_nothing_captured()
2243 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2244 capture = self.pg3.get_capture(9)
2245 ipfix = IPFIXDecoder()
2246 # first load template
# NOTE(review): the loop header over `capture` is not visible here; `p` is
# rebound to each captured IPFIX packet in the checks below.
2248 self.assertTrue(p.haslayer(IPFIX))
2249 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2250 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2251 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# 4739 is the collector port expected when none is passed to
# set_ipfix_exporter above.
2252 self.assertEqual(p[UDP].dport, 4739)
2253 self.assertEqual(p[IPFIX].observationDomainID,
2254 self.ipfix_domain_id)
2255 if p.haslayer(Template):
2256 ipfix.add_template(p.getlayer(Template))
2257 # verify events in data set
2259 if p.haslayer(Data):
2260 data = ipfix.decode_data_set(p.getlayer(Set))
2261 self.verify_ipfix_addr_exhausted(data)
2263 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2264 def test_ipfix_max_sessions(self):
2265 """ IPFIX logging maximum session entries exceeded """
# Fills the session table from many distinct source addresses, then sends
# one more packet and expects it to be dropped and reported through IPFIX.
# Only runs when running_extended_tests() is true.
2266 self.nat44_add_address(self.nat_addr)
2267 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2268 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The session limit is taken to be ten times the configured number of
# translation buckets.
2271 nat44_config = self.vapi.nat_show_config()
2272 max_sessions = 10 * nat44_config.translation_buckets
# NOTE(review): `pkts` is initialised and appended to on lines not visible
# in this listing.
2275 for i in range(0, max_sessions):
# Spread the session index over the last two octets of 10.10.x.y.
2276 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2278 IP(src=src, dst=self.pg1.remote_ip4) /
2281 self.pg0.add_stream(pkts)
2282 self.pg_enable_capture(self.pg_interfaces)
# All max_sessions packets must still be forwarded.
2285 self.pg1.get_capture(max_sessions)
# IPFIX export is enabled only after the table has been filled.
2286 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2287 src_address=self.pg3.local_ip4n,
2289 template_interval=10)
2290 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2291 src_port=self.ipfix_src_port)
# One additional packet from pg0's own address must not be forwarded.
2293 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2294 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2296 self.pg0.add_stream(p)
2297 self.pg_enable_capture(self.pg_interfaces)
2299 self.pg1.assert_nothing_captured()
2301 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2302 capture = self.pg3.get_capture(9)
2303 ipfix = IPFIXDecoder()
2304 # first load template
# NOTE(review): the loop header over `capture` is not visible here.
2306 self.assertTrue(p.haslayer(IPFIX))
2307 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2308 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2309 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2310 self.assertEqual(p[UDP].dport, 4739)
2311 self.assertEqual(p[IPFIX].observationDomainID,
2312 self.ipfix_domain_id)
2313 if p.haslayer(Template):
2314 ipfix.add_template(p.getlayer(Template))
2315 # verify events in data set
2317 if p.haslayer(Data):
2318 data = ipfix.decode_data_set(p.getlayer(Set))
2319 self.verify_ipfix_max_sessions(data, max_sessions)
2321 def test_pool_addr_fib(self):
2322 """ NAT44 add pool addresses to FIB """
2323 static_addr = '10.0.0.10'
2324 self.nat44_add_address(self.nat_addr)
2325 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2326 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2328 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2331 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2332 ARP(op=ARP.who_has, pdst=self.nat_addr,
2333 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2334 self.pg1.add_stream(p)
2335 self.pg_enable_capture(self.pg_interfaces)
2337 capture = self.pg1.get_capture(1)
2338 self.assertTrue(capture[0].haslayer(ARP))
2339 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2342 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2343 ARP(op=ARP.who_has, pdst=static_addr,
2344 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2345 self.pg1.add_stream(p)
2346 self.pg_enable_capture(self.pg_interfaces)
2348 capture = self.pg1.get_capture(1)
2349 self.assertTrue(capture[0].haslayer(ARP))
2350 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2352 # send ARP to non-NAT44 interface
2353 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2354 ARP(op=ARP.who_has, pdst=self.nat_addr,
2355 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2356 self.pg2.add_stream(p)
2357 self.pg_enable_capture(self.pg_interfaces)
2359 self.pg1.assert_nothing_captured()
2361 # remove addresses and verify
2362 self.nat44_add_address(self.nat_addr, is_add=0)
2363 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2366 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2367 ARP(op=ARP.who_has, pdst=self.nat_addr,
2368 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2369 self.pg1.add_stream(p)
2370 self.pg_enable_capture(self.pg_interfaces)
2372 self.pg1.assert_nothing_captured()
2374 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2375 ARP(op=ARP.who_has, pdst=static_addr,
2376 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2377 self.pg1.add_stream(p)
2378 self.pg_enable_capture(self.pg_interfaces)
2380 self.pg1.assert_nothing_captured()
2382 def test_vrf_mode(self):
2383 """ NAT44 tenant VRF aware address pool mode """
# Places pg0 and pg1 into two separate IPv4 tables, binds one pool address
# to each table, and checks that traffic from each interface towards pg2 is
# translated with the address bound to its own table.
# NOTE(review): `vrf_id1` and `vrf_id2` are assigned on lines not visible
# in this listing.
2387 nat_ip1 = "10.0.0.10"
2388 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and re-applied after it.
2390 self.pg0.unconfig_ip4()
2391 self.pg1.unconfig_ip4()
2392 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2393 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2394 self.pg0.set_table_ip4(vrf_id1)
2395 self.pg1.set_table_ip4(vrf_id2)
2396 self.pg0.config_ip4()
2397 self.pg1.config_ip4()
2398 self.pg0.resolve_arp()
2399 self.pg1.resolve_arp()
# One pool address per tenant table.
2401 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2402 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2403 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2404 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2405 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2 must leave with nat_ip1.
2410 pkts = self.create_stream_in(self.pg0, self.pg2)
2411 self.pg0.add_stream(pkts)
2412 self.pg_enable_capture(self.pg_interfaces)
2414 capture = self.pg2.get_capture(len(pkts))
2415 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 must leave with nat_ip2.
2418 pkts = self.create_stream_in(self.pg1, self.pg2)
2419 self.pg1.add_stream(pkts)
2420 self.pg_enable_capture(self.pg_interfaces)
2422 capture = self.pg2.get_capture(len(pkts))
2423 self.verify_capture_out(capture, nat_ip2)
# Restore pg0/pg1 to table 0 and delete the two tables so later tests see
# the original interface setup.
2426 self.pg0.unconfig_ip4()
2427 self.pg1.unconfig_ip4()
2428 self.pg0.set_table_ip4(0)
2429 self.pg1.set_table_ip4(0)
2430 self.pg0.config_ip4()
2431 self.pg1.config_ip4()
2432 self.pg0.resolve_arp()
2433 self.pg1.resolve_arp()
2434 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2435 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2437 def test_vrf_feature_independent(self):
2438 """ NAT44 tenant VRF independent address pool mode """
# nat_ip1 is added without a vrf_id and nat_ip2 is bound to vrf 99; traffic
# from both pg0 and pg1 towards pg2 is expected to be translated with
# nat_ip1.
2440 nat_ip1 = "10.0.0.10"
2441 nat_ip2 = "10.0.0.11"
2443 self.nat44_add_address(nat_ip1)
2444 self.nat44_add_address(nat_ip2, vrf_id=99)
2445 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2446 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2447 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2451 pkts = self.create_stream_in(self.pg0, self.pg2)
2452 self.pg0.add_stream(pkts)
2453 self.pg_enable_capture(self.pg_interfaces)
2455 capture = self.pg2.get_capture(len(pkts))
2456 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: still nat_ip1, never the vrf-99 address.
2459 pkts = self.create_stream_in(self.pg1, self.pg2)
2460 self.pg1.add_stream(pkts)
2461 self.pg_enable_capture(self.pg_interfaces)
2463 capture = self.pg2.get_capture(len(pkts))
2464 self.verify_capture_out(capture, nat_ip1)
2466 def test_dynamic_ipless_interfaces(self):
2467 """ NAT44 interfaces without configured IP address """
# pg7 and pg8 are used without configuring an address on them here:
# neighbor entries and /32 routes for the remote hosts are installed by
# hand, then dynamic translation is checked in both directions.
# (clear_nat44 removes these routes and neighbors again.)
2469 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2470 mactobinary(self.pg7.remote_mac),
2471 self.pg7.remote_ip4n,
2473 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2474 mactobinary(self.pg8.remote_mac),
2475 self.pg8.remote_ip4n,
# Host routes pointing at each remote host through its own interface.
2478 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2479 dst_address_length=32,
2480 next_hop_address=self.pg7.remote_ip4n,
2481 next_hop_sw_if_index=self.pg7.sw_if_index)
2482 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2483 dst_address_length=32,
2484 next_hop_address=self.pg8.remote_ip4n,
2485 next_hop_sw_if_index=self.pg8.sw_if_index)
2487 self.nat44_add_address(self.nat_addr)
2488 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2489 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8, verified as translated outbound traffic.
2493 pkts = self.create_stream_in(self.pg7, self.pg8)
2494 self.pg7.add_stream(pkts)
2495 self.pg_enable_capture(self.pg_interfaces)
2497 capture = self.pg8.get_capture(len(pkts))
2498 self.verify_capture_out(capture)
# pg8 -> NAT address, verified as translated back towards pg7.
2501 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2502 self.pg8.add_stream(pkts)
2503 self.pg_enable_capture(self.pg_interfaces)
2505 capture = self.pg7.get_capture(len(pkts))
2506 self.verify_capture_in(capture, self.pg7)
2508 def test_static_ipless_interfaces(self):
2509 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same hand-built neighbor/route setup on pg7 and pg8 as the dynamic
# variant, but with a static mapping of pg7's remote host to nat_addr
# instead of a pool address.
2511 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2512 mactobinary(self.pg7.remote_mac),
2513 self.pg7.remote_ip4n,
2515 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2516 mactobinary(self.pg8.remote_mac),
2517 self.pg8.remote_ip4n,
2520 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2521 dst_address_length=32,
2522 next_hop_address=self.pg7.remote_ip4n,
2523 next_hop_sw_if_index=self.pg7.sw_if_index)
2524 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2525 dst_address_length=32,
2526 next_hop_address=self.pg8.remote_ip4n,
2527 next_hop_sw_if_index=self.pg8.sw_if_index)
2529 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2530 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2531 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# The pg8 -> pg7 direction is exercised first here.
2535 pkts = self.create_stream_out(self.pg8)
2536 self.pg8.add_stream(pkts)
2537 self.pg_enable_capture(self.pg_interfaces)
2539 capture = self.pg7.get_capture(len(pkts))
2540 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; the extra True argument is passed through to
# verify_capture_out (presumably a same-port flag - verify against the
# helper's signature).
2543 pkts = self.create_stream_in(self.pg7, self.pg8)
2544 self.pg7.add_stream(pkts)
2545 self.pg_enable_capture(self.pg_interfaces)
2547 capture = self.pg8.get_capture(len(pkts))
2548 self.verify_capture_out(capture, self.nat_addr, True)
2550 def test_static_with_port_ipless_interfaces(self):
2551 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Fixed external TCP/UDP ports and ICMP id used by the three per-protocol
# static mappings added below.
2553 self.tcp_port_out = 30606
2554 self.udp_port_out = 30607
2555 self.icmp_id_out = 30608
# Hand-built neighbors and /32 routes for the remote hosts of pg7 and pg8.
2557 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2558 mactobinary(self.pg7.remote_mac),
2559 self.pg7.remote_ip4n,
2561 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2562 mactobinary(self.pg8.remote_mac),
2563 self.pg8.remote_ip4n,
2566 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2567 dst_address_length=32,
2568 next_hop_address=self.pg7.remote_ip4n,
2569 next_hop_sw_if_index=self.pg7.sw_if_index)
2570 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2571 dst_address_length=32,
2572 next_hop_address=self.pg8.remote_ip4n,
2573 next_hop_sw_if_index=self.pg8.sw_if_index)
# One static mapping per protocol, each pairing the inside port/id with the
# fixed outside value set at the top of the test.
2575 self.nat44_add_address(self.nat_addr)
2576 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2577 self.tcp_port_in, self.tcp_port_out,
2578 proto=IP_PROTOS.tcp)
2579 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2580 self.udp_port_in, self.udp_port_out,
2581 proto=IP_PROTOS.udp)
2582 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2583 self.icmp_id_in, self.icmp_id_out,
2584 proto=IP_PROTOS.icmp)
2585 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2586 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2590 pkts = self.create_stream_out(self.pg8)
2591 self.pg8.add_stream(pkts)
2592 self.pg_enable_capture(self.pg_interfaces)
2594 capture = self.pg7.get_capture(len(pkts))
2595 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2598 pkts = self.create_stream_in(self.pg7, self.pg8)
2599 self.pg7.add_stream(pkts)
2600 self.pg_enable_capture(self.pg_interfaces)
2602 capture = self.pg8.get_capture(len(pkts))
2603 self.verify_capture_out(capture)
2605 def test_static_unknown_proto(self):
2606 """ 1:1 NAT translate packet with unknown protocol """
# With a static mapping pg0 host <-> nat_ip, sends a packet in each
# direction and checks that only the outer IP addresses are rewritten and
# the GRE layer is still present.
# NOTE(review): the layer between the outer and inner IP headers, the
# `packet = ...` assignments and the try/except framing around the
# assertions are on lines not visible in this listing.
2607 nat_ip = "10.0.0.10"
2608 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2609 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2610 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: the outer source is expected to become nat_ip.
2614 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2615 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2617 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2618 TCP(sport=1234, dport=1234))
2619 self.pg0.add_stream(p)
2620 self.pg_enable_capture(self.pg_interfaces)
2622 p = self.pg1.get_capture(1)
2625 self.assertEqual(packet[IP].src, nat_ip)
2626 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2627 self.assertTrue(packet.haslayer(GRE))
2628 self.assert_packet_checksums_valid(packet)
2630 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> nat_ip: the outer destination is expected to become pg0's host.
2634 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2635 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2637 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2638 TCP(sport=1234, dport=1234))
2639 self.pg1.add_stream(p)
2640 self.pg_enable_capture(self.pg_interfaces)
2642 p = self.pg0.get_capture(1)
2645 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2646 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2647 self.assertTrue(packet.haslayer(GRE))
2648 self.assert_packet_checksums_valid(packet)
2650 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2653 def test_hairpinning_static_unknown_proto(self):
2654 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Both endpoints are hosts behind pg0, each with its own static mapping;
# packets addressed to the peer's external address must come back out of
# pg0 with both outer addresses rewritten.
# NOTE(review): the layer between the two IP headers, the `packet = ...`
# assignments and the try/except framing are on lines not visible here.
2656 host = self.pg0.remote_hosts[0]
2657 server = self.pg0.remote_hosts[1]
2659 host_nat_ip = "10.0.0.10"
2660 server_nat_ip = "10.0.0.11"
2662 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2663 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2664 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2665 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server's external address
2669 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2670 IP(src=host.ip4, dst=server_nat_ip) /
2672 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2673 TCP(sport=1234, dport=1234))
2674 self.pg0.add_stream(p)
2675 self.pg_enable_capture(self.pg_interfaces)
# The packet is captured on pg0 again: source becomes the host's external
# address, destination the server's real address.
2677 p = self.pg0.get_capture(1)
2680 self.assertEqual(packet[IP].src, host_nat_ip)
2681 self.assertEqual(packet[IP].dst, server.ip4)
2682 self.assertTrue(packet.haslayer(GRE))
2683 self.assert_packet_checksums_valid(packet)
2685 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host's external address (reverse direction)
2689 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2690 IP(src=server.ip4, dst=host_nat_ip) /
2692 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2693 TCP(sport=1234, dport=1234))
2694 self.pg0.add_stream(p)
2695 self.pg_enable_capture(self.pg_interfaces)
2697 p = self.pg0.get_capture(1)
2700 self.assertEqual(packet[IP].src, server_nat_ip)
2701 self.assertEqual(packet[IP].dst, host.ip4)
2702 self.assertTrue(packet.haslayer(GRE))
2703 self.assert_packet_checksums_valid(packet)
2705 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2708 def test_output_feature(self):
2709 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3 and checks
# translation pg0 -> pg3, the return path pg3 -> pg0, and that traffic from
# pg2 (no NAT feature enabled here) to pg0 is forwarded untranslated.
2710 self.nat44_add_address(self.nat_addr)
2711 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2712 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2713 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
2717 pkts = self.create_stream_in(self.pg0, self.pg3)
2718 self.pg0.add_stream(pkts)
2719 self.pg_enable_capture(self.pg_interfaces)
2721 capture = self.pg3.get_capture(len(pkts))
2722 self.verify_capture_out(capture)
# pg3 -> pg0
2725 pkts = self.create_stream_out(self.pg3)
2726 self.pg3.add_stream(pkts)
2727 self.pg_enable_capture(self.pg_interfaces)
2729 capture = self.pg0.get_capture(len(pkts))
2730 self.verify_capture_in(capture, self.pg0)
2732 # from non-NAT interface to NAT inside interface
2733 pkts = self.create_stream_in(self.pg2, self.pg0)
2734 self.pg2.add_stream(pkts)
2735 self.pg_enable_capture(self.pg_interfaces)
2737 capture = self.pg0.get_capture(len(pkts))
2738 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2740 def test_output_feature_vrf_aware(self):
2741 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Uses one pool address per VRF (10 and 20) and checks that traffic from
# pg4 and pg6 towards pg3 is translated with the address of the matching
# VRF, in both directions.
# presumably pg4 sits in VRF 10 and pg6 in VRF 20 - that setup is not
# visible in this listing; verify against the class setup.
2742 nat_ip_vrf10 = "10.0.0.10"
2743 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote host are installed.
# NOTE(review): the argument that distinguishes the two calls (likely the
# table id) is on lines not visible here.
2745 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2746 dst_address_length=32,
2747 next_hop_address=self.pg3.remote_ip4n,
2748 next_hop_sw_if_index=self.pg3.sw_if_index,
2750 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2751 dst_address_length=32,
2752 next_hop_address=self.pg3.remote_ip4n,
2753 next_hop_sw_if_index=self.pg3.sw_if_index,
2756 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2757 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2758 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2759 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2760 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3 must use the VRF 10 address.
2764 pkts = self.create_stream_in(self.pg4, self.pg3)
2765 self.pg4.add_stream(pkts)
2766 self.pg_enable_capture(self.pg_interfaces)
2768 capture = self.pg3.get_capture(len(pkts))
2769 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> VRF 10 address must reach pg4.
2772 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2773 self.pg3.add_stream(pkts)
2774 self.pg_enable_capture(self.pg_interfaces)
2776 capture = self.pg4.get_capture(len(pkts))
2777 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3 must use the VRF 20 address.
2780 pkts = self.create_stream_in(self.pg6, self.pg3)
2781 self.pg6.add_stream(pkts)
2782 self.pg_enable_capture(self.pg_interfaces)
2784 capture = self.pg3.get_capture(len(pkts))
2785 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> VRF 20 address must reach pg6.
2788 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2789 self.pg3.add_stream(pkts)
2790 self.pg_enable_capture(self.pg_interfaces)
2792 capture = self.pg6.get_capture(len(pkts))
2793 self.verify_capture_in(capture, self.pg6)
2795 def test_output_feature_hairpinning(self):
2796 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server both sit behind pg0; the server is reachable through a
# static TCP mapping on nat_addr. A request and its reply are sent through
# the NAT and both must re-appear on pg0 with rewritten addresses and ports.
# NOTE(review): `host_in_port`, the `p = capture[0]` / `ip` / `tcp`
# bindings and the try/except framing are on lines not visible here.
2797 host = self.pg0.remote_hosts[0]
2798 server = self.pg0.remote_hosts[1]
2801 server_in_port = 5678
2802 server_out_port = 8765
2804 self.nat44_add_address(self.nat_addr)
2805 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2806 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2809 # add static mapping for server
2810 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2811 server_in_port, server_out_port,
2812 proto=IP_PROTOS.tcp)
2814 # send packet from host to server
2815 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2816 IP(src=host.ip4, dst=self.nat_addr) /
2817 TCP(sport=host_in_port, dport=server_out_port))
2818 self.pg0.add_stream(p)
2819 self.pg_enable_capture(self.pg_interfaces)
2821 capture = self.pg0.get_capture(1)
# The request must arrive at the server's real address and port, sourced
# from nat_addr with a source port different from the host's.
2826 self.assertEqual(ip.src, self.nat_addr)
2827 self.assertEqual(ip.dst, server.ip4)
2828 self.assertNotEqual(tcp.sport, host_in_port)
2829 self.assertEqual(tcp.dport, server_in_port)
2830 self.assert_packet_checksums_valid(p)
# Remember the allocated outside port so the reply can target it.
2831 host_out_port = tcp.sport
2833 self.logger.error(ppp("Unexpected or invalid packet:", p))
2836 # send reply from server to host
2837 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2838 IP(src=server.ip4, dst=self.nat_addr) /
2839 TCP(sport=server_in_port, dport=host_out_port))
2840 self.pg0.add_stream(p)
2841 self.pg_enable_capture(self.pg_interfaces)
2843 capture = self.pg0.get_capture(1)
# The reply must reach the host's real address and original port, appearing
# to come from nat_addr and the server's external port.
2848 self.assertEqual(ip.src, self.nat_addr)
2849 self.assertEqual(ip.dst, host.ip4)
2850 self.assertEqual(tcp.sport, server_out_port)
2851 self.assertEqual(tcp.dport, host_in_port)
2852 self.assert_packet_checksums_valid(p)
2854 self.logger.error(ppp("Unexpected or invalid packet:", p))
2857 def test_one_armed_nat44(self):
2858 """ One armed NAT44 """
# pg9 has the NAT44 feature enabled twice (the second call takes an extra
# argument that is not visible in this listing), so packets both enter and
# leave through the same interface.
# NOTE(review): the `p = capture[0]` / `ip` / `tcp` bindings and the
# try/except framing are on lines not visible here.
2859 remote_host = self.pg9.remote_hosts[0]
2860 local_host = self.pg9.remote_hosts[1]
2863 self.nat44_add_address(self.nat_addr)
2864 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2865 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host, TCP 12345 -> 80
2869 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2870 IP(src=local_host.ip4, dst=remote_host.ip4) /
2871 TCP(sport=12345, dport=80))
2872 self.pg9.add_stream(p)
2873 self.pg_enable_capture(self.pg_interfaces)
2875 capture = self.pg9.get_capture(1)
# Source must be rewritten to nat_addr with a new source port.
2880 self.assertEqual(ip.src, self.nat_addr)
2881 self.assertEqual(ip.dst, remote_host.ip4)
2882 self.assertNotEqual(tcp.sport, 12345)
# Keep the allocated port; the reply below is addressed to it.
2883 external_port = tcp.sport
2884 self.assertEqual(tcp.dport, 80)
2885 self.assert_packet_checksums_valid(p)
2887 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port (the reply)
2891 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2892 IP(src=remote_host.ip4, dst=self.nat_addr) /
2893 TCP(sport=80, dport=external_port))
2894 self.pg9.add_stream(p)
2895 self.pg_enable_capture(self.pg_interfaces)
2897 capture = self.pg9.get_capture(1)
# Destination must be restored to the local host's address and port 12345.
2902 self.assertEqual(ip.src, remote_host.ip4)
2903 self.assertEqual(ip.dst, local_host.ip4)
2904 self.assertEqual(tcp.sport, 80)
2905 self.assertEqual(tcp.dport, 12345)
2906 self.assert_packet_checksums_valid(p)
2908 self.logger.error(ppp("Unexpected or invalid packet:", p))
2911 def test_del_session(self):
2912 """ Delete NAT44 session """
# Creates sessions with a pg0 -> pg1 stream, deletes them through the API
# (first by inside key, then by outside key) and checks the user's session
# dump shrinks accordingly.
2913 self.nat44_add_address(self.nat_addr)
2914 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2915 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2918 pkts = self.create_stream_in(self.pg0, self.pg1)
2919 self.pg0.add_stream(pkts)
2920 self.pg_enable_capture(self.pg_interfaces)
2922 self.pg1.get_capture(len(pkts))
# Sessions of the pg0 host; the second argument is 0.
2924 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2925 nsessions = len(sessions)
# Delete one session addressed by its inside address/port/protocol...
2927 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2928 sessions[0].inside_port,
2929 sessions[0].protocol)
# ...and one addressed by its outside address/port/protocol.
# NOTE(review): the final argument of this call is not visible in this
# listing (presumably the flag selecting outside-key lookup).
2930 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2931 sessions[1].outside_port,
2932 sessions[1].protocol,
# Exactly two sessions must have disappeared.
2935 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2936 self.assertEqual(nsessions - len(sessions), 2)
# Delete one more session, then expect no NAT44 user to remain.
2938 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2939 sessions[0].inside_port,
2940 sessions[0].protocol)
2942 self.verify_no_nat44_user()
2944 def test_set_get_reass(self):
2945 """ NAT44 set/get virtual fragmentation reassembly """
# Reads the current reassembly settings, writes modified values derived
# from them and checks that a second read returns exactly those values.
2946 reas_cfg1 = self.vapi.nat_get_reass()
# New values are relative to the current ones (timeout + 5, limits doubled)
# so the check does not depend on particular defaults.
2948 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2949 max_reass=reas_cfg1.ip4_max_reass * 2,
2950 max_frag=reas_cfg1.ip4_max_frag * 2)
2952 reas_cfg2 = self.vapi.nat_get_reass()
2954 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2955 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2956 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# Setting drop_frag=1 must be reflected as a truthy ip4_drop_frag.
2958 self.vapi.nat_set_reass(drop_frag=1)
2959 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2961 def test_frag_in_order(self):
2962 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload pg0 -> pg1 and back, reassembles what is
# captured and checks ports, payload and the growth of the reassembly dump.
# NOTE(review): several arguments of the create_stream_frag and
# reass_frags_and_verify calls are on lines not visible in this listing.
2963 self.nat44_add_address(self.nat_addr)
2964 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2965 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload; a random inside TCP port is used for this run.
2968 data = "A" * 4 + "B" * 16 + "C" * 3
2969 self.tcp_port_in = random.randint(1025, 65535)
# Baseline number of reassembly entries before any traffic.
2971 reass = self.vapi.nat_reass_dump()
2972 reass_n_start = len(reass)
# pg0 -> pg1
2975 pkts = self.create_stream_frag(self.pg0,
2976 self.pg1.remote_ip4,
2980 self.pg0.add_stream(pkts)
2981 self.pg_enable_capture(self.pg_interfaces)
2983 frags = self.pg1.get_capture(len(pkts))
2984 p = self.reass_frags_and_verify(frags,
2986 self.pg1.remote_ip4)
# Destination port 20 is preserved, the source port must be translated;
# the translated port is stored for the return direction.
2987 self.assertEqual(p[TCP].dport, 20)
2988 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2989 self.tcp_port_out = p[TCP].sport
2990 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0
2993 pkts = self.create_stream_frag(self.pg1,
2998 self.pg1.add_stream(pkts)
2999 self.pg_enable_capture(self.pg_interfaces)
3001 frags = self.pg0.get_capture(len(pkts))
3002 p = self.reass_frags_and_verify(frags,
3003 self.pg1.remote_ip4,
3004 self.pg0.remote_ip4)
3005 self.assertEqual(p[TCP].sport, 20)
3006 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3007 self.assertEqual(data, p[Raw].load)
# Two new reassembly entries are expected, one per direction sent above.
3009 reass = self.vapi.nat_reass_dump()
3010 reass_n_end = len(reass)
3012 self.assertEqual(reass_n_end - reass_n_start, 2)
3014 def test_reass_hairpinning(self):
3015 """ NAT44 fragments hairpinning """
# Sends a fragmented TCP payload towards a server that sits behind the same
# interface (pg0) and is published through a static mapping; the fragments
# must come back out of pg0 addressed to the server's real port.
# NOTE(review): the arguments of create_stream_frag and
# reass_frags_and_verify after the first one are not visible in this
# listing.
3016 server = self.pg0.remote_hosts[1]
# Random ports for this run.
3017 host_in_port = random.randint(1025, 65535)
3018 server_in_port = random.randint(1025, 65535)
3019 server_out_port = random.randint(1025, 65535)
3020 data = "A" * 4 + "B" * 16 + "C" * 3
3022 self.nat44_add_address(self.nat_addr)
3023 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3024 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3026 # add static mapping for server
3027 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3028 server_in_port, server_out_port,
3029 proto=IP_PROTOS.tcp)
3031 # send packet from host to server
3032 pkts = self.create_stream_frag(self.pg0,
3037 self.pg0.add_stream(pkts)
3038 self.pg_enable_capture(self.pg_interfaces)
# Fragments are captured on pg0 itself (hairpinning).
3040 frags = self.pg0.get_capture(len(pkts))
3041 p = self.reass_frags_and_verify(frags,
# Source port must be translated, destination port mapped to the server's
# inside port, payload unchanged.
3044 self.assertNotEqual(p[TCP].sport, host_in_port)
3045 self.assertEqual(p[TCP].dport, server_in_port)
3046 self.assertEqual(data, p[Raw].load)
3048 def test_frag_out_of_order(self):
3049 """ NAT44 translate fragments arriving out of order """
3050 self.nat44_add_address(self.nat_addr)
3051 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3052 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3055 data = "A" * 4 + "B" * 16 + "C" * 3
3056 random.randint(1025, 65535)
3059 pkts = self.create_stream_frag(self.pg0,
3060 self.pg1.remote_ip4,
3065 self.pg0.add_stream(pkts)
3066 self.pg_enable_capture(self.pg_interfaces)
3068 frags = self.pg1.get_capture(len(pkts))
3069 p = self.reass_frags_and_verify(frags,
3071 self.pg1.remote_ip4)
3072 self.assertEqual(p[TCP].dport, 20)
3073 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3074 self.tcp_port_out = p[TCP].sport
3075 self.assertEqual(data, p[Raw].load)
3078 pkts = self.create_stream_frag(self.pg1,
3084 self.pg1.add_stream(pkts)
3085 self.pg_enable_capture(self.pg_interfaces)
3087 frags = self.pg0.get_capture(len(pkts))
3088 p = self.reass_frags_and_verify(frags,
3089 self.pg1.remote_ip4,
3090 self.pg0.remote_ip4)
3091 self.assertEqual(p[TCP].sport, 20)
3092 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3093 self.assertEqual(data, p[Raw].load)
3095 def test_port_restricted(self):
3096 """ Port restricted NAT44 (MAP-E CE) """
# Switches port allocation to the MAP-E algorithm through the CLI and
# checks that the translated source port carries the configured PSID.
# NOTE(review): the `p = capture[0]` / `ip` / `tcp` bindings and the
# try/except framing are on lines not visible in this listing.
3097 self.nat44_add_address(self.nat_addr)
3098 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3099 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3101 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3102 "psid-offset 6 psid-len 6")
# pg0 -> pg1, TCP 4567 -> 22
3104 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3105 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3106 TCP(sport=4567, dport=22))
3107 self.pg0.add_stream(p)
3108 self.pg_enable_capture(self.pg_interfaces)
3110 capture = self.pg1.get_capture(1)
3115 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3116 self.assertEqual(ip.src, self.nat_addr)
3117 self.assertEqual(tcp.dport, 22)
3118 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the allocated port (6-bit field after shifting right by 6)
# must equal 10, the psid value given in the CLI command above.
3119 self.assertEqual((tcp.sport >> 6) & 63, 10)
3120 self.assert_packet_checksums_valid(p)
3122 self.logger.error(ppp("Unexpected or invalid packet:", p))
3125 def test_ipfix_max_frags(self):
3126 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets the pending-fragment limit to 0, sends only the last fragment of a
# fragmented stream and expects it to be dropped and reported via IPFIX.
3127 self.nat44_add_address(self.nat_addr)
3128 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3129 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3131 self.vapi.nat_set_reass(max_frag=0)
3132 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3133 src_address=self.pg3.local_ip4n,
3135 template_interval=10)
3136 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3137 src_port=self.ipfix_src_port)
3139 data = "A" * 4 + "B" * 16 + "C" * 3
3140 self.tcp_port_in = random.randint(1025, 65535)
# NOTE(review): the remaining create_stream_frag arguments are not visible
# in this listing.
3141 pkts = self.create_stream_frag(self.pg0,
3142 self.pg1.remote_ip4,
# Only the final element of the generated stream is injected.
3146 self.pg0.add_stream(pkts[-1])
3147 self.pg_enable_capture(self.pg_interfaces)
3149 self.pg1.assert_nothing_captured()
3151 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3152 capture = self.pg3.get_capture(9)
3153 ipfix = IPFIXDecoder()
3154 # first load template
# NOTE(review): the loop header over `capture` is not visible here; the
# checks below apply to each captured packet `p`.
3156 self.assertTrue(p.haslayer(IPFIX))
3157 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3158 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3159 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3160 self.assertEqual(p[UDP].dport, 4739)
3161 self.assertEqual(p[IPFIX].observationDomainID,
3162 self.ipfix_domain_id)
3163 if p.haslayer(Template):
3164 ipfix.add_template(p.getlayer(Template))
3165 # verify events in data set
# `data` is reused here for the decoded IPFIX records (the payload string
# above is no longer needed); the verifier gets the limit 0 and pg0's
# remote address.
3167 if p.haslayer(Data):
3168 data = ipfix.decode_data_set(p.getlayer(Set))
3169 self.verify_ipfix_max_fragments_ip4(data, 0,
3170 self.pg0.remote_ip4n)
def test_multiple_outside_vrf(self):
    """ Multiple outside VRF """
    vrf_id1 = 1
    vrf_id2 = 2

    # Move the two outside interfaces into their own IPv4 tables.
    self.pg1.unconfig_ip4()
    self.pg2.unconfig_ip4()
    self.vapi.ip_table_add_del(vrf_id1, is_add=1)
    self.vapi.ip_table_add_del(vrf_id2, is_add=1)
    self.pg1.set_table_ip4(vrf_id1)
    self.pg2.set_table_ip4(vrf_id2)
    self.pg1.config_ip4()
    self.pg2.config_ip4()
    self.pg1.resolve_arp()
    self.pg2.resolve_arp()

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                              is_inside=0)

    try:
        # first VRF
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, self.nat_addr)

        pkts = self.create_stream_out(self.pg1, self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        # Different inside ports/ICMP id for the second pass.
        self.tcp_port_in = 60303
        self.udp_port_in = 60304
        self.icmp_id_in = 60305

        # second VRF
        pkts = self.create_stream_in(self.pg0, self.pg2)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, self.nat_addr)

        pkts = self.create_stream_out(self.pg2, self.nat_addr)
        self.pg2.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

    finally:
        # Always put pg1/pg2 back into table 0, pass or fail.
        self.pg1.unconfig_ip4()
        self.pg2.unconfig_ip4()
        self.pg1.set_table_ip4(0)
        self.pg2.set_table_ip4(0)
        self.pg1.config_ip4()
        self.pg2.config_ip4()
        self.pg1.resolve_arp()
        self.pg2.resolve_arp()
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ NAT44 session timeouts """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat_set_timeouts(udp=5)

    max_sessions = 1000

    # First wave: one UDP session (sport 1025) per inside source address.
    pkts = []
    for i in range(0, max_sessions):
        src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=src, dst=self.pg1.remote_ip4) /
             UDP(sport=1025, dport=53))
        pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(max_sessions)

    # Wait longer than the 5 s UDP timeout configured above.
    sleep(10)

    # Second wave: same sources, different source port (1026).
    pkts = []
    for i in range(0, max_sessions):
        src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=src, dst=self.pg1.remote_ip4) /
             UDP(sport=1026, dport=53))
        pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(max_sessions)

    # Fewer than 2 * max_sessions means some first-wave sessions are gone.
    users = self.vapi.nat44_user_dump()
    nsessions = sum(user.nsessions for user in users)
    self.assertLess(nsessions, 2 * max_sessions)
def tearDown(self):
    """ Log the NAT state, then reset the NAT configuration """
    super(TestNAT44, self).tearDown()
    if not self.vpp_dead:
        # Log the state left behind by the test before it is wiped.
        self.logger.info(self.vapi.cli("show nat44 addresses"))
        self.logger.info(self.vapi.cli("show nat44 interfaces"))
        self.logger.info(self.vapi.cli("show nat44 static mappings"))
        self.logger.info(self.vapi.cli("show nat44 interface address"))
        self.logger.info(self.vapi.cli("show nat44 sessions detail"))
        self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
        self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
        self.logger.info(self.vapi.cli("show nat timeouts"))
        # Undo any port-allocation algorithm change made by a test.
        self.vapi.cli("nat addr-port-assignment-alg default")
        self.clear_nat44()
        self.vapi.cli("clear logging")
3298 class TestNAT44EndpointDependent(MethodHolder):
3299 """ Endpoint-Dependent mapping and filtering test cases """
@classmethod
def setUpConstants(cls):
    """ Extend the VPP command line with the endpoint-dependent option """
    super(TestNAT44EndpointDependent, cls).setUpConstants()
    cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
@classmethod
def setUpClass(cls):
    """ Create pg0-pg6 and the addressing/routing shared by the tests """
    super(TestNAT44EndpointDependent, cls).setUpClass()
    cls.vapi.cli("set log class nat level debug")
    try:
        # Default ports/ids and addresses used by the stream helpers.
        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.nat_addr = '10.0.0.3'
        cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
        cls.ipfix_src_port = 4739
        cls.ipfix_domain_id = 1
        cls.tcp_external_port = 80

        cls.create_pg_interfaces(range(7))
        cls.interfaces = list(cls.pg_interfaces[0:3])

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg0.generate_remote_hosts(3)
        cls.pg0.configure_ipv4_neighbors()

        cls.pg3.admin_up()

        # pg4: extra address 10.0.0.1, and both remote hosts share one IP.
        cls.pg4.generate_remote_hosts(2)
        cls.pg4.config_ip4()
        ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
        cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
                                              ip_addr_n,
                                              24)
        cls.pg4.admin_up()
        cls.pg4.resolve_arp()
        cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
        cls.pg4.resolve_arp()

        zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
        cls.vapi.ip_table_add_del(1, is_add=1)

        # pg5: addresses overridden by hand, interface placed in table 1.
        cls.pg5._local_ip4 = "10.1.1.1"
        cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
                                               cls.pg5.local_ip4)
        cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
        cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
            socket.AF_INET, cls.pg5.remote_ip4)
        cls.pg5.set_table_ip4(1)
        cls.pg5.config_ip4()
        cls.pg5.admin_up()
        cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
                                  dst_address_length=32,
                                  table_id=1,
                                  next_hop_sw_if_index=cls.pg5.sw_if_index,
                                  next_hop_address=zero_ip4n)

        # pg6: same treatment as pg5, different subnet.
        cls.pg6._local_ip4 = "10.1.2.1"
        cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
                                               cls.pg6.local_ip4)
        cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
        cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
            socket.AF_INET, cls.pg6.remote_ip4)
        cls.pg6.set_table_ip4(1)
        cls.pg6.config_ip4()
        cls.pg6.admin_up()
        cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
                                  dst_address_length=32,
                                  table_id=1,
                                  next_hop_sw_if_index=cls.pg6.sw_if_index,
                                  next_hop_address=zero_ip4n)

        # Cross-table routes: pg6's /16 resolves via table 1, table 1's
        # default resolves via table 0, and table 0's default goes via pg1.
        cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
                                  dst_address_length=16,
                                  next_hop_address=zero_ip4n,
                                  table_id=0,
                                  next_hop_table_id=1)
        cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
                                  dst_address_length=0,
                                  next_hop_address=zero_ip4n,
                                  table_id=1,
                                  next_hop_table_id=0)
        cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
                                  dst_address_length=0,
                                  table_id=0,
                                  next_hop_sw_if_index=cls.pg1.sw_if_index,
                                  next_hop_address=cls.pg1.local_ip4n)

        cls.pg5.resolve_arp()
        cls.pg6.resolve_arp()

    except Exception:
        # Release what the framework already set up, then propagate.
        super(TestNAT44EndpointDependent, cls).tearDownClass()
        raise
def test_dynamic(self):
    """ NAT44 dynamic translation test """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # The class starts VPP with "nat { endpoint-dependent }".
    nat_config = self.vapi.nat_show_config()
    self.assertEqual(1, nat_config.endpoint_dependent)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_forwarding(self):
    """ NAT44 forwarding test """

    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_forwarding_enable_disable(1)

    real_ip = self.pg0.remote_ip4n
    alias_ip = self.nat_addr_n
    self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                           external_ip=alias_ip)

    try:
        # in2out - static mapping match

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # in2out - no static mapping match

        # Temporarily make remote host 1 the "default" pg0 host so the
        # stream helpers use an address with no static mapping.
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            self.pg0.remote_hosts[0] = host0

        # Delete one of the host's three sessions and check it is gone.
        user = self.pg0.remote_hosts[1]
        sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
        self.assertEqual(len(sessions), 3)
        self.assertTrue(sessions[0].ext_host_valid)
        self.vapi.nat44_del_session(
            sessions[0].inside_ip_address,
            sessions[0].inside_port,
            sessions[0].protocol,
            ext_host_address=sessions[0].ext_host_address,
            ext_host_port=sessions[0].ext_host_port)
        sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
        self.assertEqual(len(sessions), 2)

    finally:
        # Undo the forwarding switch and the static mapping in all cases.
        self.vapi.nat44_forwarding_enable_disable(0)
        self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                               external_ip=alias_ip,
                                               is_add=0)
def test_static_lb(self):
    """ NAT44 local service load balancing """
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named lb_locals to avoid shadowing the builtin locals().
    lb_locals = [{'addr': server1.ip4n,
                  'port': local_port,
                  'probability': 70,
                  'vrf_id': 0},
                 {'addr': server2.ip4n,
                  'port': local_port,
                  'probability': 30,
                  'vrf_id': 0}]

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              local_num=len(lb_locals),
                                              locals=lb_locals)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    server = None
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertIn(ip.dst, [server1.ip4, server2.ip4])
        # Remember which back-end was picked for the reply below.
        if ip.dst == server1.ip4:
            server = server1
        else:
            server = server2
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # Exactly one session for the chosen server; delete it and re-check.
    sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
    self.assertEqual(len(sessions), 1)
    self.assertTrue(sessions[0].ext_host_valid)
    self.vapi.nat44_del_session(
        sessions[0].inside_ip_address,
        sessions[0].inside_port,
        sessions[0].protocol,
        ext_host_address=sessions[0].ext_host_address,
        ext_host_port=sessions[0].ext_host_port)
    sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
    self.assertEqual(len(sessions), 0)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_static_lb_multi_clients(self):
    """ NAT44 local service load balancing - multiple clients"""

    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named lb_locals to avoid shadowing the builtin locals().
    lb_locals = [{'addr': server1.ip4n,
                  'port': local_port,
                  'probability': 90,
                  'vrf_id': 0},
                 {'addr': server2.ip4n,
                  'port': local_port,
                  'probability': 10,
                  'vrf_id': 0}]

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              local_num=len(lb_locals),
                                              locals=lb_locals)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # One connection attempt per client address.
    clients = ip4_range(self.pg1.remote_ip4, 10, 50)
    pkts = []
    for client in clients:
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=client, dst=self.nat_addr) /
             TCP(sport=12345, dport=external_port))
        pkts.append(p)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))

    # Count how many connections landed on each back-end.
    server1_n = 0
    server2_n = 0
    for p in capture:
        if p[IP].dst == server1.ip4:
            server1_n += 1
        else:
            server2_n += 1
    # The higher-probability server must have received more of them.
    self.assertGreater(server1_n, server2_n)
def test_static_lb_2(self):
    """ NAT44 local service load balancing (asymmetrical rule) """
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named lb_locals to avoid shadowing the builtin locals().
    lb_locals = [{'addr': server1.ip4n,
                  'port': local_port,
                  'probability': 70,
                  'vrf_id': 0},
                 {'addr': server2.ip4n,
                  'port': local_port,
                  'probability': 30,
                  'vrf_id': 0}]

    self.vapi.nat44_forwarding_enable_disable(1)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              out2in_only=1,
                                              local_num=len(lb_locals),
                                              locals=lb_locals)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    server = None
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertIn(ip.dst, [server1.ip4, server2.ip4])
        # Remember which back-end was picked for the reply below.
        if ip.dst == server1.ip4:
            server = server1
        else:
            server = server2
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from client to server (no translation)
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
         TCP(sport=12346, dport=local_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, server1.ip4)
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client (no translation)
    p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
         IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12346))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, server1.ip4)
        self.assertEqual(tcp.sport, local_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_lb_affinity(self):
    """ NAT44 local service load balancing affinity """
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named lb_locals to avoid shadowing the builtin locals().
    lb_locals = [{'addr': server1.ip4n,
                  'port': local_port,
                  'probability': 50,
                  'vrf_id': 0},
                 {'addr': server2.ip4n,
                  'port': local_port,
                  'probability': 50,
                  'vrf_id': 0}]

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              affinity=10800,
                                              local_num=len(lb_locals),
                                              locals=lb_locals)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # The first connection determines the back-end for this client.
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=1025, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    backend = capture[0][IP].dst

    # Remove the session so only the affinity record can steer what follows.
    sessions = self.vapi.nat44_user_session_dump(
        socket.inet_pton(socket.AF_INET, backend), 0)
    self.assertEqual(len(sessions), 1)
    self.assertTrue(sessions[0].ext_host_valid)
    self.vapi.nat44_del_session(
        sessions[0].inside_ip_address,
        sessions[0].inside_port,
        sessions[0].protocol,
        ext_host_address=sessions[0].ext_host_address,
        ext_host_port=sessions[0].ext_host_port)

    # New connections from the same client must all hit the same back-end.
    pkts = []
    for port in range(1030, 1100):
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
             TCP(sport=port, dport=external_port))
        pkts.append(p)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for p in capture:
        self.assertEqual(p[IP].dst, backend)
def test_unknown_proto(self):
    """ NAT44 translate packet with unknown protocol """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    # A TCP packet first, before the GRE (unknown protocol) one.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ NAT44 translate packet with unknown protocol - hairpinning """
    # Host and server both sit behind pg0, so traffic is hairpinned.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    server_out_port = 8765
    server_nat_ip = "10.0.0.11"

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)

    # host to server
    # A TCP packet first, before the GRE (unknown protocol) one.
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_output_feature_and_service(self):
    """ NAT44 interface output feature and services """
    external_addr = '1.2.3.4'
    external_port = 80
    local_port = 8080

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    # pg0 is registered as both inside and outside; pg1 uses the output
    # feature.
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=external_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, external_addr)
        self.assertEqual(tcp.sport, external_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from local network host to external network
    # (the same stream is sent twice)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # from external network back to local network host
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_output_feature_and_service2(self):
    """ NAT44 interface output feature and service host direct access """
    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)

    # session initiated from service host - translate
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # session initiated from remote host - do not translate
    # Different inside ports/ICMP id from the first part.
    self.tcp_port_in = 60303
    self.udp_port_in = 60304
    self.icmp_id_in = 60305
    pkts = self.create_stream_out(self.pg1,
                                  self.pg0.remote_ip4,
                                  use_inside_ports=True)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    # Replies must leave with the host's own address and port.
    self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                            same_port=True)
def test_output_feature_and_service3(self):
    """ NAT44 interface output feature and DST NAT """
    external_addr = '1.2.3.4'
    external_port = 80
    local_port = 8080

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr)
    # The static mapping points at the host behind pg1.
    self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)

    # pg0 host -> external address: only the destination is rewritten.
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=external_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg0.remote_ip4)
        self.assertEqual(tcp.sport, 12345)
        self.assertEqual(ip.dst, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # Reply from the pg1 host: the source is rewritten back.
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, external_addr)
        self.assertEqual(tcp.sport, external_port)
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(tcp.dport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_next_src_nat(self):
    """ On way back forward packet to nat44-in2out node. """
    twice_nat_addr = '10.0.1.3'
    external_port = 80
    local_port = 8080
    post_twice_nat_port = 0

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(twice_nat_addr, twice_nat=1)
    self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1,
                                  self_twice_nat=1, vrf_id=1)
    self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
                                              is_inside=0)

    # The pg6 host contacts the external address of its own service; the
    # packet is expected back on pg6.
    p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
         IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=12345, dport=external_port))
    self.pg6.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, twice_nat_addr)
        self.assertNotEqual(tcp.sport, 12345)
        # Needed as the destination port of the reply below.
        post_twice_nat_port = tcp.sport
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # Reply towards the twice-NAT address/port seen above.
    p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
         IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
         TCP(sport=local_port, dport=post_twice_nat_port))
    self.pg6.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.sport, external_port)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertEqual(tcp.dport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
                     client_id=None):
    """
    Shared body of the twice-NAT / self-twice-NAT tests.

    :param self_twice_nat: create the mapping with the self_twice_nat flag
                           instead of the twice_nat flag.
    :param same_pg: the client is attached to pg0 as well (otherwise pg1).
    :param lb: use a load-balanced static mapping with two back-ends
               instead of a plain static mapping.
    :param client_id: with same_pg and lb, selects which pg0 remote host
                      acts as the client (1 or 2).
    """
    twice_nat_addr = '10.0.1.3'

    port_in = 8080
    if lb:
        if not same_pg:
            port_in1 = port_in
            port_in2 = port_in
        else:
            port_in1 = port_in+1
            port_in2 = port_in+2

    port_out = 80
    eh_port_out = 4567

    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]
    if lb and same_pg:
        server2 = server1
    if not lb:
        server = server1

    pg0 = self.pg0
    if same_pg:
        pg1 = self.pg0
    else:
        pg1 = self.pg1

    # Whether the external host's address/port is expected to be rewritten
    # to the twice-NAT pool.
    eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
                    client_id == 1)

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_address(twice_nat_addr, twice_nat=1)
    if not lb:
        self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
                                      port_in, port_out,
                                      proto=IP_PROTOS.tcp,
                                      twice_nat=int(not self_twice_nat),
                                      self_twice_nat=int(self_twice_nat))
    else:
        # Named lb_locals to avoid shadowing the builtin locals().
        lb_locals = [{'addr': server1.ip4n,
                      'port': port_in1,
                      'probability': 50,
                      'vrf_id': 0},
                     {'addr': server2.ip4n,
                      'port': port_in2,
                      'probability': 50,
                      'vrf_id': 0}]
        out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
        self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
                                                  port_out,
                                                  IP_PROTOS.tcp,
                                                  twice_nat=int(
                                                      not self_twice_nat),
                                                  self_twice_nat=int(
                                                      self_twice_nat),
                                                  local_num=len(lb_locals),
                                                  locals=lb_locals)
    self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
                                              is_inside=0)

    # Pick the client host.
    if same_pg:
        if not lb:
            client = server
        else:
            self.assertIsNotNone(client_id)
            if client_id == 1:
                client = self.pg0.remote_hosts[0]
            elif client_id == 2:
                client = self.pg0.remote_hosts[1]
    else:
        client = pg1.remote_hosts[0]

    # from client to service
    p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
         IP(src=client.ip4, dst=self.nat_addr) /
         TCP(sport=eh_port_out, dport=port_out))
    pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        if lb:
            # Find out which back-end the load balancer selected.
            if ip.dst == server1.ip4:
                server = server1
                port_in = port_in1
            else:
                server = server2
                port_in = port_in2
        self.assertEqual(ip.dst, server.ip4)
        if lb and same_pg:
            self.assertIn(tcp.dport, [port_in1, port_in2])
        else:
            self.assertEqual(tcp.dport, port_in)
        if eh_translate:
            self.assertEqual(ip.src, twice_nat_addr)
            self.assertNotEqual(tcp.sport, eh_port_out)
        else:
            self.assertEqual(ip.src, client.ip4)
            self.assertEqual(tcp.sport, eh_port_out)
        # Saved so the reply can be addressed to what the server saw.
        eh_addr_in = ip.src
        eh_port_in = tcp.sport
        saved_port_in = tcp.dport
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=pg0.local_mac) /
         IP(src=server.ip4, dst=eh_addr_in) /
         TCP(sport=saved_port_in, dport=eh_port_in))
    pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, client.ip4)
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.dport, eh_port_out)
        self.assertEqual(tcp.sport, port_out)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    if eh_translate:
        # The session must be flagged twice-NAT; delete it and re-check.
        sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
        self.assertEqual(len(sessions), 1)
        self.assertTrue(sessions[0].ext_host_valid)
        self.assertTrue(sessions[0].is_twicenat)
        self.vapi.nat44_del_session(
            sessions[0].inside_ip_address,
            sessions[0].inside_port,
            sessions[0].protocol,
            ext_host_address=sessions[0].ext_host_nat_address,
            ext_host_port=sessions[0].ext_host_nat_port)
        sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
        self.assertEqual(len(sessions), 0)
def test_twice_nat(self):
    """ Twice NAT44 """
    # Runs the shared twice-NAT scenario with every option left at the
    # default chosen by twice_nat_common().
    self.twice_nat_common()
def test_self_twice_nat_positive(self):
    """ Self Twice NAT44 (positive test) """
    # Shared scenario with the self-twice-NAT flag set and same_pg=True
    # (presumably client and server behind the same pg interface - confirm
    # against twice_nat_common()).
    self.twice_nat_common(self_twice_nat=True, same_pg=True)
def test_self_twice_nat_negative(self):
    """ Self Twice NAT44 (negative test) """
    # Same as the positive variant but without same_pg, i.e. the helper's
    # default interface placement is used.
    self.twice_nat_common(self_twice_nat=True)
def test_twice_nat_lb(self):
    """ Twice NAT44 local service load balancing """
    # Shared twice-NAT scenario with the load-balancing variant enabled.
    self.twice_nat_common(lb=True)
def test_self_twice_nat_lb_positive(self):
    """ Self Twice NAT44 local service load balancing (positive test) """
    # twice_nat_common() asserts that client_id is set on this path and
    # picks the client from pg0.remote_hosts by that id.
    # NOTE(review): the trailing argument was missing from the visible
    # source; client_id=1 is restored for the positive case - confirm.
    self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
                          client_id=1)
def test_self_twice_nat_lb_negative(self):
    """ Self Twice NAT44 local service load balancing (negative test) """
    # twice_nat_common() maps client_id == 2 to pg0.remote_hosts[1].
    # NOTE(review): the trailing argument was missing from the visible
    # source; client_id=2 is restored for the negative case - confirm.
    self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
                          client_id=2)
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface """
    iface = self.pg3
    self.vapi.nat44_add_interface_addr(iface.sw_if_index, twice_nat=1)

    # The NAT pool is expected to be empty until the interface gets an
    # IPv4 address.
    self.assertEqual(0, len(self.vapi.nat44_address_dump()))

    # Configuring the interface address must publish exactly one pool
    # entry: the interface's own address, flagged as twice-NAT.
    iface.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    entry = pool[0]
    self.assertEqual(entry.ip_address[0:4], iface.local_ip4n)
    self.assertEqual(entry.twice_nat, 1)

    # Removing the interface address must empty the pool again.
    iface.unconfig_ip4()
    self.assertEqual(0, len(self.vapi.nat44_address_dump()))
def test_tcp_session_close_in(self):
    """ Close TCP session from inside network """
    inside, outside = self.pg0, self.pg1

    def from_inside(flags, seq, ack):
        # TCP segment from the inside host to the outside host.
        return (Ether(src=inside.remote_mac, dst=inside.local_mac) /
                IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=flags, seq=seq, ack=ack))

    def from_outside(flags, seq, ack):
        # TCP segment from the outside host to the NAT (external) address.
        return (Ether(src=outside.remote_mac, dst=outside.local_mac) /
                IP(src=outside.remote_ip4, dst=self.nat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=flags, seq=seq, ack=ack))

    def send(tx_if, rx_if, pkts, expected):
        # Inject pkts on tx_if and require `expected` packets on rx_if.
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx_if.get_capture(expected)

    self.tcp_port_out = 10505
    self.nat44_add_address(self.nat_addr)
    # NOTE(review): the middle arguments and twice_nat=1 were missing from
    # the visible source and are restored here - confirm.
    self.nat44_add_static_mapping(inside.remote_ip4,
                                  self.nat_addr,
                                  self.tcp_port_in,
                                  self.tcp_port_out,
                                  proto=IP_PROTOS.tcp,
                                  twice_nat=1)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # Baseline, so only the session created by this test is counted.
    user = inside.remote_ip4n
    start_sessnum = len(self.vapi.nat44_user_session_dump(user, 0))

    self.initiate_tcp_session(inside, outside)

    # FIN packet in -> out
    send(inside, outside, from_inside("FA", 100, 300), 1)

    # ACK packet out -> in followed by FIN packet out -> in, one burst
    send(outside, inside,
         [from_outside("A", 300, 101), from_outside("FA", 300, 101)], 2)

    # ACK packet in -> out
    send(inside, outside, from_inside("A", 101, 301), 1)

    # The closed session must no longer be listed for the user.
    sessions = self.vapi.nat44_user_session_dump(user, 0)
    self.assertEqual(len(sessions) - start_sessnum, 0)
def test_tcp_session_close_out(self):
    """ Close TCP session from outside network """
    inside, outside = self.pg0, self.pg1

    def from_inside(flags, seq, ack):
        # TCP segment from the inside host to the outside host.
        return (Ether(src=inside.remote_mac, dst=inside.local_mac) /
                IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=flags, seq=seq, ack=ack))

    def from_outside(flags, seq, ack):
        # TCP segment from the outside host to the NAT (external) address.
        return (Ether(src=outside.remote_mac, dst=outside.local_mac) /
                IP(src=outside.remote_ip4, dst=self.nat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=flags, seq=seq, ack=ack))

    def send(tx_if, rx_if, pkt):
        # Inject one packet on tx_if and require one packet on rx_if.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx_if.get_capture(1)

    self.tcp_port_out = 10505
    self.nat44_add_address(self.nat_addr)
    # NOTE(review): the middle arguments and twice_nat=1 were missing from
    # the visible source and are restored here - confirm.
    self.nat44_add_static_mapping(inside.remote_ip4,
                                  self.nat_addr,
                                  self.tcp_port_in,
                                  self.tcp_port_out,
                                  proto=IP_PROTOS.tcp,
                                  twice_nat=1)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # Baseline, so only the session created by this test is counted.
    user = inside.remote_ip4n
    start_sessnum = len(self.vapi.nat44_user_session_dump(user, 0))

    self.initiate_tcp_session(inside, outside)

    # FIN packet out -> in
    send(outside, inside, from_outside("FA", 100, 300))

    # FIN+ACK packet in -> out
    send(inside, outside, from_inside("FA", 300, 101))

    # ACK packet out -> in
    send(outside, inside, from_outside("A", 101, 301))

    # The closed session must no longer be listed for the user.
    sessions = self.vapi.nat44_user_session_dump(user, 0)
    self.assertEqual(len(sessions) - start_sessnum, 0)
def test_tcp_session_close_simultaneous(self):
    """ Close TCP session simultaneously from both sides """
    # The docstring used to be a copy of the close-from-inside test's, so
    # two different tests were reported under the same description.
    inside, outside = self.pg0, self.pg1

    def from_inside(flags, seq, ack):
        # TCP segment from the inside host to the outside host.
        return (Ether(src=inside.remote_mac, dst=inside.local_mac) /
                IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=flags, seq=seq, ack=ack))

    def from_outside(flags, seq, ack):
        # TCP segment from the outside host to the NAT (external) address.
        return (Ether(src=outside.remote_mac, dst=outside.local_mac) /
                IP(src=outside.remote_ip4, dst=self.nat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=flags, seq=seq, ack=ack))

    def send(tx_if, rx_if, pkt):
        # Inject one packet on tx_if and require one packet on rx_if.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx_if.get_capture(1)

    self.tcp_port_out = 10505
    self.nat44_add_address(self.nat_addr)
    # NOTE(review): the middle arguments and twice_nat=1 were missing from
    # the visible source and are restored here - confirm.
    self.nat44_add_static_mapping(inside.remote_ip4,
                                  self.nat_addr,
                                  self.tcp_port_in,
                                  self.tcp_port_out,
                                  proto=IP_PROTOS.tcp,
                                  twice_nat=1)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # Baseline, so only the session created by this test is counted.
    user = inside.remote_ip4n
    start_sessnum = len(self.vapi.nat44_user_session_dump(user, 0))

    self.initiate_tcp_session(inside, outside)

    # FIN packet in -> out
    send(inside, outside, from_inside("FA", 100, 300))

    # FIN packet out -> in (does not acknowledge the inside FIN yet)
    send(outside, inside, from_outside("FA", 300, 100))

    # ACK packet in -> out
    send(inside, outside, from_inside("A", 101, 301))

    # ACK packet out -> in
    send(outside, inside, from_outside("A", 301, 101))

    # The closed session must no longer be listed for the user.
    sessions = self.vapi.nat44_user_session_dump(user, 0)
    self.assertEqual(len(sessions) - start_sessnum, 0)
def test_one_armed_nat44_static(self):
    """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
    # Client and service both sit behind pg4, which is configured as NAT
    # inside and outside interface at the same time.
    remote_host = self.pg4.remote_hosts[0]
    local_host = self.pg4.remote_hosts[1]
    # NOTE(review): the two port constants were missing from the visible
    # source; 80/8080 are restored - confirm.
    external_port = 80
    local_port = 8080
    client_port = 12345

    def exchange(pkt):
        # Send one packet on pg4 and return the single packet coming back.
        self.pg4.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg4.get_capture(1)[0]

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr, twice_nat=1)
    self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1,
                                  twice_nat=1)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
                                              is_inside=0)

    # from client to service
    rx = exchange(Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
                  IP(src=remote_host.ip4, dst=self.nat_addr) /
                  TCP(sport=client_port, dport=external_port))
    try:
        ip = rx[IP]
        tcp = rx[TCP]
        self.assertEqual(ip.dst, local_host.ip4)
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.dport, local_port)
        # Twice NAT also rewrites the client's source port.
        self.assertNotEqual(tcp.sport, client_port)
        eh_port_in = tcp.sport
        self.assert_packet_checksums_valid(rx)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", rx))
        raise

    # from service back to client
    rx = exchange(Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
                  IP(src=local_host.ip4, dst=self.nat_addr) /
                  TCP(sport=local_port, dport=eh_port_in))
    try:
        ip = rx[IP]
        tcp = rx[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, remote_host.ip4)
        self.assertEqual(tcp.sport, external_port)
        self.assertEqual(tcp.dport, client_port)
        self.assert_packet_checksums_valid(rx)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", rx))
        raise
def test_static_with_port_out2(self):
    """ 1:1 NAPT asymmetrical rule """
    inside, outside = self.pg0, self.pg1
    # NOTE(review): the two port constants were missing from the visible
    # source; 80/8080 are restored - confirm.
    external_port = 80
    local_port = 8080

    def exchange(tx_if, rx_if, pkt):
        # Send one packet on tx_if and return the single packet seen on
        # rx_if.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def report(pkt):
        # Log the offending packet; the caller re-raises afterwards.
        self.logger.error(ppp("Unexpected or invalid packet:", pkt))

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_static_mapping(inside.remote_ip4, self.nat_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # from client to service
    to_service = exchange(
        outside, inside,
        Ether(src=outside.remote_mac, dst=outside.local_mac) /
        IP(src=outside.remote_ip4, dst=self.nat_addr) /
        TCP(sport=12345, dport=external_port))
    try:
        ip = to_service[IP]
        tcp = to_service[TCP]
        self.assertEqual(ip.dst, inside.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(to_service)
    except:
        report(to_service)
        raise

    # ICMP error from the service, quoting the translated packet
    rx = exchange(
        inside, outside,
        Ether(dst=inside.local_mac, src=inside.remote_mac) /
        IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
        ICMP(type=11) / to_service[IP])
    try:
        self.assertEqual(rx[IP].src, self.nat_addr)
        inner = rx[IPerror]
        self.assertEqual(inner.dst, self.nat_addr)
        self.assertEqual(inner[TCPerror].dport, external_port)
    except:
        report(rx)
        raise

    # from service back to client
    to_client = exchange(
        inside, outside,
        Ether(src=inside.remote_mac, dst=inside.local_mac) /
        IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
        TCP(sport=local_port, dport=12345))
    try:
        ip = to_client[IP]
        tcp = to_client[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.assert_packet_checksums_valid(to_client)
    except:
        report(to_client)
        raise

    # ICMP error from the client, quoting the translated reply
    rx = exchange(
        outside, inside,
        Ether(dst=outside.local_mac, src=outside.remote_mac) /
        IP(src=outside.remote_ip4, dst=self.nat_addr) /
        ICMP(type=11) / to_client[IP])
    try:
        self.assertEqual(rx[IP].dst, inside.remote_ip4)
        inner = rx[IPerror]
        self.assertEqual(inner.src, inside.remote_ip4)
        self.assertEqual(inner[TCPerror].sport, local_port)
    except:
        report(rx)
        raise

    # from client to server (no translation)
    rx = exchange(
        outside, inside,
        Ether(src=outside.remote_mac, dst=outside.local_mac) /
        IP(src=outside.remote_ip4, dst=inside.remote_ip4) /
        TCP(sport=12346, dport=local_port))
    try:
        ip = rx[IP]
        tcp = rx[TCP]
        self.assertEqual(ip.dst, inside.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(rx)
    except:
        report(rx)
        raise

    # from service back to client (no translation)
    rx = exchange(
        inside, outside,
        Ether(src=inside.remote_mac, dst=inside.local_mac) /
        IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
        TCP(sport=local_port, dport=12346))
    try:
        ip = rx[IP]
        tcp = rx[TCP]
        self.assertEqual(ip.src, inside.remote_ip4)
        self.assertEqual(tcp.sport, local_port)
        self.assert_packet_checksums_valid(rx)
    except:
        report(rx)
        raise
def test_output_feature(self):
    """ NAT44 interface output feature (in2out postrouting) """
    inside, outside = self.pg0, self.pg1

    def run(tx_if, rx_if, pkts):
        # Send pkts on tx_if and return the equally sized capture on rx_if.
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr)
    # NOTE(review): both calls were cut after the first argument in the
    # visible source; is_inside=0 is restored for each - confirm.
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_output_feature(outside.sw_if_index,
                                                     is_inside=0)

    # in2out
    capture = run(inside, outside, self.create_stream_in(inside, outside))
    self.verify_capture_out(capture)

    # out2in
    capture = run(outside, inside, self.create_stream_out(outside))
    self.verify_capture_in(capture, inside)
def test_multiple_vrf(self):
    """ Multiple VRF setup """
    pg0, pg1, pg5, pg6 = self.pg0, self.pg1, self.pg5, self.pg6
    external_addr = '1.2.3.4'
    # NOTE(review): the two port constants were missing from the visible
    # source; 80/8080 are restored - confirm.
    external_port = 80
    local_port = 8080

    def tcp_exchange(tx_if, rx_if, src, dst, sport, dport):
        # Send one TCP packet on tx_if and return the single packet
        # captured on rx_if.
        pkt = (Ether(src=tx_if.remote_mac, dst=tx_if.local_mac) /
               IP(src=src, dst=dst) /
               TCP(sport=sport, dport=dport))
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def expect_dst(rx, addr, port):
        # Verify destination address/port and checksums; log on failure.
        try:
            ip = rx[IP]
            tcp = rx[TCP]
            self.assertEqual(ip.dst, addr)
            self.assertEqual(tcp.dport, port)
            self.assert_packet_checksums_valid(rx)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", rx))
            raise

    def expect_src(rx, addr, port):
        # Verify source address/port and checksums; log on failure.
        try:
            ip = rx[IP]
            tcp = rx[TCP]
            self.assertEqual(ip.src, addr)
            self.assertEqual(tcp.sport, port)
            self.assert_packet_checksums_valid(rx)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", rx))
            raise

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_output_feature(pg1.sw_if_index,
                                                     is_inside=0)
    self.vapi.nat44_interface_add_del_feature(pg5.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(pg5.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(pg6.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(pg5.remote_ip4, external_addr,
                                  local_port, external_port, vrf_id=1,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    self.nat44_add_static_mapping(
        pg0.remote_ip4, external_sw_if_index=pg0.sw_if_index,
        local_port=local_port, vrf_id=0, external_port=external_port,
        proto=IP_PROTOS.tcp, out2in_only=1)

    # from client to service (both VRF1)
    rx = tcp_exchange(pg6, pg5, pg6.remote_ip4, external_addr,
                      12345, external_port)
    expect_dst(rx, pg5.remote_ip4, local_port)

    # from service back to client (both VRF1)
    rx = tcp_exchange(pg5, pg6, pg5.remote_ip4, pg6.remote_ip4,
                      local_port, 12345)
    expect_src(rx, external_addr, external_port)

    # dynamic NAT from VRF1 to VRF0 (output-feature)
    rx = tcp_exchange(pg5, pg1, pg5.remote_ip4, pg1.remote_ip4, 2345, 22)
    try:
        ip = rx[IP]
        tcp = rx[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertNotEqual(tcp.sport, 2345)
        self.assert_packet_checksums_valid(rx)
        # Remember the translated port for the reply below.
        port = tcp.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", rx))
        raise

    # reply to the dynamic session, addressed to the translated port
    rx = tcp_exchange(pg1, pg5, pg1.remote_ip4, self.nat_addr, 22, port)
    expect_dst(rx, pg5.remote_ip4, 2345)

    # from client VRF1 to service VRF0
    rx = tcp_exchange(pg6, pg0, pg6.remote_ip4, pg0.local_ip4,
                      12346, external_port)
    expect_dst(rx, pg0.remote_ip4, local_port)

    # from service VRF0 back to client VRF1
    rx = tcp_exchange(pg0, pg6, pg0.remote_ip4, pg6.remote_ip4,
                      local_port, 12346)
    expect_src(rx, pg0.local_ip4, external_port)

    # from client VRF0 to service VRF1
    rx = tcp_exchange(pg0, pg5, pg0.remote_ip4, external_addr,
                      12347, external_port)
    expect_dst(rx, pg5.remote_ip4, local_port)

    # from service VRF1 back to client VRF0
    rx = tcp_exchange(pg5, pg0, pg5.remote_ip4, pg0.remote_ip4,
                      local_port, 12347)
    expect_src(rx, external_addr, external_port)

    # from client to server (both VRF1, no translation)
    rx = tcp_exchange(pg6, pg5, pg6.remote_ip4, pg5.remote_ip4,
                      12348, local_port)
    expect_dst(rx, pg5.remote_ip4, local_port)

    # from server back to client (both VRF1, no translation)
    rx = tcp_exchange(pg5, pg6, pg5.remote_ip4, pg6.remote_ip4,
                      local_port, 12348)
    expect_src(rx, pg5.remote_ip4, local_port)

    # NOTE(review): this step was labelled "from client VRF1 to server
    # VRF0 (no translation)", but the packet it sends is the server VRF0 ->
    # client VRF1 one, identical to the next step. The traffic is kept
    # unchanged; confirm whether a pg6 -> pg0 packet was intended.
    rx = tcp_exchange(pg0, pg6, pg0.remote_ip4, pg6.remote_ip4,
                      local_port, 12349)
    expect_src(rx, pg0.remote_ip4, local_port)

    # from server VRF0 back to client VRF1 (no translation)
    rx = tcp_exchange(pg0, pg6, pg0.remote_ip4, pg6.remote_ip4,
                      local_port, 12349)
    expect_src(rx, pg0.remote_ip4, local_port)

    # from client VRF0 to server VRF1 (no translation)
    rx = tcp_exchange(pg0, pg5, pg0.remote_ip4, pg5.remote_ip4,
                      12344, local_port)
    expect_dst(rx, pg5.remote_ip4, local_port)

    # from server VRF1 back to client VRF0 (no translation)
    rx = tcp_exchange(pg5, pg0, pg5.remote_ip4, pg0.remote_ip4,
                      local_port, 12344)
    expect_src(rx, pg5.remote_ip4, local_port)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ NAT44 session timeouts """
    inside, outside = self.pg0, self.pg1
    # NOTE(review): the session count and the sleep below were missing
    # from the visible source; 1000 and 10 s are restored - confirm.
    max_sessions = 1000

    def echo_burst(icmp_id):
        # One echo request per inside source address 10.10.<hi>.<lo>.
        return [(Ether(dst=inside.local_mac, src=inside.remote_mac) /
                 IP(src="10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF),
                    dst=outside.remote_ip4) /
                 ICMP(id=icmp_id, type='echo-request'))
                for i in range(0, max_sessions)]

    def send(pkts):
        # Every request must be forwarded to the outside interface.
        inside.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        outside.get_capture(max_sessions)

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)
    self.vapi.nat_set_timeouts(icmp=5)

    send(echo_burst(1025))

    # Wait longer than the 5 s ICMP timeout configured above.
    sleep(10)

    send(echo_burst(1026))

    # If the first batch had not expired, the second batch would bring
    # the total to 2 * max_sessions.
    nsessions = sum(user.nsessions for user in self.vapi.nat44_user_dump())
    self.assertLess(nsessions, 2 * max_sessions)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_limit_per_user(self):
    """ Maximum sessions per user limit """
    inside, outside, collector = self.pg0, self.pg1, self.pg2

    def udp_from_inside(sport, dport):
        # UDP datagram from the single inside user to the outside host.
        return (Ether(src=inside.remote_mac, dst=inside.local_mac) /
                IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                UDP(sport=sport, dport=dport))

    def send(pkts):
        inside.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)
    # NOTE(review): path_mtu=512 was missing from the visible source and
    # is restored here - confirm.
    self.vapi.set_ipfix_exporter(collector_address=collector.remote_ip4n,
                                 src_address=collector.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)

    # get maximum number of translations per user
    nat44_config = self.vapi.nat_show_config()
    limit = nat44_config.max_translations_per_user

    # Open exactly `limit` sessions; all of them must be translated.
    pkts = [udp_from_inside(1025 + port, 1025 + port)
            for port in range(0, limit)]
    send(pkts)
    outside.get_capture(len(pkts))

    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    # One session over the limit must be dropped.
    send(udp_from_inside(3001, 3002))
    outside.assert_nothing_captured()

    # verify IPFIX logging
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    # NOTE(review): this short wait was missing from the visible source;
    # sleep(1) is restored - confirm.
    sleep(1)
    capture = collector.get_capture(10)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_entries_per_user(
                data,
                limit,
                inside.remote_ip4n)
def tearDown(self):
    # Per-test cleanup: run the parent tearDown, then, while VPP is still
    # alive, log the NAT state for post-mortem analysis and reset the
    # NAT44 configuration.
    super(TestNAT44EndpointDependent, self).tearDown()
    if self.vpp_dead:
        return
    for command in ("show nat44 addresses",
                    "show nat44 interfaces",
                    "show nat44 static mappings",
                    "show nat44 interface address",
                    "show nat44 sessions detail",
                    "show nat44 hash tables detail",
                    "show nat timeouts"):
        self.logger.info(self.vapi.cli(command))
    # NOTE(review): this call was missing from the visible source and is
    # restored here - confirm.
    self.clear_nat44()
    self.vapi.cli("clear logging")
class TestNAT44Out2InDPO(MethodHolder):
    """ NAT44 Test Cases using out2in DPO """

    @classmethod
    def setUpConstants(cls):
        # Start VPP with the NAT plugin in "out2in dpo" mode.
        super(TestNAT44Out2InDPO, cls).setUpConstants()
        cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])

    @classmethod
    def setUpClass(cls):
        # Class fixture: pg0 is the IPv4 side, pg1 the IPv6 side, and an
        # IPv6 default route points at pg1's peer.
        super(TestNAT44Out2InDPO, cls).setUpClass()
        cls.vapi.cli("set log class nat level debug")

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.dst_ip4 = '192.168.70.1'

            cls.create_pg_interfaces(range(2))

            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.resolve_ndp()

            cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
                                      dst_address_length=0,
                                      next_hop_address=cls.pg1.remote_ip6n,
                                      next_hop_sw_if_index=cls.pg1.sw_if_index)

        except Exception:
            # Release whatever was set up before propagating the failure.
            super(TestNAT44Out2InDPO, cls).tearDownClass()
            raise

    def configure_xlat(self):
        # Create the MAP translation domain for the 464XLAT tests: /96
        # IPv6 prefixes for both directions, no IPv4 prefix.
        self.dst_ip6_pfx = '1:2:3::'
        self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.dst_ip6_pfx)
        self.dst_ip6_pfx_len = 96
        self.src_ip6_pfx = '4:5:6::'
        self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.src_ip6_pfx)
        self.src_ip6_pfx_len = 96
        # NOTE(review): the last argument was missing from the visible
        # source; is_rfc6052=1 is restored - confirm.
        self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
                                 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
                                 '\x00\x00\x00\x00', 0, is_translation=1,
                                 is_rfc6052=1)

    def test_464xlat_ce(self):
        """ Test 464XLAT CE with NAT44 """
        v4_if, v6_if = self.pg0, self.pg1

        nat_config = self.vapi.nat_show_config()
        self.assertEqual(1, nat_config.out2in_dpo)

        self.configure_xlat()

        self.vapi.nat44_interface_add_del_feature(v4_if.sw_if_index)
        self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)

        # IPv6 addresses expected on the wire after NAT44 + translation.
        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                       self.dst_ip6_pfx_len)
        out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
                                       self.src_ip6_pfx_len)

        try:
            # IPv4 in -> IPv6 out
            pkts = self.create_stream_in(v4_if, v6_if, self.dst_ip4)
            v4_if.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = v6_if.get_capture(len(pkts))
            self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
                                        dst_ip=out_src_ip6)

            # IPv6 out -> IPv4 in
            pkts = self.create_stream_out_ip6(v6_if, out_src_ip6,
                                              out_dst_ip6)
            v6_if.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = v4_if.get_capture(len(pkts))
            self.verify_capture_in(capture, v4_if)
        finally:
            # Undo the NAT44 configuration even when a check failed.
            self.vapi.nat44_interface_add_del_feature(v4_if.sw_if_index,
                                                      is_add=0)
            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                                  self.nat_addr_n, is_add=0)

    def test_464xlat_ce_no_nat(self):
        """ Test 464XLAT CE without NAT44 """
        v4_if, v6_if = self.pg0, self.pg1

        self.configure_xlat()

        # Without NAT44 the IPv6 source embeds the host's own IPv4 address.
        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                       self.dst_ip6_pfx_len)
        out_dst_ip6 = self.compose_ip6(v4_if.remote_ip4, self.src_ip6_pfx,
                                       self.src_ip6_pfx_len)

        # IPv4 in -> IPv6 out
        pkts = self.create_stream_in(v4_if, v6_if, self.dst_ip4)
        v4_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = v6_if.get_capture(len(pkts))
        self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
                                    nat_ip=out_dst_ip6, same_port=True)

        # IPv6 out -> IPv4 in
        pkts = self.create_stream_out_ip6(v6_if, out_src_ip6, out_dst_ip6)
        v6_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = v4_if.get_capture(len(pkts))
        self.verify_capture_in(capture, v4_if)
5270 class TestDeterministicNAT(MethodHolder):
5271 """ Deterministic NAT Test Cases """
@classmethod
def setUpConstants(cls):
    # Start VPP with the NAT plugin in deterministic mode. The method
    # takes cls and calls super(..., cls), so it must be a classmethod.
    super(TestDeterministicNAT, cls).setUpConstants()
    cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
@classmethod
def setUpClass(cls):
    # Class fixture: three IPv4 pg interfaces, with two remote hosts
    # registered as neighbors on pg0.
    super(TestDeterministicNAT, cls).setUpClass()
    cls.vapi.cli("set log class nat level debug")

    try:
        cls.tcp_port_in = 6303
        cls.tcp_external_port = 6303
        cls.udp_port_in = 6304
        cls.udp_external_port = 6304
        cls.icmp_id_in = 6305
        cls.nat_addr = '10.0.0.3'

        cls.create_pg_interfaces(range(3))
        cls.interfaces = list(cls.pg_interfaces)

        # NOTE(review): the loop body was missing from the visible source;
        # the usual admin_up / config_ip4 / resolve_arp sequence is
        # restored - confirm.
        for i in cls.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg0.generate_remote_hosts(2)
        cls.pg0.configure_ipv4_neighbors()

    except Exception:
        # Release whatever was set up before propagating the failure.
        super(TestDeterministicNAT, cls).tearDownClass()
        raise
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-request packet
    """
    def frame(l4):
        # Common Ethernet/IP envelope: inside host -> outside host.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
                l4)

    return [
        # TCP
        frame(TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)),
        # UDP
        frame(UDP(sport=self.udp_port_in, dport=self.udp_external_port)),
        # ICMP
        frame(ICMP(id=self.icmp_id_in, type='echo-request')),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    Uses self.tcp_port_out, self.udp_port_out and self.icmp_external_id,
    which verify_capture_out() records from a previous in2out capture.

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-reply packet
    """
    if dst_ip is None:
        dst_ip = self.nat_addr

    def frame(l4):
        # Common Ethernet/IP envelope: outside host -> NAT address.
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4)

    return [
        # TCP
        frame(TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)),
        # UDP
        frame(UDP(dport=self.udp_port_out, sport=self.udp_external_port)),
        # ICMP
        frame(ICMP(id=self.icmp_external_id, type='echo-reply')),
    ]
def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
    """
    Verify captured packets on outside network

    Besides checking the translated source address, the translated TCP and
    UDP source ports and the ICMP identifier are stored on the test
    instance (tcp_port_out, udp_port_out, icmp_external_id).

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    """
    # The stale ":param same_port:" entry was dropped: this method has no
    # such parameter.
    if nat_ip is None:
        nat_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, nat_ip)
            if packet.haslayer(TCP):
                self.tcp_port_out = packet[TCP].sport
            elif packet.haslayer(UDP):
                self.udp_port_out = packet[UDP].sport
            else:
                # Anything else is expected to be ICMP; a missing layer
                # lands in the handler below.
                self.icmp_external_id = packet[ICMP].id
        except:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def test_deterministic_mode(self):
    """ NAT plugin run deterministic mode """
    in_addr = '172.16.255.0'
    out_addr = '172.17.255.50'
    in_addr_t = '172.16.255.20'
    in_addr_n = socket.inet_aton(in_addr)
    out_addr_n = socket.inet_aton(out_addr)
    in_addr_t_n = socket.inet_aton(in_addr_t)
    in_plen = 24
    out_plen = 28

    nat_config = self.vapi.nat_show_config()
    self.assertEqual(1, nat_config.deterministic)

    self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)

    # Forward lookup followed by the reverse lookup must lead back to the
    # inside address we started from.
    rep1 = self.vapi.nat_det_forward(in_addr_t_n)
    self.assertEqual(rep1.out_addr[:4], out_addr_n)
    rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
    self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

    # The dump must report exactly the mapping configured above.
    deterministic_mappings = self.vapi.nat_det_map_dump()
    self.assertEqual(len(deterministic_mappings), 1)
    dsm = deterministic_mappings[0]
    self.assertEqual(in_addr_n, dsm.in_addr[:4])
    self.assertEqual(in_plen, dsm.in_plen)
    self.assertEqual(out_addr_n, dsm.out_addr[:4])
    self.assertEqual(out_plen, dsm.out_plen)

    self.clear_nat_det()
    deterministic_mappings = self.vapi.nat_det_map_dump()
    self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    before = self.vapi.nat_get_timeouts()

    # Shift every timeout by the same non-zero offset so each new value
    # is guaranteed to differ from the one read above.
    delta = 10
    self.vapi.nat_set_timeouts(
        udp=before.udp + delta,
        tcp_established=before.tcp_established + delta,
        tcp_transitory=before.tcp_transitory + delta,
        icmp=before.icmp + delta)

    after = self.vapi.nat_get_timeouts()

    # Compare against the exact values that were set, not merely
    # "something changed".
    self.assertEqual(after.udp, before.udp + delta)
    self.assertEqual(after.icmp, before.icmp + delta)
    self.assertEqual(after.tcp_established,
                     before.tcp_established + delta)
    self.assertEqual(after.tcp_transitory,
                     before.tcp_transitory + delta)
def test_det_in(self):
    """ Deterministic NAT translation test (TCP, UDP, ICMP) """

    nat_ip = "10.0.0.10"

    # One inside host mapped onto one outside address.
    self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                  32,
                                  socket.inet_aton(nat_ip),
                                  32)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # session dump test
    sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
    self.assertEqual(len(sessions), 3)

    # TCP session
    s = sessions[0]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.tcp_port_in)
    self.assertEqual(s.out_port, self.tcp_port_out)
    self.assertEqual(s.ext_port, self.tcp_external_port)

    # UDP session
    s = sessions[1]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.udp_port_in)
    self.assertEqual(s.out_port, self.udp_port_out)
    self.assertEqual(s.ext_port, self.udp_external_port)

    # ICMP session
    s = sessions[2]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.icmp_id_in)
    self.assertEqual(s.out_port, self.icmp_external_id)
def test_multiple_users(self):
    """ Deterministic NAT multiple users """

    nat_ip = "10.0.0.10"
    port_in = 80
    external_port = 6303

    host0 = self.pg0.remote_hosts[0]
    host1 = self.pg0.remote_hosts[1]

    # Both inside hosts share the single outside address.
    self.vapi.nat_det_add_del_map(host0.ip4n,
                                  24,
                                  socket.inet_aton(nat_ip),
                                  32)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # host0 to out
    p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
         IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=port_in, dport=external_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, nat_ip)
        self.assertEqual(ip.dst, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, external_port)
        # Remember the translated port to address host0 from outside.
        port_out0 = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # host1 to out
    p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
         IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=port_in, dport=external_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, nat_ip)
        self.assertEqual(ip.dst, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, external_port)
        # Remember the translated port to address host1 from outside.
        port_out1 = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    dms = self.vapi.nat_det_map_dump()
    self.assertEqual(1, len(dms))
    self.assertEqual(2, dms[0].ses_num)

    # out to host0
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         TCP(sport=external_port, dport=port_out0))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(ip.dst, host0.ip4)
        self.assertEqual(tcp.dport, port_in)
        self.assertEqual(tcp.sport, external_port)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # out to host1
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         TCP(sport=external_port, dport=port_out1))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(ip.dst, host1.ip4)
        self.assertEqual(tcp.dport, port_in)
        self.assertEqual(tcp.sport, external_port)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet", p))
        raise

    # session close api test
    # Close host1's session by its outside tuple ...
    self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
                                        port_out1,
                                        self.pg1.remote_ip4n,
                                        external_port)
    dms = self.vapi.nat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 1)

    # ... and host0's session by its inside tuple.
    self.vapi.nat_det_close_session_in(host0.ip4n,
                                       port_in,
                                       self.pg1.remote_ip4n,
                                       external_port)
    dms = self.vapi.nat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
    """ Deterministic NAT TCP session close from inside network """
    self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    self.initiate_tcp_session(self.pg0, self.pg1)

    # close the session from inside
    try:
        # FIN packet in -> out
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="F"))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(1)

        pkts = []

        # ACK packet out -> in
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="A"))
        pkts.append(p)

        # FIN packet out -> in
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="F"))
        pkts.append(p)

        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(2)

        # ACK packet in -> out
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="A"))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(1)

        # Check if deterministic NAT44 closed the session
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)
    except Exception:
        self.logger.error("TCP session termination failed")
        raise
def test_tcp_session_close_detection_out(self):
    """ Deterministic NAT TCP session close from outside network """
    self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    self.initiate_tcp_session(self.pg0, self.pg1)

    # close the session from outside
    try:
        # FIN packet out -> in
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="F"))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        pkts = []

        # ACK packet in -> out
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="A"))
        pkts.append(p)

        # FIN packet in -> out
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="F"))
        pkts.append(p)

        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(2)

        # ACK packet out -> in
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="A"))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        # Check if deterministic NAT44 closed the session
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)
    except Exception:
        self.logger.error("TCP session termination failed")
        raise
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ Deterministic NAT session timeouts """
    self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    self.initiate_tcp_session(self.pg0, self.pg1)
    # Shorten all timeouts to 5 seconds so the sessions expire quickly.
    self.vapi.nat_set_timeouts(5, 5, 5, 5)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # Only the arrival of the packets matters here, not their content.
    self.pg1.get_capture(len(pkts))
    # Wait well past the configured timeouts.
    sleep(15)

    dms = self.vapi.nat_det_map_dump()
    self.assertEqual(0, dms[0].ses_num)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_limit_per_user(self):
    """ Deterministic NAT maximum sessions per user limit """
    self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
                                 src_address=self.pg2.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix()

    # Open 1000 distinct UDP sessions from the single inside host.
    pkts = []
    for port in range(1025, 2025):
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             UDP(sport=port, dport=port))
        pkts.append(p)

    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(len(pkts))

    # One more session must be refused: nothing is forwarded ...
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         UDP(sport=3001, dport=3002))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()

    # verify ICMP error packet
    # ... and the sender gets an ICMP error quoting the dropped datagram.
    capture = self.pg0.get_capture(1)
    p = capture[0]
    self.assertTrue(p.haslayer(ICMP))
    icmp = p[ICMP]
    self.assertEqual(icmp.type, 3)
    self.assertEqual(icmp.code, 1)
    self.assertTrue(icmp.haslayer(IPerror))
    inner_ip = icmp[IPerror]
    self.assertEqual(inner_ip[UDPerror].sport, 3001)
    self.assertEqual(inner_ip[UDPerror].dport, 3002)

    dms = self.vapi.nat_det_map_dump()

    self.assertEqual(1000, dms[0].ses_num)

    # verify IPFIX logging
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    sleep(1)
    capture = self.pg2.get_capture(2)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_entries_per_user(data,
                                                   1000,
                                                   self.pg0.remote_ip4n)
def clear_nat_det(self):
    """
    Clear deterministic NAT configuration.

    Disables NAT IPFIX logging, resets the timeouts by calling
    ``nat_set_timeouts()`` without arguments, and removes every dumped
    deterministic mapping and NAT44 interface feature.
    """
    self.vapi.nat_ipfix(enable=0)
    self.vapi.nat_set_timeouts()
    for dsm in self.vapi.nat_det_map_dump():
        self.vapi.nat_det_add_del_map(dsm.in_addr,
                                      dsm.in_plen,
                                      dsm.out_addr,
                                      dsm.out_plen,
                                      is_add=0)

    for intf in self.vapi.nat44_interface_dump():
        self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                  intf.is_inside,
                                                  is_add=0)
def tearDown(self):
    """
    Log the deterministic NAT state and clear it after each test.

    Nothing is queried or cleaned up when VPP is flagged as dead.
    """
    super(TestDeterministicNAT, self).tearDown()
    if not self.vpp_dead:
        # Capture the final state in the test log before wiping it.
        self.logger.info(self.vapi.cli("show nat44 interfaces"))
        self.logger.info(self.vapi.cli("show nat timeouts"))
        self.logger.info(
            self.vapi.cli("show nat44 deterministic mappings"))
        self.logger.info(
            self.vapi.cli("show nat44 deterministic sessions"))
        self.clear_nat_det()
5862 class TestNAT64(MethodHolder):
5863 """ NAT64 Test Cases """
@classmethod
def setUpConstants(cls):
    """ Extend the VPP command line with NAT64 hash table sizes """
    super(TestNAT64, cls).setUpConstants()
    # Appended as a "nat { ... }" section of the VPP command line.
    cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
                            "nat64 st hash buckets 256", "}"])
@classmethod
def setUpClass(cls):
    """
    Create and configure the packet-generator interfaces for NAT64 tests.

    pg0 and pg2 are IPv6 interfaces (pg2 in the vrf1 IPv6 table), pg1 is
    an IPv4 interface, pg3 is configured for both IPv4 and IPv6, and pg5
    is configured for IPv6 without neighbor setup. On any failure the
    class is torn down before the exception is re-raised.
    """
    super(TestNAT64, cls).setUpClass()

    try:
        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.nat_addr = '10.0.0.3'
        cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
        cls.vrf1_id = 10
        cls.vrf1_nat_addr = '10.0.10.3'
        cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
                                               cls.vrf1_nat_addr)
        cls.ipfix_src_port = 4739
        cls.ipfix_domain_id = 1

        cls.create_pg_interfaces(range(6))
        cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
        cls.ip6_interfaces.append(cls.pg_interfaces[2])
        cls.ip4_interfaces = list(cls.pg_interfaces[1:2])

        cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)

        cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)

        cls.pg0.generate_remote_hosts(2)

        for i in cls.ip6_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

        for i in cls.ip4_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg3.admin_up()
        cls.pg3.config_ip4()
        cls.pg3.resolve_arp()
        cls.pg3.config_ip6()
        cls.pg3.configure_ipv6_neighbors()

        # pg5 gets an IPv6 address only; no neighbor is configured here.
        cls.pg5.admin_up()
        cls.pg5.config_ip6()

    except Exception:
        super(TestNAT64, cls).tearDownClass()
        raise
def test_nat64_inside_interface_handles_neighbor_advertisement(self):
    """ NAT64 inside interface handles Neighbor Advertisement """

    self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)

    # Try to send ping
    ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
            IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
            ICMPv6EchoRequest())
    pkts = [ping]
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Wait for Neighbor Solicitation
    capture = self.pg5.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
        self.assertTrue(packet.haslayer(ICMPv6ND_NS))
        # The advertisement below must answer for this target.
        tgt = packet[ICMPv6ND_NS].tgt
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # Send Neighbor Advertisement
    p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
         IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
         ICMPv6ND_NA(tgt=tgt) /
         ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
    pkts = [p]
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Try to send ping again
    pkts = [ping]
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Wait for ping reply
    capture = self.pg5.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
        self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
        self.assertTrue(packet.haslayer(ICMPv6EchoReply))
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # A one-address range: start and end are the same address.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Removing the same range must leave the pool empty again.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface """
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 2)
    # Both interfaces must show up in the dump with the right direction.
    pg0_found = False
    pg1_found = False
    for intf in interfaces:
        if intf.sw_if_index == self.pg0.sw_if_index:
            self.assertEqual(intf.is_inside, 1)
            pg0_found = True
        elif intf.sw_if_index == self.pg1.sw_if_index:
            self.assertEqual(intf.is_inside, 0)
            pg1_found = True
    self.assertTrue(pg0_found)
    self.assertTrue(pg1_found)

    # The feature names must appear in the per-interface CLI output.
    features = self.vapi.cli("show interface features pg0")
    self.assertNotEqual(features.find('nat64-in2out'), -1)
    features = self.vapi.cli("show interface features pg1")
    self.assertNotEqual(features.find('nat64-out2in'), -1)

    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 0)
def test_static_bib(self):
    """ Add/delete static BIB entry """
    in_addr = socket.inet_pton(socket.AF_INET6,
                               '2001:db8:85a3::8a2e:370:7334')
    out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
    in_port = 1234
    out_port = 5678
    proto = IP_PROTOS.tcp

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto)
    bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
    # Count only static entries; each one must match what was added.
    static_bib_num = 0
    for bibe in bib:
        if bibe.is_static:
            static_bib_num += 1
            self.assertEqual(bibe.i_addr, in_addr)
            self.assertEqual(bibe.o_addr, out_addr)
            self.assertEqual(bibe.i_port, in_port)
            self.assertEqual(bibe.o_port, out_port)
    self.assertEqual(static_bib_num, 1)

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto,
                                       is_add=0)
    bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
    static_bib_num = sum(1 for bibe in bib if bibe.is_static)
    self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    expected_defaults = (('udp', 300),
                         ('icmp', 60),
                         ('tcp_transitory', 240),
                         ('tcp_established', 7440))
    custom_values = (('udp', 200),
                     ('icmp', 30),
                     ('tcp_transitory', 250),
                     ('tcp_established', 7450))

    # verify default values
    current = self.vapi.nat_get_timeouts()
    for field, value in expected_defaults:
        self.assertEqual(getattr(current, field), value)

    # set and verify custom values
    self.vapi.nat_set_timeouts(**dict(custom_values))
    current = self.vapi.nat_get_timeouts()
    for field, value in custom_values:
        self.assertEqual(getattr(current, field), value)
def test_dynamic(self):
    """ NAT64 dynamic translation test """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    ses_num_start = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # in2out
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    # out2in
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    # IPv6 form of the outside host: 64:ff9b:: prefix + its IPv4 address.
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    # in2out
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    # out2in
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    ses_num_end = self.nat64_get_ses_num()

    # The second round reuses the sessions opened by the first one.
    self.assertEqual(ses_num_end - ses_num_start, 3)

    # tenant with specific VRF
    self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                            self.vrf1_nat_addr_n,
                                            vrf_id=self.vrf1_id)
    self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

    pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
    self.pg2.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
def test_static(self):
    """ NAT64 static translation test """
    self.tcp_port_in = 60303
    self.udp_port_in = 60304
    self.icmp_id_in = 60305
    self.tcp_port_out = 60303
    self.udp_port_out = 60304
    self.icmp_id_out = 60305

    ses_num_start = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # One static BIB entry per protocol for the inside host.
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.tcp_port_in,
                                       self.tcp_port_out,
                                       IP_PROTOS.tcp)
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.udp_port_in,
                                       self.udp_port_out,
                                       IP_PROTOS.udp)
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.icmp_id_in,
                                       self.icmp_id_out,
                                       IP_PROTOS.icmp)

    # in2out
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4, same_port=True)

    # out2in
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    # IPv6 form of the outside host: 64:ff9b:: prefix + its IPv4 address.
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    ses_num_end = self.nat64_get_ses_num()

    self.assertEqual(ses_num_end - ses_num_start, 3)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ NAT64 session timeout """
    self.icmp_id_in = 1234
    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    # Only the ICMP and TCP timeouts are shortened; UDP is not passed.
    self.vapi.nat_set_timeouts(icmp=5, tcp_transitory=5, tcp_established=5)

    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # Only the arrival of the packets matters here, not their content.
    self.pg1.get_capture(len(pkts))

    ses_num_before_timeout = self.nat64_get_ses_num()

    # Wait well past the shortened timeouts.
    sleep(15)

    # ICMP and TCP session after timeout
    ses_num_after_timeout = self.nat64_get_ses_num()
    self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
def test_icmp_error(self):
    """ NAT64 ICMP Error message translation """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # send some packets to create sessions
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture_ip4 = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture_ip4,
                            nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture_ip6 = self.pg0.get_capture(len(pkts))
    # IPv6 form of the outside host: 64:ff9b:: prefix + its IPv4 address.
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
                               self.pg0.remote_ip6)

    # in2out
    # Each ICMPv6 error quotes one of the IPv6 packets captured above.
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
            ICMPv6DestUnreach(code=1) /
            packet[IPv6] for packet in capture_ip6]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertEqual(packet[ICMP].type, 3)
            self.assertEqual(packet[ICMP].code, 13)
            inner = packet[IPerror]
            self.assertEqual(inner.src, self.pg1.remote_ip4)
            self.assertEqual(inner.dst, self.nat_addr)
            self.assert_packet_checksums_valid(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
            else:
                self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # out2in
    # Each ICMPv4 error quotes one of the IPv4 packets captured above.
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=3, code=13) /
            packet[IP] for packet in capture_ip4]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, ip.src)
            self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, self.pg0.remote_ip6)
            self.assertEqual(inner.dst, ip.src)
            self.assert_icmpv6_checksum_valid(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
            else:
                self.assertEqual(inner[ICMPv6EchoRequest].id,
                                 self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_hairpinning(self):
    """ NAT64 hairpinning """

    client = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    server_tcp_in_port = 22
    server_tcp_out_port = 4022
    server_udp_in_port = 23
    server_udp_out_port = 4023
    client_tcp_in_port = 1234
    client_udp_in_port = 1235
    # Learned from the first captured packets below.
    client_tcp_out_port = 0
    client_udp_out_port = 0
    # IPv6 form of the NAT address: 64:ff9b:: prefix + the IPv4 address.
    ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
    nat_addr_ip6 = ip.src

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_tcp_in_port,
                                       server_tcp_out_port,
                                       IP_PROTOS.tcp)
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_udp_in_port,
                                       server_udp_out_port,
                                       IP_PROTOS.udp)

    # client to server
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=nat_addr_ip6) /
         UDP(sport=client_udp_in_port, dport=server_udp_out_port))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # Hairpinned traffic comes back out of the same inside interface.
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            self.assert_packet_checksums_valid(packet)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
                self.assertEqual(packet[TCP].dport, server_tcp_in_port)
                client_tcp_out_port = packet[TCP].sport
            else:
                self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
                self.assertEqual(packet[UDP].dport, server_udp_in_port)
                client_udp_out_port = packet[UDP].sport
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server to client
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=nat_addr_ip6) /
         UDP(sport=server_udp_in_port, dport=client_udp_out_port))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, client.ip6)
            self.assert_packet_checksums_valid(packet)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, server_tcp_out_port)
                self.assertEqual(packet[TCP].dport, client_tcp_in_port)
            else:
                self.assertEqual(packet[UDP].sport, server_udp_out_port)
                self.assertEqual(packet[UDP].dport, client_udp_in_port)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # ICMP error
    # The client reports each packet it just received as unreachable.
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IPv6(src=client.ip6, dst=nat_addr_ip6) /
            ICMPv6DestUnreach(code=1) /
            packet[IPv6] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, server.ip6)
            self.assertEqual(inner.dst, nat_addr_ip6)
            self.assert_packet_checksums_valid(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
                self.assertEqual(inner[TCPerror].dport,
                                 client_tcp_out_port)
            else:
                self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
                self.assertEqual(inner[UDPerror].dport,
                                 client_udp_out_port)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_prefix(self):
    """ NAT64 Network-Specific Prefix """

    # One pool address without a VRF argument and one for self.vrf1_id;
    # pg1 is the only interface registered with is_inside=0.
    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                            self.vrf1_nat_addr_n,
                                            vrf_id=self.vrf1_id)
    self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

    # Add global prefix
    global_pref64 = "2001:db8::"
    global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
    global_pref64_len = 32
    self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)

    prefix = self.vapi.nat64_prefix_dump()
    self.assertEqual(len(prefix), 1)
    self.assertEqual(prefix[0].prefix, global_pref64_n)
    self.assertEqual(prefix[0].prefix_len, global_pref64_len)
    self.assertEqual(prefix[0].vrf_id, 0)

    # Add tenant specific prefix
    vrf1_pref64 = "2001:db8:122:300::"
    vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
    vrf1_pref64_len = 56
    self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
                                   vrf1_pref64_len,
                                   vrf_id=self.vrf1_id)
    prefix = self.vapi.nat64_prefix_dump()
    self.assertEqual(len(prefix), 2)

    # Global prefix: pg0 -> pg1, then the reply path pg1 -> pg0
    pkts = self.create_stream_in_ip6(self.pg0,
                                     self.pg1,
                                     pref=global_pref64,
                                     plen=global_pref64_len)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    dst_ip = self.compose_ip6(self.pg1.remote_ip4,
                              global_pref64,
                              global_pref64_len)
    self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)

    # Tenant specific prefix: pg2 -> pg1, then the reply path pg1 -> pg2
    pkts = self.create_stream_in_ip6(self.pg2,
                                     self.pg1,
                                     pref=vrf1_pref64,
                                     plen=vrf1_pref64_len)
    self.pg2.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    dst_ip = self.compose_ip6(self.pg1.remote_ip4,
                              vrf1_pref64,
                              vrf1_pref64_len)
    self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
def test_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol """

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)

    # in2out: a plain TCP packet is sent first, then the GRE packet
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)

    # nh=47 is the IP protocol number of GRE
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
         GRE() /
         IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except Exception:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IPv6].src, remote_ip6)
        self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
        self.assertEqual(packet[IPv6].nh, 47)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ NAT64 translate packet with unknown protocol - hairpinning """

    # Both hosts sit behind pg0, so traffic leaves on the interface it
    # arrived on (every capture below is taken on pg0).
    client = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    server_tcp_in_port = 22
    server_tcp_out_port = 4022
    client_tcp_in_port = 1234
    client_tcp_out_port = 1235
    server_nat_ip = "10.0.0.100"
    client_nat_ip = "10.0.0.110"
    server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
    client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
    server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
    client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)

    self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
                                            client_nat_ip_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # static BIB entries: server TCP, server GRE (no ports), client TCP
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       server_nat_ip_n,
                                       server_tcp_in_port,
                                       server_tcp_out_port,
                                       IP_PROTOS.tcp)

    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       server_nat_ip_n,
                                       0,
                                       0,
                                       IP_PROTOS.gre)

    self.vapi.nat64_add_del_static_bib(client.ip6n,
                                       client_nat_ip_n,
                                       client_tcp_in_port,
                                       client_tcp_out_port,
                                       IP_PROTOS.tcp)

    # client to server
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=server_nat_ip6) /
         TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
         GRE() /
         IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IPv6].src, client_nat_ip6)
        self.assertEqual(packet[IPv6].dst, server.ip6)
        self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
    except Exception:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to client
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg0.get_capture(1)
    packet = p[0]
    try:
        self.assertEqual(packet[IPv6].src, server_nat_ip6)
        self.assertEqual(packet[IPv6].dst, client.ip6)
        self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_one_armed_nat64(self):
    """ One armed NAT64 """
    external_port = 0
    remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    # pg3 is registered twice: once with the default direction and once
    # with is_inside=0, so a single interface plays both roles
    self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)

    # in2out
    p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
         IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
         TCP(sport=12345, dport=80))
    self.pg3.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg3.remote_ip4)
        self.assertNotEqual(tcp.sport, 12345)
        # remember the translated source port for the reply below
        external_port = tcp.sport
        self.assertEqual(tcp.dport, 80)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # out2in
    p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
         IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
         TCP(sport=80, dport=external_port))
    self.pg3.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(1)
    p = capture[0]
    try:
        ip = p[IPv6]
        tcp = p[TCP]
        self.assertEqual(ip.src, remote_host_ip6)
        self.assertEqual(ip.dst, self.pg3.remote_ip6)
        self.assertEqual(tcp.sport, 80)
        self.assertEqual(tcp.dport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_frag_in_order(self):
    """ NAT64 translate fragments arriving in order """
    self.tcp_port_in = random.randint(1025, 65535)

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # baseline count of reassembly entries, compared again at the end
    reass = self.vapi.nat_reass_dump()
    reass_n_start = len(reass)

    # in2out
    data = 'a' * 200
    pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                       self.tcp_port_in, 20, data)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg1.get_capture(len(pkts))
    p = self.reass_frags_and_verify(frags,
                                    self.nat_addr,
                                    self.pg1.remote_ip4)
    self.assertEqual(p[TCP].dport, 20)
    self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
    self.tcp_port_out = p[TCP].sport
    self.assertEqual(data, p[Raw].load)

    # out2in
    data = "A" * 4 + "b" * 16 + "C" * 3
    pkts = self.create_stream_frag(self.pg1,
                                   self.nat_addr,
                                   20,
                                   self.tcp_port_out,
                                   data)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(pkts))
    src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
    p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
    self.assertEqual(p[TCP].sport, 20)
    self.assertEqual(p[TCP].dport, self.tcp_port_in)
    self.assertEqual(data, p[Raw].load)

    reass = self.vapi.nat_reass_dump()
    reass_n_end = len(reass)

    # the two directions are expected to add two reassembly entries
    self.assertEqual(reass_n_end - reass_n_start, 2)
def test_reass_hairpinning(self):
    """ NAT64 fragments hairpinning """
    data = 'a' * 200
    server = self.pg0.remote_hosts[1]
    server_in_port = random.randint(1025, 65535)
    server_out_port = random.randint(1025, 65535)
    client_in_port = random.randint(1025, 65535)
    # round-trip through scapy's IPv6 field to get the address in the
    # textual form scapy itself reports
    ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
    nat_addr_ip6 = ip.src

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # add static BIB entry for server
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_in_port,
                                       server_out_port,
                                       IP_PROTOS.tcp)

    # send packet from host to server
    pkts = self.create_stream_frag_ip6(self.pg0,
                                       self.nat_addr,
                                       client_in_port,
                                       server_out_port,
                                       data)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(pkts))
    p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
    self.assertNotEqual(p[TCP].sport, client_in_port)
    self.assertEqual(p[TCP].dport, server_in_port)
    self.assertEqual(data, p[Raw].load)
def test_frag_out_of_order(self):
    """ NAT64 translate fragments arriving out of order """
    self.tcp_port_in = random.randint(1025, 65535)

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # in2out
    data = 'a' * 200
    pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                       self.tcp_port_in, 20, data)
    # send the fragments last-first
    pkts.reverse()
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg1.get_capture(len(pkts))
    p = self.reass_frags_and_verify(frags,
                                    self.nat_addr,
                                    self.pg1.remote_ip4)
    self.assertEqual(p[TCP].dport, 20)
    self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
    self.tcp_port_out = p[TCP].sport
    self.assertEqual(data, p[Raw].load)

    # out2in
    data = "A" * 4 + "B" * 16 + "C" * 3
    pkts = self.create_stream_frag(self.pg1,
                                   self.nat_addr,
                                   20,
                                   self.tcp_port_out,
                                   data)
    pkts.reverse()
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(pkts))
    src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
    p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
    self.assertEqual(p[TCP].sport, 20)
    self.assertEqual(p[TCP].dport, self.tcp_port_in)
    self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (the NAT64 pool is dumped here; a NAT44 dump says nothing about it)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump, not on the result of the first check)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_ipfix_max_bibs_sessions(self):
    """ IPFIX logging maximum session and BIB entries exceeded """
    # two sessions (dport 80 and 22) are opened per source address below
    max_bibs = 1280
    max_sessions = 2560
    remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    pkts = []
    src = ""
    for i in range(0, max_bibs):
        src = "fd01:aa::%x" % (i)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=src, dst=remote_host_ip6) /
             TCP(sport=12345, dport=80))
        pkts.append(p)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=src, dst=remote_host_ip6) /
             TCP(sport=12345, dport=22))
        pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(max_sessions)

    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    # one more session from the last source used above: nothing may be
    # forwarded, and the event is looked up in the IPFIX export instead
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IPv6(src=src, dst=remote_host_ip6) /
         TCP(sport=12345, dport=25))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    sleep(1)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_sessions(data, max_sessions)

    # a packet from a source address not used above: again nothing may
    # be forwarded
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
         TCP(sport=12345, dport=80))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    sleep(1)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(1)
    # verify events in data set
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_bibs(data, max_bibs)
def test_ipfix_max_frags(self):
    """ IPFIX logging maximum fragments pending reassembly exceeded """
    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    # max_frag=0 for IPv6 reassembly, so the lone fragment sent below is
    # expected to exceed the limit
    self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    data = 'a' * 200
    pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
                                       self.tcp_port_in, 20, data)
    # only the last fragment of the stream is sent
    self.pg0.add_stream(pkts[-1])
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    sleep(1)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_fragments_ip6(data, 0,
                                                self.pg0.remote_ip6n)
def test_ipfix_bib_ses(self):
    """ IPFIX logging NAT64 BIB/session create and delete events """
    self.tcp_port_in = random.randint(1025, 65535)
    remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
                                       '64:ff9b::',
                                       96)

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    # Create
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
         TCP(sport=self.tcp_port_in, dport=25))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    p = self.pg1.get_capture(1)
    self.tcp_port_out = p[0][TCP].sport
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(10)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            # field 230 selects the record kind: 10 goes to the BIB
            # check, 6 to the session check (presumably the IPFIX
            # natEvent element - TODO confirm)
            if ord(data[0][230]) == 10:
                self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
            elif ord(data[0][230]) == 6:
                self.verify_ipfix_nat64_ses(data,
                                            1,
                                            self.pg0.remote_ip6n,
                                            self.pg1.remote_ip4,
                                            25)
            else:
                self.logger.error(ppp("Unexpected or invalid packet: ", p))

    # Delete: the pool address is removed and the flushed export is
    # checked for the matching events
    self.pg_enable_capture(self.pg_interfaces)
    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n,
                                            is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(2)
    # verify events in data set
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            if ord(data[0][230]) == 11:
                self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
            elif ord(data[0][230]) == 7:
                self.verify_ipfix_nat64_ses(data,
                                            0,
                                            self.pg0.remote_ip6n,
                                            self.pg1.remote_ip4,
                                            25)
            else:
                self.logger.error(ppp("Unexpected or invalid packet: ", p))
def nat64_get_ses_num(self):
    """
    Return number of active NAT64 sessions.
    """
    st = self.vapi.nat64_st_dump()
    return len(st)
def clear_nat64(self):
    """
    Clear NAT64 configuration.
    """
    # switch IPFIX logging off, then reset the cached exporter settings
    self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
                        domain_id=self.ipfix_domain_id)
    self.ipfix_src_port = 4739
    self.ipfix_domain_id = 1

    self.vapi.nat_set_timeouts()

    interfaces = self.vapi.nat64_interface_dump()
    for intf in interfaces:
        # is_inside > 1 gets an extra removal with direction 0 first
        # (presumably an interface registered in both directions -
        # TODO confirm)
        if intf.is_inside > 1:
            self.vapi.nat64_add_del_interface(intf.sw_if_index,
                                              0,
                                              is_add=0)
        self.vapi.nat64_add_del_interface(intf.sw_if_index,
                                          intf.is_inside,
                                          is_add=0)

    # only static BIB entries are deleted explicitly
    bib = self.vapi.nat64_bib_dump(255)
    for bibe in bib:
        if bibe.is_static:
            self.vapi.nat64_add_del_static_bib(bibe.i_addr,
                                               bibe.o_addr,
                                               bibe.i_port,
                                               bibe.o_port,
                                               bibe.proto,
                                               bibe.vrf_id,
                                               is_add=0)

    adresses = self.vapi.nat64_pool_addr_dump()
    for addr in adresses:
        self.vapi.nat64_add_del_pool_addr_range(addr.address,
                                                addr.address,
                                                vrf_id=addr.vrf_id,
                                                is_add=0)

    prefixes = self.vapi.nat64_prefix_dump()
    for prefix in prefixes:
        self.vapi.nat64_add_del_prefix(prefix.prefix,
                                       prefix.prefix_len,
                                       vrf_id=prefix.vrf_id,
                                       is_add=0)
def tearDown(self):
    # Log the NAT64 state for post-mortem analysis, then remove the
    # configuration; both are skipped when VPP is no longer running.
    super(TestNAT64, self).tearDown()
    if not self.vpp_dead:
        self.logger.info(self.vapi.cli("show nat64 pool"))
        self.logger.info(self.vapi.cli("show nat64 interfaces"))
        self.logger.info(self.vapi.cli("show nat64 prefix"))
        self.logger.info(self.vapi.cli("show nat64 bib all"))
        self.logger.info(self.vapi.cli("show nat64 session table all"))
        self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
        self.clear_nat64()
class TestDSlite(MethodHolder):
    """ DS-Lite Test Cases """

    @classmethod
    def setUpClass(cls):
        super(TestDSlite, cls).setUpClass()

        try:
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            # pg0 is configured for IPv4, pg1 for IPv6 with two hosts
            cls.create_pg_interfaces(range(2))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            # undo the class-level setup before reporting the failure
            super(TestDSlite, cls).tearDownClass()
            raise

    def test_dslite(self):
        """ Test DS-Lite """
        nat_config = self.vapi.nat_show_config()
        self.assertEqual(0, nat_config.dslite_ce)

        self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
                                                 self.nat_addr_n)
        aftr_ip4 = '192.0.0.1'
        aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)

        # UDP: IPv4-in-IPv6 from host 0, expected as plain IPv4 on pg0
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=20000, dport=10000))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[UDP].sport

        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=out_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, '192.168.1.1')
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # TCP: same inner IPv4 source address, but sent from host 1
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             TCP(sport=20001, dport=10001))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[TCP].sport, 20001)
        self.assertEqual(capture[TCP].dport, 10001)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[TCP].sport

        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             TCP(sport=10001, dport=out_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, '192.168.1.1')
        self.assertEqual(capture[TCP].sport, 10001)
        self.assertEqual(capture[TCP].dport, 20001)
        self.assert_packet_checksums_valid(capture)

        # ICMP: the echo identifier is what gets rewritten
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             ICMP(id=4000, type='echo-request'))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)
        out_id = capture[ICMP].id

        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             ICMP(id=out_id, type='echo-reply'))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, '192.168.1.1')
        self.assertEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite AFTR tunnel endpoint address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
             ICMPv6EchoRequest())
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        self.assertEqual(1, len(capture))
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        super(TestDSlite, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show dslite pool"))
            self.logger.info(
                self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
            self.logger.info(self.vapi.cli("show dslite sessions"))
class TestDSliteCE(MethodHolder):
    """ DS-Lite CE Test Cases """

    @classmethod
    def setUpConstants(cls):
        super(TestDSliteCE, cls).setUpConstants()
        # extra VPP start-up options: a "nat { dslite ce }" section
        cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])

    @classmethod
    def setUpClass(cls):
        super(TestDSliteCE, cls).setUpClass()

        try:
            # pg0 is configured for IPv4, pg1 for IPv6 with one host
            cls.create_pg_interfaces(range(2))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            # undo the class-level setup before reporting the failure
            super(TestDSliteCE, cls).tearDownClass()
            raise

    def test_dslite_ce(self):
        """ Test DS-Lite CE """

        nat_config = self.vapi.nat_show_config()
        self.assertEqual(1, nat_config.dslite_ce)

        b4_ip4 = '192.0.0.2'
        b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
        self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)

        aftr_ip4 = '192.0.0.1'
        aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)

        # host route towards the AFTR address via pg1
        self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
                                   dst_address_length=128,
                                   next_hop_address=self.pg1.remote_ip6n,
                                   next_hop_sw_if_index=self.pg1.sw_if_index,
                                   is_ipv6=1)

        # UDP encapsulation: addresses and ports must stay unchanged
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=20000))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, aftr_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # UDP decapsulation
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=b4_ip6, src=aftr_ip6) /
             IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
             UDP(sport=20000, dport=10000))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite B4 tunnel endpoint address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
             ICMPv6EchoRequest())
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        self.assertEqual(1, len(capture))
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        super(TestDSliteCE, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(
                self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
            self.logger.info(
                self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7417 class TestNAT66(MethodHolder):
7418 """ NAT66 Test Cases """
@classmethod
def setUpClass(cls):
    super(TestNAT66, cls).setUpClass()

    try:
        cls.nat_addr = 'fd01:ff::2'
        cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)

        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    except Exception:
        # undo the class-level setup before reporting the failure
        super(TestNAT66, cls).tearDownClass()
        raise
def test_static(self):
    """ 1:1 NAT66 test """
    self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
                                           self.nat_addr_n)

    # in2out: one packet each for TCP, UDP, ICMPv6 and GRE
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
         TCP())
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
         UDP())
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
         ICMPv6EchoRequest())
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
         GRE() / IP() / TCP())
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, self.nat_addr)
            self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(packet)
        except Exception:
            # log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # out2in: the same four protocols in the reverse direction
    pkts = []
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
         TCP())
    pkts.append(p)
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
         UDP())
    pkts.append(p)
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
         ICMPv6EchoReply())
    pkts.append(p)
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
         GRE() / IP() / TCP())
    pkts.append(p)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
            self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
            self.assert_packet_checksums_valid(packet)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # four packets in each direction hit the single static mapping
    sm = self.vapi.nat66_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    self.assertEqual(sm[0].total_pkts, 8)
7513 def test_check_no_translate(self):
7514 """ NAT66 translate only when egress interface is outside interface """
7515 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7516 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7517 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7521 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7522 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7524 self.pg0.add_stream([p])
7525 self.pg_enable_capture(self.pg_interfaces)
7527 capture = self.pg1.get_capture(1)
7530 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7531 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7533 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7536 def clear_nat66(self):
7538 Clear NAT66 configuration.
7540 interfaces = self.vapi.nat66_interface_dump()
7541 for intf in interfaces:
7542 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7546 static_mappings = self.vapi.nat66_static_mapping_dump()
7547 for sm in static_mappings:
7548 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7549 sm.external_ip_address,
7554 super(TestNAT66, self).tearDown()
7555 if not self.vpp_dead:
7556 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7557 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# When this file is executed directly, run its test cases through the VPP
# test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)