9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
139 self.verify_no_nat44_user()
140 self.vapi.nat_set_timeouts()
141 self.vapi.nat_set_addr_and_port_alloc_alg()
143 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
144 local_port=0, external_port=0, vrf_id=0,
145 is_add=1, external_sw_if_index=0xFFFFFFFF,
146 proto=0, twice_nat=0, self_twice_nat=0,
147 out2in_only=0, tag=""):
149 Add/delete NAT44 static mapping
151 :param local_ip: Local IP address
152 :param external_ip: External IP address
153 :param local_port: Local port number (Optional)
154 :param external_port: External port number (Optional)
155 :param vrf_id: VRF ID (Default 0)
156 :param is_add: 1 if add, 0 if delete (Default add)
157 :param external_sw_if_index: External interface instead of IP address
158 :param proto: IP protocol (Mandatory if port specified)
159 :param twice_nat: 1 if translate external host address and port
160 :param self_twice_nat: 1 if translate external host address and port
161 whenever external host address equals
162 local address of internal host
163 :param out2in_only: if 1 rule is matching only out2in direction
164 :param tag: Opaque string tag
167 if local_port and external_port:
169 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
170 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
171 self.vapi.nat44_add_del_static_mapping(
174 external_sw_if_index,
186 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
188 Add/delete NAT44 address
190 :param ip: IP address
191 :param is_add: 1 if add, 0 if delete (Default add)
192 :param twice_nat: twice NAT address for extenal hosts
194 nat_addr = socket.inet_pton(socket.AF_INET, ip)
195 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
199 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
201 Create packet stream for inside network
203 :param in_if: Inside interface
204 :param out_if: Outside interface
205 :param dst_ip: Destination address
206 :param ttl: TTL of generated packets
209 dst_ip = out_if.remote_ip4
213 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
214 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
215 TCP(sport=self.tcp_port_in, dport=20))
219 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
220 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
221 UDP(sport=self.udp_port_in, dport=20))
225 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
226 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
227 ICMP(id=self.icmp_id_in, type='echo-request'))
232 def compose_ip6(self, ip4, pref, plen):
234 Compose IPv4-embedded IPv6 addresses
236 :param ip4: IPv4 address
237 :param pref: IPv6 prefix
238 :param plen: IPv6 prefix length
239 :returns: IPv4-embedded IPv6 addresses
241 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
242 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
257 pref_n[10] = ip4_n[3]
261 pref_n[10] = ip4_n[2]
262 pref_n[11] = ip4_n[3]
265 pref_n[10] = ip4_n[1]
266 pref_n[11] = ip4_n[2]
267 pref_n[12] = ip4_n[3]
269 pref_n[12] = ip4_n[0]
270 pref_n[13] = ip4_n[1]
271 pref_n[14] = ip4_n[2]
272 pref_n[15] = ip4_n[3]
273 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
275 def extract_ip4(self, ip6, plen):
277 Extract IPv4 address embedded in IPv6 addresses
279 :param ip6: IPv6 address
280 :param plen: IPv6 prefix length
281 :returns: extracted IPv4 address
283 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
315 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
317 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
319 Create IPv6 packet stream for inside network
321 :param in_if: Inside interface
322 :param out_if: Outside interface
323 :param ttl: Hop Limit of generated packets
324 :param pref: NAT64 prefix
325 :param plen: NAT64 prefix length
329 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
331 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
334 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
335 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
336 TCP(sport=self.tcp_port_in, dport=20))
340 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
341 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
342 UDP(sport=self.udp_port_in, dport=20))
346 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
347 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
348 ICMPv6EchoRequest(id=self.icmp_id_in))
353 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
354 use_inside_ports=False):
356 Create packet stream for outside network
358 :param out_if: Outside interface
359 :param dst_ip: Destination IP address (Default use global NAT address)
360 :param ttl: TTL of generated packets
361 :param use_inside_ports: Use inside NAT ports as destination ports
362 instead of outside ports
365 dst_ip = self.nat_addr
366 if not use_inside_ports:
367 tcp_port = self.tcp_port_out
368 udp_port = self.udp_port_out
369 icmp_id = self.icmp_id_out
371 tcp_port = self.tcp_port_in
372 udp_port = self.udp_port_in
373 icmp_id = self.icmp_id_in
376 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
377 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
378 TCP(dport=tcp_port, sport=20))
382 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
383 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
384 UDP(dport=udp_port, sport=20))
388 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
389 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
390 ICMP(id=icmp_id, type='echo-reply'))
395 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
397 Create packet stream for outside network
399 :param out_if: Outside interface
400 :param dst_ip: Destination IP address (Default use global NAT address)
401 :param hl: HL of generated packets
405 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
406 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
407 TCP(dport=self.tcp_port_out, sport=20))
411 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
412 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
413 UDP(dport=self.udp_port_out, sport=20))
417 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
418 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
419 ICMPv6EchoReply(id=self.icmp_id_out))
424 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
425 packet_num=3, dst_ip=None, is_ip6=False):
427 Verify captured packets on outside network
429 :param capture: Captured packets
430 :param nat_ip: Translated IP address (Default use global NAT address)
431 :param same_port: Sorce port number is not translated (Default False)
432 :param packet_num: Expected number of packets (Default 3)
433 :param dst_ip: Destination IP address (Default do not verify)
434 :param is_ip6: If L3 protocol is IPv6 (Default False)
438 ICMP46 = ICMPv6EchoRequest
443 nat_ip = self.nat_addr
444 self.assertEqual(packet_num, len(capture))
445 for packet in capture:
448 self.assert_packet_checksums_valid(packet)
449 self.assertEqual(packet[IP46].src, nat_ip)
450 if dst_ip is not None:
451 self.assertEqual(packet[IP46].dst, dst_ip)
452 if packet.haslayer(TCP):
454 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
457 packet[TCP].sport, self.tcp_port_in)
458 self.tcp_port_out = packet[TCP].sport
459 self.assert_packet_checksums_valid(packet)
460 elif packet.haslayer(UDP):
462 self.assertEqual(packet[UDP].sport, self.udp_port_in)
465 packet[UDP].sport, self.udp_port_in)
466 self.udp_port_out = packet[UDP].sport
469 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
471 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
472 self.icmp_id_out = packet[ICMP46].id
473 self.assert_packet_checksums_valid(packet)
475 self.logger.error(ppp("Unexpected or invalid packet "
476 "(outside network):", packet))
479 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
480 packet_num=3, dst_ip=None):
482 Verify captured packets on outside network
484 :param capture: Captured packets
485 :param nat_ip: Translated IP address
486 :param same_port: Sorce port number is not translated (Default False)
487 :param packet_num: Expected number of packets (Default 3)
488 :param dst_ip: Destination IP address (Default do not verify)
490 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
493 def verify_capture_in(self, capture, in_if, packet_num=3):
495 Verify captured packets on inside network
497 :param capture: Captured packets
498 :param in_if: Inside interface
499 :param packet_num: Expected number of packets (Default 3)
501 self.assertEqual(packet_num, len(capture))
502 for packet in capture:
504 self.assert_packet_checksums_valid(packet)
505 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
506 if packet.haslayer(TCP):
507 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
508 elif packet.haslayer(UDP):
509 self.assertEqual(packet[UDP].dport, self.udp_port_in)
511 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
513 self.logger.error(ppp("Unexpected or invalid packet "
514 "(inside network):", packet))
517 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
519 Verify captured IPv6 packets on inside network
521 :param capture: Captured packets
522 :param src_ip: Source IP
523 :param dst_ip: Destination IP address
524 :param packet_num: Expected number of packets (Default 3)
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IPv6].src, src_ip)
530 self.assertEqual(packet[IPv6].dst, dst_ip)
531 self.assert_packet_checksums_valid(packet)
532 if packet.haslayer(TCP):
533 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
534 elif packet.haslayer(UDP):
535 self.assertEqual(packet[UDP].dport, self.udp_port_in)
537 self.assertEqual(packet[ICMPv6EchoReply].id,
540 self.logger.error(ppp("Unexpected or invalid packet "
541 "(inside network):", packet))
544 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
546 Verify captured packet that don't have to be translated
548 :param capture: Captured packets
549 :param ingress_if: Ingress interface
550 :param egress_if: Egress interface
552 for packet in capture:
554 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
555 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
556 if packet.haslayer(TCP):
557 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
558 elif packet.haslayer(UDP):
559 self.assertEqual(packet[UDP].sport, self.udp_port_in)
561 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
563 self.logger.error(ppp("Unexpected or invalid packet "
564 "(inside network):", packet))
567 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
568 packet_num=3, icmp_type=11):
570 Verify captured packets with ICMP errors on outside network
572 :param capture: Captured packets
573 :param src_ip: Translated IP address or IP address of VPP
574 (Default use global NAT address)
575 :param packet_num: Expected number of packets (Default 3)
576 :param icmp_type: Type of error ICMP packet
577 we are expecting (Default 11)
580 src_ip = self.nat_addr
581 self.assertEqual(packet_num, len(capture))
582 for packet in capture:
584 self.assertEqual(packet[IP].src, src_ip)
585 self.assertTrue(packet.haslayer(ICMP))
587 self.assertEqual(icmp.type, icmp_type)
588 self.assertTrue(icmp.haslayer(IPerror))
589 inner_ip = icmp[IPerror]
590 if inner_ip.haslayer(TCPerror):
591 self.assertEqual(inner_ip[TCPerror].dport,
593 elif inner_ip.haslayer(UDPerror):
594 self.assertEqual(inner_ip[UDPerror].dport,
597 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
599 self.logger.error(ppp("Unexpected or invalid packet "
600 "(outside network):", packet))
603 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
606 Verify captured packets with ICMP errors on inside network
608 :param capture: Captured packets
609 :param in_if: Inside interface
610 :param packet_num: Expected number of packets (Default 3)
611 :param icmp_type: Type of error ICMP packet
612 we are expecting (Default 11)
614 self.assertEqual(packet_num, len(capture))
615 for packet in capture:
617 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
618 self.assertTrue(packet.haslayer(ICMP))
620 self.assertEqual(icmp.type, icmp_type)
621 self.assertTrue(icmp.haslayer(IPerror))
622 inner_ip = icmp[IPerror]
623 if inner_ip.haslayer(TCPerror):
624 self.assertEqual(inner_ip[TCPerror].sport,
626 elif inner_ip.haslayer(UDPerror):
627 self.assertEqual(inner_ip[UDPerror].sport,
630 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
632 self.logger.error(ppp("Unexpected or invalid packet "
633 "(inside network):", packet))
636 def create_stream_frag(self, src_if, dst, sport, dport, data):
638 Create fragmented packet stream
640 :param src_if: Source interface
641 :param dst: Destination IPv4 address
642 :param sport: Source TCP port
643 :param dport: Destination TCP port
644 :param data: Payload data
647 id = random.randint(0, 65535)
648 p = (IP(src=src_if.remote_ip4, dst=dst) /
649 TCP(sport=sport, dport=dport) /
651 p = p.__class__(str(p))
652 chksum = p['TCP'].chksum
654 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
655 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
656 TCP(sport=sport, dport=dport, chksum=chksum) /
659 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
660 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
661 proto=IP_PROTOS.tcp) /
664 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
665 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
671 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
672 pref=None, plen=0, frag_size=128):
674 Create fragmented packet stream
676 :param src_if: Source interface
677 :param dst: Destination IPv4 address
678 :param sport: Source TCP port
679 :param dport: Destination TCP port
680 :param data: Payload data
681 :param pref: NAT64 prefix
682 :param plen: NAT64 prefix length
683 :param fragsize: size of fragments
687 dst_ip6 = ''.join(['64:ff9b::', dst])
689 dst_ip6 = self.compose_ip6(dst, pref, plen)
691 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
692 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
693 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
694 TCP(sport=sport, dport=dport) /
697 return fragment6(p, frag_size)
699 def reass_frags_and_verify(self, frags, src, dst):
701 Reassemble and verify fragmented packet
703 :param frags: Captured fragments
704 :param src: Source IPv4 address to verify
705 :param dst: Destination IPv4 address to verify
707 :returns: Reassembled IPv4 packet
709 buffer = StringIO.StringIO()
711 self.assertEqual(p[IP].src, src)
712 self.assertEqual(p[IP].dst, dst)
713 self.assert_ip_checksum_valid(p)
714 buffer.seek(p[IP].frag * 8)
715 buffer.write(p[IP].payload)
716 ip = frags[0].getlayer(IP)
717 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
718 proto=frags[0][IP].proto)
719 if ip.proto == IP_PROTOS.tcp:
720 p = (ip / TCP(buffer.getvalue()))
721 self.assert_tcp_checksum_valid(p)
722 elif ip.proto == IP_PROTOS.udp:
723 p = (ip / UDP(buffer.getvalue()))
726 def reass_frags_and_verify_ip6(self, frags, src, dst):
728 Reassemble and verify fragmented packet
730 :param frags: Captured fragments
731 :param src: Source IPv6 address to verify
732 :param dst: Destination IPv6 address to verify
734 :returns: Reassembled IPv6 packet
736 buffer = StringIO.StringIO()
738 self.assertEqual(p[IPv6].src, src)
739 self.assertEqual(p[IPv6].dst, dst)
740 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
741 buffer.write(p[IPv6ExtHdrFragment].payload)
742 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
743 nh=frags[0][IPv6ExtHdrFragment].nh)
744 if ip.nh == IP_PROTOS.tcp:
745 p = (ip / TCP(buffer.getvalue()))
746 elif ip.nh == IP_PROTOS.udp:
747 p = (ip / UDP(buffer.getvalue()))
748 self.assert_packet_checksums_valid(p)
751 def initiate_tcp_session(self, in_if, out_if):
753 Initiates TCP session
755 :param in_if: Inside interface
756 :param out_if: Outside interface
760 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
761 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
762 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
765 self.pg_enable_capture(self.pg_interfaces)
767 capture = out_if.get_capture(1)
769 self.tcp_port_out = p[TCP].sport
771 # SYN + ACK packet out->in
772 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
773 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
774 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
777 self.pg_enable_capture(self.pg_interfaces)
782 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
783 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
784 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
787 self.pg_enable_capture(self.pg_interfaces)
789 out_if.get_capture(1)
792 self.logger.error("TCP 3 way handshake failed")
795 def verify_ipfix_nat44_ses(self, data):
797 Verify IPFIX NAT44 session create/delete event
799 :param data: Decoded IPFIX data records
801 nat44_ses_create_num = 0
802 nat44_ses_delete_num = 0
803 self.assertEqual(6, len(data))
806 self.assertIn(ord(record[230]), [4, 5])
807 if ord(record[230]) == 4:
808 nat44_ses_create_num += 1
810 nat44_ses_delete_num += 1
812 self.assertEqual(self.pg0.remote_ip4n, record[8])
813 # postNATSourceIPv4Address
814 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
817 self.assertEqual(struct.pack("!I", 0), record[234])
818 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
819 if IP_PROTOS.icmp == ord(record[4]):
820 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
821 self.assertEqual(struct.pack("!H", self.icmp_id_out),
823 elif IP_PROTOS.tcp == ord(record[4]):
824 self.assertEqual(struct.pack("!H", self.tcp_port_in),
826 self.assertEqual(struct.pack("!H", self.tcp_port_out),
828 elif IP_PROTOS.udp == ord(record[4]):
829 self.assertEqual(struct.pack("!H", self.udp_port_in),
831 self.assertEqual(struct.pack("!H", self.udp_port_out),
834 self.fail("Invalid protocol")
835 self.assertEqual(3, nat44_ses_create_num)
836 self.assertEqual(3, nat44_ses_delete_num)
838 def verify_ipfix_addr_exhausted(self, data):
840 Verify IPFIX NAT addresses event
842 :param data: Decoded IPFIX data records
844 self.assertEqual(1, len(data))
847 self.assertEqual(ord(record[230]), 3)
849 self.assertEqual(struct.pack("!I", 0), record[283])
851 def verify_ipfix_max_sessions(self, data, limit):
853 Verify IPFIX maximum session entries exceeded event
855 :param data: Decoded IPFIX data records
856 :param limit: Number of maximum session entries that can be created.
858 self.assertEqual(1, len(data))
861 self.assertEqual(ord(record[230]), 13)
862 # natQuotaExceededEvent
863 self.assertEqual(struct.pack("I", 1), record[466])
865 self.assertEqual(struct.pack("I", limit), record[471])
867 def verify_ipfix_max_bibs(self, data, limit):
869 Verify IPFIX maximum BIB entries exceeded event
871 :param data: Decoded IPFIX data records
872 :param limit: Number of maximum BIB entries that can be created.
874 self.assertEqual(1, len(data))
877 self.assertEqual(ord(record[230]), 13)
878 # natQuotaExceededEvent
879 self.assertEqual(struct.pack("I", 2), record[466])
881 self.assertEqual(struct.pack("I", limit), record[472])
883 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
885 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
887 :param data: Decoded IPFIX data records
888 :param limit: Number of maximum fragments pending reassembly
889 :param src_addr: IPv6 source address
891 self.assertEqual(1, len(data))
894 self.assertEqual(ord(record[230]), 13)
895 # natQuotaExceededEvent
896 self.assertEqual(struct.pack("I", 5), record[466])
897 # maxFragmentsPendingReassembly
898 self.assertEqual(struct.pack("I", limit), record[475])
900 self.assertEqual(src_addr, record[27])
902 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
904 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
906 :param data: Decoded IPFIX data records
907 :param limit: Number of maximum fragments pending reassembly
908 :param src_addr: IPv4 source address
910 self.assertEqual(1, len(data))
913 self.assertEqual(ord(record[230]), 13)
914 # natQuotaExceededEvent
915 self.assertEqual(struct.pack("I", 5), record[466])
916 # maxFragmentsPendingReassembly
917 self.assertEqual(struct.pack("I", limit), record[475])
919 self.assertEqual(src_addr, record[8])
921 def verify_ipfix_bib(self, data, is_create, src_addr):
923 Verify IPFIX NAT64 BIB create and delete events
925 :param data: Decoded IPFIX data records
926 :param is_create: Create event if nonzero value otherwise delete event
927 :param src_addr: IPv6 source address
929 self.assertEqual(1, len(data))
933 self.assertEqual(ord(record[230]), 10)
935 self.assertEqual(ord(record[230]), 11)
937 self.assertEqual(src_addr, record[27])
938 # postNATSourceIPv4Address
939 self.assertEqual(self.nat_addr_n, record[225])
941 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
943 self.assertEqual(struct.pack("!I", 0), record[234])
944 # sourceTransportPort
945 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
946 # postNAPTSourceTransportPort
947 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
949 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
952 Verify IPFIX NAT64 session create and delete events
954 :param data: Decoded IPFIX data records
955 :param is_create: Create event if nonzero value otherwise delete event
956 :param src_addr: IPv6 source address
957 :param dst_addr: IPv4 destination address
958 :param dst_port: destination TCP port
960 self.assertEqual(1, len(data))
964 self.assertEqual(ord(record[230]), 6)
966 self.assertEqual(ord(record[230]), 7)
968 self.assertEqual(src_addr, record[27])
969 # destinationIPv6Address
970 self.assertEqual(socket.inet_pton(socket.AF_INET6,
971 self.compose_ip6(dst_addr,
975 # postNATSourceIPv4Address
976 self.assertEqual(self.nat_addr_n, record[225])
977 # postNATDestinationIPv4Address
978 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
981 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
983 self.assertEqual(struct.pack("!I", 0), record[234])
984 # sourceTransportPort
985 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
986 # postNAPTSourceTransportPort
987 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
988 # destinationTransportPort
989 self.assertEqual(struct.pack("!H", dst_port), record[11])
990 # postNAPTDestinationTransportPort
991 self.assertEqual(struct.pack("!H", dst_port), record[228])
993 def verify_no_nat44_user(self):
994 """ Verify that there is no NAT44 user """
995 users = self.vapi.nat44_user_dump()
996 self.assertEqual(len(users), 0)
998 def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
1000 Verify IPFIX maximum entries per user exceeded event
1002 :param data: Decoded IPFIX data records
1003 :param limit: Number of maximum entries per user
1004 :param src_addr: IPv4 source address
1006 self.assertEqual(1, len(data))
1009 self.assertEqual(ord(record[230]), 13)
1010 # natQuotaExceededEvent
1011 self.assertEqual(struct.pack("I", 3), record[466])
1013 self.assertEqual(struct.pack("I", limit), record[473])
1015 self.assertEqual(src_addr, record[8])
1018 class TestNAT44(MethodHolder):
1019 """ NAT44 Test Cases """
1022 def setUpClass(cls):
1023 super(TestNAT44, cls).setUpClass()
1024 cls.vapi.cli("set log class nat level debug")
1027 cls.tcp_port_in = 6303
1028 cls.tcp_port_out = 6303
1029 cls.udp_port_in = 6304
1030 cls.udp_port_out = 6304
1031 cls.icmp_id_in = 6305
1032 cls.icmp_id_out = 6305
1033 cls.nat_addr = '10.0.0.3'
1034 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1035 cls.ipfix_src_port = 4739
1036 cls.ipfix_domain_id = 1
1037 cls.tcp_external_port = 80
1039 cls.create_pg_interfaces(range(10))
1040 cls.interfaces = list(cls.pg_interfaces[0:4])
1042 for i in cls.interfaces:
1047 cls.pg0.generate_remote_hosts(3)
1048 cls.pg0.configure_ipv4_neighbors()
1050 cls.pg1.generate_remote_hosts(1)
1051 cls.pg1.configure_ipv4_neighbors()
1053 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1054 cls.vapi.ip_table_add_del(10, is_add=1)
1055 cls.vapi.ip_table_add_del(20, is_add=1)
1057 cls.pg4._local_ip4 = "172.16.255.1"
1058 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1059 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1060 cls.pg4.set_table_ip4(10)
1061 cls.pg5._local_ip4 = "172.17.255.3"
1062 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1063 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1064 cls.pg5.set_table_ip4(10)
1065 cls.pg6._local_ip4 = "172.16.255.1"
1066 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1067 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1068 cls.pg6.set_table_ip4(20)
1069 for i in cls.overlapping_interfaces:
1077 cls.pg9.generate_remote_hosts(2)
1078 cls.pg9.config_ip4()
1079 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1080 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1084 cls.pg9.resolve_arp()
1085 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1086 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1087 cls.pg9.resolve_arp()
1090 super(TestNAT44, cls).tearDownClass()
1093 def test_dynamic(self):
1094 """ NAT44 dynamic translation test """
1096 self.nat44_add_address(self.nat_addr)
1097 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1098 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1102 pkts = self.create_stream_in(self.pg0, self.pg1)
1103 self.pg0.add_stream(pkts)
1104 self.pg_enable_capture(self.pg_interfaces)
1106 capture = self.pg1.get_capture(len(pkts))
1107 self.verify_capture_out(capture)
1110 pkts = self.create_stream_out(self.pg1)
1111 self.pg1.add_stream(pkts)
1112 self.pg_enable_capture(self.pg_interfaces)
1114 capture = self.pg0.get_capture(len(pkts))
1115 self.verify_capture_in(capture, self.pg0)
1117 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1118 """ NAT44 handling of client packets with TTL=1 """
1120 self.nat44_add_address(self.nat_addr)
1121 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1122 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1125 # Client side - generate traffic
1126 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1127 self.pg0.add_stream(pkts)
1128 self.pg_enable_capture(self.pg_interfaces)
1131 # Client side - verify ICMP type 11 packets
1132 capture = self.pg0.get_capture(len(pkts))
1133 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1135 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1136 """ NAT44 handling of server packets with TTL=1 """
1138 self.nat44_add_address(self.nat_addr)
1139 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1140 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1143 # Client side - create sessions
1144 pkts = self.create_stream_in(self.pg0, self.pg1)
1145 self.pg0.add_stream(pkts)
1146 self.pg_enable_capture(self.pg_interfaces)
1149 # Server side - generate traffic
1150 capture = self.pg1.get_capture(len(pkts))
1151 self.verify_capture_out(capture)
1152 pkts = self.create_stream_out(self.pg1, ttl=1)
1153 self.pg1.add_stream(pkts)
1154 self.pg_enable_capture(self.pg_interfaces)
1157 # Server side - verify ICMP type 11 packets
1158 capture = self.pg1.get_capture(len(pkts))
1159 self.verify_capture_out_with_icmp_errors(capture,
1160 src_ip=self.pg1.local_ip4)
1162 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1163 """ NAT44 handling of error responses to client packets with TTL=2 """
1165 self.nat44_add_address(self.nat_addr)
1166 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1167 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1170 # Client side - generate traffic
1171 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1172 self.pg0.add_stream(pkts)
1173 self.pg_enable_capture(self.pg_interfaces)
1176 # Server side - simulate ICMP type 11 response
1177 capture = self.pg1.get_capture(len(pkts))
1178 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1179 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1180 ICMP(type=11) / packet[IP] for packet in capture]
1181 self.pg1.add_stream(pkts)
1182 self.pg_enable_capture(self.pg_interfaces)
1185 # Client side - verify ICMP type 11 packets
1186 capture = self.pg0.get_capture(len(pkts))
1187 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1189 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1190 """ NAT44 handling of error responses to server packets with TTL=2 """
1192 self.nat44_add_address(self.nat_addr)
1193 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1194 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1197 # Client side - create sessions
1198 pkts = self.create_stream_in(self.pg0, self.pg1)
1199 self.pg0.add_stream(pkts)
1200 self.pg_enable_capture(self.pg_interfaces)
1203 # Server side - generate traffic
1204 capture = self.pg1.get_capture(len(pkts))
1205 self.verify_capture_out(capture)
1206 pkts = self.create_stream_out(self.pg1, ttl=2)
1207 self.pg1.add_stream(pkts)
1208 self.pg_enable_capture(self.pg_interfaces)
1211 # Client side - simulate ICMP type 11 response
1212 capture = self.pg0.get_capture(len(pkts))
1213 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1214 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1215 ICMP(type=11) / packet[IP] for packet in capture]
1216 self.pg0.add_stream(pkts)
1217 self.pg_enable_capture(self.pg_interfaces)
1220 # Server side - verify ICMP type 11 packets
1221 capture = self.pg1.get_capture(len(pkts))
1222 self.verify_capture_out_with_icmp_errors(capture)
1224 def test_ping_out_interface_from_outside(self):
1225 """ Ping NAT44 out interface from outside network """
1227 self.nat44_add_address(self.nat_addr)
1228 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1229 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1232 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1233 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1234 ICMP(id=self.icmp_id_out, type='echo-request'))
1236 self.pg1.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1239 capture = self.pg1.get_capture(len(pkts))
1240 self.assertEqual(1, len(capture))
1243 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1244 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1245 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1246 self.assertEqual(packet[ICMP].type, 0) # echo reply
1248 self.logger.error(ppp("Unexpected or invalid packet "
1249 "(outside network):", packet))
1252 def test_ping_internal_host_from_outside(self):
1253 """ Ping internal host from outside network """
1255 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1256 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1257 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1263 ICMP(id=self.icmp_id_out, type='echo-request'))
1264 self.pg1.add_stream(pkt)
1265 self.pg_enable_capture(self.pg_interfaces)
1267 capture = self.pg0.get_capture(1)
1268 self.verify_capture_in(capture, self.pg0, packet_num=1)
1269 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1272 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1273 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1274 ICMP(id=self.icmp_id_in, type='echo-reply'))
1275 self.pg0.add_stream(pkt)
1276 self.pg_enable_capture(self.pg_interfaces)
1278 capture = self.pg1.get_capture(1)
1279 self.verify_capture_out(capture, same_port=True, packet_num=1)
1280 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1282 def test_forwarding(self):
1283 """ NAT44 forwarding test """
1285 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1288 self.vapi.nat44_forwarding_enable_disable(1)
1290 real_ip = self.pg0.remote_ip4n
1291 alias_ip = self.nat_addr_n
1292 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1293 external_ip=alias_ip)
1296 # static mapping match
1298 pkts = self.create_stream_out(self.pg1)
1299 self.pg1.add_stream(pkts)
1300 self.pg_enable_capture(self.pg_interfaces)
1302 capture = self.pg0.get_capture(len(pkts))
1303 self.verify_capture_in(capture, self.pg0)
1305 pkts = self.create_stream_in(self.pg0, self.pg1)
1306 self.pg0.add_stream(pkts)
1307 self.pg_enable_capture(self.pg_interfaces)
1309 capture = self.pg1.get_capture(len(pkts))
1310 self.verify_capture_out(capture, same_port=True)
1312 # no static mapping match
1314 host0 = self.pg0.remote_hosts[0]
1315 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1317 pkts = self.create_stream_out(self.pg1,
1318 dst_ip=self.pg0.remote_ip4,
1319 use_inside_ports=True)
1320 self.pg1.add_stream(pkts)
1321 self.pg_enable_capture(self.pg_interfaces)
1323 capture = self.pg0.get_capture(len(pkts))
1324 self.verify_capture_in(capture, self.pg0)
1326 pkts = self.create_stream_in(self.pg0, self.pg1)
1327 self.pg0.add_stream(pkts)
1328 self.pg_enable_capture(self.pg_interfaces)
1330 capture = self.pg1.get_capture(len(pkts))
1331 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1334 self.pg0.remote_hosts[0] = host0
1337 self.vapi.nat44_forwarding_enable_disable(0)
1338 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1339 external_ip=alias_ip,
1342 def test_static_in(self):
1343 """ 1:1 NAT initialized from inside network """
1345 nat_ip = "10.0.0.10"
1346 self.tcp_port_out = 6303
1347 self.udp_port_out = 6304
1348 self.icmp_id_out = 6305
1350 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1351 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1352 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1354 sm = self.vapi.nat44_static_mapping_dump()
1355 self.assertEqual(len(sm), 1)
1356 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1357 self.assertEqual(sm[0].protocol, 0)
1358 self.assertEqual(sm[0].local_port, 0)
1359 self.assertEqual(sm[0].external_port, 0)
1362 pkts = self.create_stream_in(self.pg0, self.pg1)
1363 self.pg0.add_stream(pkts)
1364 self.pg_enable_capture(self.pg_interfaces)
1366 capture = self.pg1.get_capture(len(pkts))
1367 self.verify_capture_out(capture, nat_ip, True)
1370 pkts = self.create_stream_out(self.pg1, nat_ip)
1371 self.pg1.add_stream(pkts)
1372 self.pg_enable_capture(self.pg_interfaces)
1374 capture = self.pg0.get_capture(len(pkts))
1375 self.verify_capture_in(capture, self.pg0)
1377 def test_static_out(self):
1378 """ 1:1 NAT initialized from outside network """
1380 nat_ip = "10.0.0.20"
1381 self.tcp_port_out = 6303
1382 self.udp_port_out = 6304
1383 self.icmp_id_out = 6305
1386 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1387 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1388 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1390 sm = self.vapi.nat44_static_mapping_dump()
1391 self.assertEqual(len(sm), 1)
1392 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1395 pkts = self.create_stream_out(self.pg1, nat_ip)
1396 self.pg1.add_stream(pkts)
1397 self.pg_enable_capture(self.pg_interfaces)
1399 capture = self.pg0.get_capture(len(pkts))
1400 self.verify_capture_in(capture, self.pg0)
1403 pkts = self.create_stream_in(self.pg0, self.pg1)
1404 self.pg0.add_stream(pkts)
1405 self.pg_enable_capture(self.pg_interfaces)
1407 capture = self.pg1.get_capture(len(pkts))
1408 self.verify_capture_out(capture, nat_ip, True)
1410 def test_static_with_port_in(self):
1411 """ 1:1 NAPT initialized from inside network """
1413 self.tcp_port_out = 3606
1414 self.udp_port_out = 3607
1415 self.icmp_id_out = 3608
1417 self.nat44_add_address(self.nat_addr)
1418 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1419 self.tcp_port_in, self.tcp_port_out,
1420 proto=IP_PROTOS.tcp)
1421 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1422 self.udp_port_in, self.udp_port_out,
1423 proto=IP_PROTOS.udp)
1424 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1425 self.icmp_id_in, self.icmp_id_out,
1426 proto=IP_PROTOS.icmp)
1427 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1428 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1432 pkts = self.create_stream_in(self.pg0, self.pg1)
1433 self.pg0.add_stream(pkts)
1434 self.pg_enable_capture(self.pg_interfaces)
1436 capture = self.pg1.get_capture(len(pkts))
1437 self.verify_capture_out(capture)
1440 pkts = self.create_stream_out(self.pg1)
1441 self.pg1.add_stream(pkts)
1442 self.pg_enable_capture(self.pg_interfaces)
1444 capture = self.pg0.get_capture(len(pkts))
1445 self.verify_capture_in(capture, self.pg0)
1447 def test_static_with_port_out(self):
1448 """ 1:1 NAPT initialized from outside network """
1450 self.tcp_port_out = 30606
1451 self.udp_port_out = 30607
1452 self.icmp_id_out = 30608
1454 self.nat44_add_address(self.nat_addr)
1455 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1456 self.tcp_port_in, self.tcp_port_out,
1457 proto=IP_PROTOS.tcp)
1458 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1459 self.udp_port_in, self.udp_port_out,
1460 proto=IP_PROTOS.udp)
1461 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1462 self.icmp_id_in, self.icmp_id_out,
1463 proto=IP_PROTOS.icmp)
1464 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1465 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1469 pkts = self.create_stream_out(self.pg1)
1470 self.pg1.add_stream(pkts)
1471 self.pg_enable_capture(self.pg_interfaces)
1473 capture = self.pg0.get_capture(len(pkts))
1474 self.verify_capture_in(capture, self.pg0)
1477 pkts = self.create_stream_in(self.pg0, self.pg1)
1478 self.pg0.add_stream(pkts)
1479 self.pg_enable_capture(self.pg_interfaces)
1481 capture = self.pg1.get_capture(len(pkts))
1482 self.verify_capture_out(capture)
1484 def test_static_vrf_aware(self):
1485 """ 1:1 NAT VRF awareness """
1487 nat_ip1 = "10.0.0.30"
1488 nat_ip2 = "10.0.0.40"
1489 self.tcp_port_out = 6303
1490 self.udp_port_out = 6304
1491 self.icmp_id_out = 6305
1493 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1495 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1497 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1499 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1500 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1502 # inside interface VRF match NAT44 static mapping VRF
1503 pkts = self.create_stream_in(self.pg4, self.pg3)
1504 self.pg4.add_stream(pkts)
1505 self.pg_enable_capture(self.pg_interfaces)
1507 capture = self.pg3.get_capture(len(pkts))
1508 self.verify_capture_out(capture, nat_ip1, True)
1510 # inside interface VRF don't match NAT44 static mapping VRF (packets
1512 pkts = self.create_stream_in(self.pg0, self.pg3)
1513 self.pg0.add_stream(pkts)
1514 self.pg_enable_capture(self.pg_interfaces)
1516 self.pg3.assert_nothing_captured()
1518 def test_dynamic_to_static(self):
1519 """ Switch from dynamic translation to 1:1NAT """
1520 nat_ip = "10.0.0.10"
1521 self.tcp_port_out = 6303
1522 self.udp_port_out = 6304
1523 self.icmp_id_out = 6305
1525 self.nat44_add_address(self.nat_addr)
1526 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1527 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1531 pkts = self.create_stream_in(self.pg0, self.pg1)
1532 self.pg0.add_stream(pkts)
1533 self.pg_enable_capture(self.pg_interfaces)
1535 capture = self.pg1.get_capture(len(pkts))
1536 self.verify_capture_out(capture)
1539 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1540 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1541 self.assertEqual(len(sessions), 0)
1542 pkts = self.create_stream_in(self.pg0, self.pg1)
1543 self.pg0.add_stream(pkts)
1544 self.pg_enable_capture(self.pg_interfaces)
1546 capture = self.pg1.get_capture(len(pkts))
1547 self.verify_capture_out(capture, nat_ip, True)
1549 def test_identity_nat(self):
1550 """ Identity NAT """
1552 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1553 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1554 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1557 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1558 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1559 TCP(sport=12345, dport=56789))
1560 self.pg1.add_stream(p)
1561 self.pg_enable_capture(self.pg_interfaces)
1563 capture = self.pg0.get_capture(1)
1568 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1569 self.assertEqual(ip.src, self.pg1.remote_ip4)
1570 self.assertEqual(tcp.dport, 56789)
1571 self.assertEqual(tcp.sport, 12345)
1572 self.assert_packet_checksums_valid(p)
1574 self.logger.error(ppp("Unexpected or invalid packet:", p))
1577 def test_multiple_inside_interfaces(self):
1578 """ NAT44 multiple non-overlapping address space inside interfaces """
1580 self.nat44_add_address(self.nat_addr)
1581 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1582 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1583 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1586 # between two NAT44 inside interfaces (no translation)
1587 pkts = self.create_stream_in(self.pg0, self.pg1)
1588 self.pg0.add_stream(pkts)
1589 self.pg_enable_capture(self.pg_interfaces)
1591 capture = self.pg1.get_capture(len(pkts))
1592 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1594 # from NAT44 inside to interface without NAT44 feature (no translation)
1595 pkts = self.create_stream_in(self.pg0, self.pg2)
1596 self.pg0.add_stream(pkts)
1597 self.pg_enable_capture(self.pg_interfaces)
1599 capture = self.pg2.get_capture(len(pkts))
1600 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1602 # in2out 1st interface
1603 pkts = self.create_stream_in(self.pg0, self.pg3)
1604 self.pg0.add_stream(pkts)
1605 self.pg_enable_capture(self.pg_interfaces)
1607 capture = self.pg3.get_capture(len(pkts))
1608 self.verify_capture_out(capture)
1610 # out2in 1st interface
1611 pkts = self.create_stream_out(self.pg3)
1612 self.pg3.add_stream(pkts)
1613 self.pg_enable_capture(self.pg_interfaces)
1615 capture = self.pg0.get_capture(len(pkts))
1616 self.verify_capture_in(capture, self.pg0)
1618 # in2out 2nd interface
1619 pkts = self.create_stream_in(self.pg1, self.pg3)
1620 self.pg1.add_stream(pkts)
1621 self.pg_enable_capture(self.pg_interfaces)
1623 capture = self.pg3.get_capture(len(pkts))
1624 self.verify_capture_out(capture)
1626 # out2in 2nd interface
1627 pkts = self.create_stream_out(self.pg3)
1628 self.pg3.add_stream(pkts)
1629 self.pg_enable_capture(self.pg_interfaces)
1631 capture = self.pg1.get_capture(len(pkts))
1632 self.verify_capture_in(capture, self.pg1)
1634 def test_inside_overlapping_interfaces(self):
1635 """ NAT44 multiple inside interfaces with overlapping address space """
1637 static_nat_ip = "10.0.0.10"
1638 self.nat44_add_address(self.nat_addr)
1639 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1641 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1642 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1643 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1644 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1647 # between NAT44 inside interfaces with same VRF (no translation)
1648 pkts = self.create_stream_in(self.pg4, self.pg5)
1649 self.pg4.add_stream(pkts)
1650 self.pg_enable_capture(self.pg_interfaces)
1652 capture = self.pg5.get_capture(len(pkts))
1653 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1655 # between NAT44 inside interfaces with different VRF (hairpinning)
1656 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1657 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1658 TCP(sport=1234, dport=5678))
1659 self.pg4.add_stream(p)
1660 self.pg_enable_capture(self.pg_interfaces)
1662 capture = self.pg6.get_capture(1)
1667 self.assertEqual(ip.src, self.nat_addr)
1668 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1669 self.assertNotEqual(tcp.sport, 1234)
1670 self.assertEqual(tcp.dport, 5678)
1672 self.logger.error(ppp("Unexpected or invalid packet:", p))
1675 # in2out 1st interface
1676 pkts = self.create_stream_in(self.pg4, self.pg3)
1677 self.pg4.add_stream(pkts)
1678 self.pg_enable_capture(self.pg_interfaces)
1680 capture = self.pg3.get_capture(len(pkts))
1681 self.verify_capture_out(capture)
1683 # out2in 1st interface
1684 pkts = self.create_stream_out(self.pg3)
1685 self.pg3.add_stream(pkts)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 capture = self.pg4.get_capture(len(pkts))
1689 self.verify_capture_in(capture, self.pg4)
1691 # in2out 2nd interface
1692 pkts = self.create_stream_in(self.pg5, self.pg3)
1693 self.pg5.add_stream(pkts)
1694 self.pg_enable_capture(self.pg_interfaces)
1696 capture = self.pg3.get_capture(len(pkts))
1697 self.verify_capture_out(capture)
1699 # out2in 2nd interface
1700 pkts = self.create_stream_out(self.pg3)
1701 self.pg3.add_stream(pkts)
1702 self.pg_enable_capture(self.pg_interfaces)
1704 capture = self.pg5.get_capture(len(pkts))
1705 self.verify_capture_in(capture, self.pg5)
1708 addresses = self.vapi.nat44_address_dump()
1709 self.assertEqual(len(addresses), 1)
1710 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1711 self.assertEqual(len(sessions), 3)
1712 for session in sessions:
1713 self.assertFalse(session.is_static)
1714 self.assertEqual(session.inside_ip_address[0:4],
1715 self.pg5.remote_ip4n)
1716 self.assertEqual(session.outside_ip_address,
1717 addresses[0].ip_address)
1718 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1719 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1720 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1721 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1722 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1723 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1724 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1725 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1726 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1728 # in2out 3rd interface
1729 pkts = self.create_stream_in(self.pg6, self.pg3)
1730 self.pg6.add_stream(pkts)
1731 self.pg_enable_capture(self.pg_interfaces)
1733 capture = self.pg3.get_capture(len(pkts))
1734 self.verify_capture_out(capture, static_nat_ip, True)
1736 # out2in 3rd interface
1737 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1738 self.pg3.add_stream(pkts)
1739 self.pg_enable_capture(self.pg_interfaces)
1741 capture = self.pg6.get_capture(len(pkts))
1742 self.verify_capture_in(capture, self.pg6)
1744 # general user and session dump verifications
1745 users = self.vapi.nat44_user_dump()
1746 self.assertTrue(len(users) >= 3)
1747 addresses = self.vapi.nat44_address_dump()
1748 self.assertEqual(len(addresses), 1)
1750 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1752 for session in sessions:
1753 self.assertEqual(user.ip_address, session.inside_ip_address)
1754 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1755 self.assertTrue(session.protocol in
1756 [IP_PROTOS.tcp, IP_PROTOS.udp,
1758 self.assertFalse(session.ext_host_valid)
1761 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1762 self.assertTrue(len(sessions) >= 4)
1763 for session in sessions:
1764 self.assertFalse(session.is_static)
1765 self.assertEqual(session.inside_ip_address[0:4],
1766 self.pg4.remote_ip4n)
1767 self.assertEqual(session.outside_ip_address,
1768 addresses[0].ip_address)
1771 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1772 self.assertTrue(len(sessions) >= 3)
1773 for session in sessions:
1774 self.assertTrue(session.is_static)
1775 self.assertEqual(session.inside_ip_address[0:4],
1776 self.pg6.remote_ip4n)
1777 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1778 map(int, static_nat_ip.split('.')))
1779 self.assertTrue(session.inside_port in
1780 [self.tcp_port_in, self.udp_port_in,
1783 def test_hairpinning(self):
1784 """ NAT44 hairpinning - 1:1 NAPT """
1786 host = self.pg0.remote_hosts[0]
1787 server = self.pg0.remote_hosts[1]
1790 server_in_port = 5678
1791 server_out_port = 8765
1793 self.nat44_add_address(self.nat_addr)
1794 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1795 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1797 # add static mapping for server
1798 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1799 server_in_port, server_out_port,
1800 proto=IP_PROTOS.tcp)
1802 # send packet from host to server
1803 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1804 IP(src=host.ip4, dst=self.nat_addr) /
1805 TCP(sport=host_in_port, dport=server_out_port))
1806 self.pg0.add_stream(p)
1807 self.pg_enable_capture(self.pg_interfaces)
1809 capture = self.pg0.get_capture(1)
1814 self.assertEqual(ip.src, self.nat_addr)
1815 self.assertEqual(ip.dst, server.ip4)
1816 self.assertNotEqual(tcp.sport, host_in_port)
1817 self.assertEqual(tcp.dport, server_in_port)
1818 self.assert_packet_checksums_valid(p)
1819 host_out_port = tcp.sport
1821 self.logger.error(ppp("Unexpected or invalid packet:", p))
1824 # send reply from server to host
1825 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1826 IP(src=server.ip4, dst=self.nat_addr) /
1827 TCP(sport=server_in_port, dport=host_out_port))
1828 self.pg0.add_stream(p)
1829 self.pg_enable_capture(self.pg_interfaces)
1831 capture = self.pg0.get_capture(1)
1836 self.assertEqual(ip.src, self.nat_addr)
1837 self.assertEqual(ip.dst, host.ip4)
1838 self.assertEqual(tcp.sport, server_out_port)
1839 self.assertEqual(tcp.dport, host_in_port)
1840 self.assert_packet_checksums_valid(p)
1842 self.logger.error(ppp("Unexpected or invalid packet:", p))
1845 def test_hairpinning2(self):
1846 """ NAT44 hairpinning - 1:1 NAT"""
1848 server1_nat_ip = "10.0.0.10"
1849 server2_nat_ip = "10.0.0.11"
1850 host = self.pg0.remote_hosts[0]
1851 server1 = self.pg0.remote_hosts[1]
1852 server2 = self.pg0.remote_hosts[2]
1853 server_tcp_port = 22
1854 server_udp_port = 20
1856 self.nat44_add_address(self.nat_addr)
1857 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1858 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1861 # add static mapping for servers
1862 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1863 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1867 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1868 IP(src=host.ip4, dst=server1_nat_ip) /
1869 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1871 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1872 IP(src=host.ip4, dst=server1_nat_ip) /
1873 UDP(sport=self.udp_port_in, dport=server_udp_port))
1875 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1876 IP(src=host.ip4, dst=server1_nat_ip) /
1877 ICMP(id=self.icmp_id_in, type='echo-request'))
1879 self.pg0.add_stream(pkts)
1880 self.pg_enable_capture(self.pg_interfaces)
1882 capture = self.pg0.get_capture(len(pkts))
1883 for packet in capture:
1885 self.assertEqual(packet[IP].src, self.nat_addr)
1886 self.assertEqual(packet[IP].dst, server1.ip4)
1887 if packet.haslayer(TCP):
1888 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1889 self.assertEqual(packet[TCP].dport, server_tcp_port)
1890 self.tcp_port_out = packet[TCP].sport
1891 self.assert_packet_checksums_valid(packet)
1892 elif packet.haslayer(UDP):
1893 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1894 self.assertEqual(packet[UDP].dport, server_udp_port)
1895 self.udp_port_out = packet[UDP].sport
1897 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1898 self.icmp_id_out = packet[ICMP].id
1900 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1905 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1906 IP(src=server1.ip4, dst=self.nat_addr) /
1907 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1909 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1910 IP(src=server1.ip4, dst=self.nat_addr) /
1911 UDP(sport=server_udp_port, dport=self.udp_port_out))
1913 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1914 IP(src=server1.ip4, dst=self.nat_addr) /
1915 ICMP(id=self.icmp_id_out, type='echo-reply'))
1917 self.pg0.add_stream(pkts)
1918 self.pg_enable_capture(self.pg_interfaces)
1920 capture = self.pg0.get_capture(len(pkts))
1921 for packet in capture:
1923 self.assertEqual(packet[IP].src, server1_nat_ip)
1924 self.assertEqual(packet[IP].dst, host.ip4)
1925 if packet.haslayer(TCP):
1926 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1927 self.assertEqual(packet[TCP].sport, server_tcp_port)
1928 self.assert_packet_checksums_valid(packet)
1929 elif packet.haslayer(UDP):
1930 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1931 self.assertEqual(packet[UDP].sport, server_udp_port)
1933 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1935 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1938 # server2 to server1
1940 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1941 IP(src=server2.ip4, dst=server1_nat_ip) /
1942 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1944 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1945 IP(src=server2.ip4, dst=server1_nat_ip) /
1946 UDP(sport=self.udp_port_in, dport=server_udp_port))
1948 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1949 IP(src=server2.ip4, dst=server1_nat_ip) /
1950 ICMP(id=self.icmp_id_in, type='echo-request'))
1952 self.pg0.add_stream(pkts)
1953 self.pg_enable_capture(self.pg_interfaces)
1955 capture = self.pg0.get_capture(len(pkts))
1956 for packet in capture:
1958 self.assertEqual(packet[IP].src, server2_nat_ip)
1959 self.assertEqual(packet[IP].dst, server1.ip4)
1960 if packet.haslayer(TCP):
1961 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1962 self.assertEqual(packet[TCP].dport, server_tcp_port)
1963 self.tcp_port_out = packet[TCP].sport
1964 self.assert_packet_checksums_valid(packet)
1965 elif packet.haslayer(UDP):
1966 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1967 self.assertEqual(packet[UDP].dport, server_udp_port)
1968 self.udp_port_out = packet[UDP].sport
1970 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1971 self.icmp_id_out = packet[ICMP].id
1973 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1976 # server1 to server2
1978 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1979 IP(src=server1.ip4, dst=server2_nat_ip) /
1980 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1982 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1983 IP(src=server1.ip4, dst=server2_nat_ip) /
1984 UDP(sport=server_udp_port, dport=self.udp_port_out))
1986 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1987 IP(src=server1.ip4, dst=server2_nat_ip) /
1988 ICMP(id=self.icmp_id_out, type='echo-reply'))
1990 self.pg0.add_stream(pkts)
1991 self.pg_enable_capture(self.pg_interfaces)
1993 capture = self.pg0.get_capture(len(pkts))
1994 for packet in capture:
1996 self.assertEqual(packet[IP].src, server1_nat_ip)
1997 self.assertEqual(packet[IP].dst, server2.ip4)
1998 if packet.haslayer(TCP):
1999 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2000 self.assertEqual(packet[TCP].sport, server_tcp_port)
2001 self.assert_packet_checksums_valid(packet)
2002 elif packet.haslayer(UDP):
2003 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2004 self.assertEqual(packet[UDP].sport, server_udp_port)
2006 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2008 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2011 def test_max_translations_per_user(self):
2012 """ MAX translations per user - recycle the least recently used """
2014 self.nat44_add_address(self.nat_addr)
2015 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2016 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2019 # get maximum number of translations per user
2020 nat44_config = self.vapi.nat_show_config()
2022 # send more than maximum number of translations per user packets
2023 pkts_num = nat44_config.max_translations_per_user + 5
2025 for port in range(0, pkts_num):
2026 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2027 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2028 TCP(sport=1025 + port))
2030 self.pg0.add_stream(pkts)
2031 self.pg_enable_capture(self.pg_interfaces)
2034 # verify number of translated packet
2035 self.pg1.get_capture(pkts_num)
2037 users = self.vapi.nat44_user_dump()
2039 if user.ip_address == self.pg0.remote_ip4n:
2040 self.assertEqual(user.nsessions,
2041 nat44_config.max_translations_per_user)
2042 self.assertEqual(user.nstaticsessions, 0)
2045 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2047 proto=IP_PROTOS.tcp)
2048 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2049 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2050 TCP(sport=tcp_port))
2051 self.pg0.add_stream(p)
2052 self.pg_enable_capture(self.pg_interfaces)
2054 self.pg1.get_capture(1)
2055 users = self.vapi.nat44_user_dump()
2057 if user.ip_address == self.pg0.remote_ip4n:
2058 self.assertEqual(user.nsessions,
2059 nat44_config.max_translations_per_user - 1)
2060 self.assertEqual(user.nstaticsessions, 1)
2062 def test_interface_addr(self):
2063 """ Acquire NAT44 addresses from interface """
2064 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2066 # no address in NAT pool
2067 adresses = self.vapi.nat44_address_dump()
2068 self.assertEqual(0, len(adresses))
2070 # configure interface address and check NAT address pool
2071 self.pg7.config_ip4()
2072 adresses = self.vapi.nat44_address_dump()
2073 self.assertEqual(1, len(adresses))
2074 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2076 # remove interface address and check NAT address pool
2077 self.pg7.unconfig_ip4()
2078 adresses = self.vapi.nat44_address_dump()
2079 self.assertEqual(0, len(adresses))
2081 def test_interface_addr_static_mapping(self):
2082 """ Static mapping with addresses from interface """
2085 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2086 self.nat44_add_static_mapping(
2088 external_sw_if_index=self.pg7.sw_if_index,
2091 # static mappings with external interface
2092 static_mappings = self.vapi.nat44_static_mapping_dump()
2093 self.assertEqual(1, len(static_mappings))
2094 self.assertEqual(self.pg7.sw_if_index,
2095 static_mappings[0].external_sw_if_index)
2096 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2098 # configure interface address and check static mappings
2099 self.pg7.config_ip4()
2100 static_mappings = self.vapi.nat44_static_mapping_dump()
2101 self.assertEqual(2, len(static_mappings))
2103 for sm in static_mappings:
2104 if sm.external_sw_if_index == 0xFFFFFFFF:
2105 self.assertEqual(sm.external_ip_address[0:4],
2106 self.pg7.local_ip4n)
2107 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2109 self.assertTrue(resolved)
2111 # remove interface address and check static mappings
2112 self.pg7.unconfig_ip4()
2113 static_mappings = self.vapi.nat44_static_mapping_dump()
2114 self.assertEqual(1, len(static_mappings))
2115 self.assertEqual(self.pg7.sw_if_index,
2116 static_mappings[0].external_sw_if_index)
2117 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2119 # configure interface address again and check static mappings
2120 self.pg7.config_ip4()
2121 static_mappings = self.vapi.nat44_static_mapping_dump()
2122 self.assertEqual(2, len(static_mappings))
2124 for sm in static_mappings:
2125 if sm.external_sw_if_index == 0xFFFFFFFF:
2126 self.assertEqual(sm.external_ip_address[0:4],
2127 self.pg7.local_ip4n)
2128 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2130 self.assertTrue(resolved)
2132 # remove static mapping
2133 self.nat44_add_static_mapping(
2135 external_sw_if_index=self.pg7.sw_if_index,
2138 static_mappings = self.vapi.nat44_static_mapping_dump()
2139 self.assertEqual(0, len(static_mappings))
2141 def test_interface_addr_identity_nat(self):
2142 """ Identity NAT with addresses from interface """
2145 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2146 self.vapi.nat44_add_del_identity_mapping(
2147 sw_if_index=self.pg7.sw_if_index,
2149 protocol=IP_PROTOS.tcp,
2152 # identity mappings with external interface
2153 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2154 self.assertEqual(1, len(identity_mappings))
2155 self.assertEqual(self.pg7.sw_if_index,
2156 identity_mappings[0].sw_if_index)
2158 # configure interface address and check identity mappings
2159 self.pg7.config_ip4()
2160 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2162 self.assertEqual(2, len(identity_mappings))
2163 for sm in identity_mappings:
2164 if sm.sw_if_index == 0xFFFFFFFF:
2165 self.assertEqual(identity_mappings[0].ip_address,
2166 self.pg7.local_ip4n)
2167 self.assertEqual(port, identity_mappings[0].port)
2168 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2170 self.assertTrue(resolved)
2172 # remove interface address and check identity mappings
2173 self.pg7.unconfig_ip4()
2174 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2175 self.assertEqual(1, len(identity_mappings))
2176 self.assertEqual(self.pg7.sw_if_index,
2177 identity_mappings[0].sw_if_index)
2179 def test_ipfix_nat44_sess(self):
2180 """ IPFIX logging NAT44 session created/delted """
2181 self.ipfix_domain_id = 10
2182 self.ipfix_src_port = 20202
2183 colector_port = 30303
2184 bind_layers(UDP, IPFIX, dport=30303)
2185 self.nat44_add_address(self.nat_addr)
2186 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2187 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2189 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2190 src_address=self.pg3.local_ip4n,
2192 template_interval=10,
2193 collector_port=colector_port)
2194 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2195 src_port=self.ipfix_src_port)
2197 pkts = self.create_stream_in(self.pg0, self.pg1)
2198 self.pg0.add_stream(pkts)
2199 self.pg_enable_capture(self.pg_interfaces)
2201 capture = self.pg1.get_capture(len(pkts))
2202 self.verify_capture_out(capture)
2203 self.nat44_add_address(self.nat_addr, is_add=0)
2204 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2205 capture = self.pg3.get_capture(9)
2206 ipfix = IPFIXDecoder()
2207 # first load template
2209 self.assertTrue(p.haslayer(IPFIX))
2210 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2211 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2212 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2213 self.assertEqual(p[UDP].dport, colector_port)
2214 self.assertEqual(p[IPFIX].observationDomainID,
2215 self.ipfix_domain_id)
2216 if p.haslayer(Template):
2217 ipfix.add_template(p.getlayer(Template))
2218 # verify events in data set
2220 if p.haslayer(Data):
2221 data = ipfix.decode_data_set(p.getlayer(Set))
2222 self.verify_ipfix_nat44_ses(data)
2224 def test_ipfix_addr_exhausted(self):
2225 """ IPFIX logging NAT addresses exhausted """
2226 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2227 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2229 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2230 src_address=self.pg3.local_ip4n,
2232 template_interval=10)
2233 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2234 src_port=self.ipfix_src_port)
2236 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2237 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2239 self.pg0.add_stream(p)
2240 self.pg_enable_capture(self.pg_interfaces)
2242 self.pg1.assert_nothing_captured()
2244 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2245 capture = self.pg3.get_capture(9)
2246 ipfix = IPFIXDecoder()
2247 # first load template
2249 self.assertTrue(p.haslayer(IPFIX))
2250 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2251 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2252 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2253 self.assertEqual(p[UDP].dport, 4739)
2254 self.assertEqual(p[IPFIX].observationDomainID,
2255 self.ipfix_domain_id)
2256 if p.haslayer(Template):
2257 ipfix.add_template(p.getlayer(Template))
2258 # verify events in data set
2260 if p.haslayer(Data):
2261 data = ipfix.decode_data_set(p.getlayer(Set))
2262 self.verify_ipfix_addr_exhausted(data)
2264 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2265 def test_ipfix_max_sessions(self):
2266 """ IPFIX logging maximum session entries exceeded """
2267 self.nat44_add_address(self.nat_addr)
2268 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2269 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2272 nat44_config = self.vapi.nat_show_config()
2273 max_sessions = 10 * nat44_config.translation_buckets
2276 for i in range(0, max_sessions):
2277 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2278 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2279 IP(src=src, dst=self.pg1.remote_ip4) /
2282 self.pg0.add_stream(pkts)
2283 self.pg_enable_capture(self.pg_interfaces)
2286 self.pg1.get_capture(max_sessions)
2287 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2288 src_address=self.pg3.local_ip4n,
2290 template_interval=10)
2291 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2292 src_port=self.ipfix_src_port)
2294 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2295 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2297 self.pg0.add_stream(p)
2298 self.pg_enable_capture(self.pg_interfaces)
2300 self.pg1.assert_nothing_captured()
2302 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2303 capture = self.pg3.get_capture(9)
2304 ipfix = IPFIXDecoder()
2305 # first load template
2307 self.assertTrue(p.haslayer(IPFIX))
2308 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2309 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2310 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2311 self.assertEqual(p[UDP].dport, 4739)
2312 self.assertEqual(p[IPFIX].observationDomainID,
2313 self.ipfix_domain_id)
2314 if p.haslayer(Template):
2315 ipfix.add_template(p.getlayer(Template))
2316 # verify events in data set
2318 if p.haslayer(Data):
2319 data = ipfix.decode_data_set(p.getlayer(Set))
2320 self.verify_ipfix_max_sessions(data, max_sessions)
2322 def test_pool_addr_fib(self):
2323 """ NAT44 add pool addresses to FIB """
2324 static_addr = '10.0.0.10'
2325 self.nat44_add_address(self.nat_addr)
2326 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2327 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2329 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2332 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2333 ARP(op=ARP.who_has, pdst=self.nat_addr,
2334 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2335 self.pg1.add_stream(p)
2336 self.pg_enable_capture(self.pg_interfaces)
2338 capture = self.pg1.get_capture(1)
2339 self.assertTrue(capture[0].haslayer(ARP))
2340 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2343 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2344 ARP(op=ARP.who_has, pdst=static_addr,
2345 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2346 self.pg1.add_stream(p)
2347 self.pg_enable_capture(self.pg_interfaces)
2349 capture = self.pg1.get_capture(1)
2350 self.assertTrue(capture[0].haslayer(ARP))
2351 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2353 # send ARP to non-NAT44 interface
2354 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2355 ARP(op=ARP.who_has, pdst=self.nat_addr,
2356 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2357 self.pg2.add_stream(p)
2358 self.pg_enable_capture(self.pg_interfaces)
2360 self.pg1.assert_nothing_captured()
2362 # remove addresses and verify
2363 self.nat44_add_address(self.nat_addr, is_add=0)
2364 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2367 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2368 ARP(op=ARP.who_has, pdst=self.nat_addr,
2369 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2370 self.pg1.add_stream(p)
2371 self.pg_enable_capture(self.pg_interfaces)
2373 self.pg1.assert_nothing_captured()
2375 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2376 ARP(op=ARP.who_has, pdst=static_addr,
2377 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2378 self.pg1.add_stream(p)
2379 self.pg_enable_capture(self.pg_interfaces)
2381 self.pg1.assert_nothing_captured()
2383 def test_vrf_mode(self):
2384 """ NAT44 tenant VRF aware address pool mode """
2388 nat_ip1 = "10.0.0.10"
2389 nat_ip2 = "10.0.0.11"
2391 self.pg0.unconfig_ip4()
2392 self.pg1.unconfig_ip4()
2393 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2394 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2395 self.pg0.set_table_ip4(vrf_id1)
2396 self.pg1.set_table_ip4(vrf_id2)
2397 self.pg0.config_ip4()
2398 self.pg1.config_ip4()
2399 self.pg0.resolve_arp()
2400 self.pg1.resolve_arp()
2402 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2403 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2404 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2405 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2406 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2411 pkts = self.create_stream_in(self.pg0, self.pg2)
2412 self.pg0.add_stream(pkts)
2413 self.pg_enable_capture(self.pg_interfaces)
2415 capture = self.pg2.get_capture(len(pkts))
2416 self.verify_capture_out(capture, nat_ip1)
2419 pkts = self.create_stream_in(self.pg1, self.pg2)
2420 self.pg1.add_stream(pkts)
2421 self.pg_enable_capture(self.pg_interfaces)
2423 capture = self.pg2.get_capture(len(pkts))
2424 self.verify_capture_out(capture, nat_ip2)
2427 self.pg0.unconfig_ip4()
2428 self.pg1.unconfig_ip4()
2429 self.pg0.set_table_ip4(0)
2430 self.pg1.set_table_ip4(0)
2431 self.pg0.config_ip4()
2432 self.pg1.config_ip4()
2433 self.pg0.resolve_arp()
2434 self.pg1.resolve_arp()
2435 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2436 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2438 def test_vrf_feature_independent(self):
2439 """ NAT44 tenant VRF independent address pool mode """
2441 nat_ip1 = "10.0.0.10"
2442 nat_ip2 = "10.0.0.11"
2444 self.nat44_add_address(nat_ip1)
2445 self.nat44_add_address(nat_ip2, vrf_id=99)
2446 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2447 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2448 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2452 pkts = self.create_stream_in(self.pg0, self.pg2)
2453 self.pg0.add_stream(pkts)
2454 self.pg_enable_capture(self.pg_interfaces)
2456 capture = self.pg2.get_capture(len(pkts))
2457 self.verify_capture_out(capture, nat_ip1)
2460 pkts = self.create_stream_in(self.pg1, self.pg2)
2461 self.pg1.add_stream(pkts)
2462 self.pg_enable_capture(self.pg_interfaces)
2464 capture = self.pg2.get_capture(len(pkts))
2465 self.verify_capture_out(capture, nat_ip1)
2467 def test_dynamic_ipless_interfaces(self):
2468 """ NAT44 interfaces without configured IP address """
2470 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2471 mactobinary(self.pg7.remote_mac),
2472 self.pg7.remote_ip4n,
2474 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2475 mactobinary(self.pg8.remote_mac),
2476 self.pg8.remote_ip4n,
2479 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2480 dst_address_length=32,
2481 next_hop_address=self.pg7.remote_ip4n,
2482 next_hop_sw_if_index=self.pg7.sw_if_index)
2483 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2484 dst_address_length=32,
2485 next_hop_address=self.pg8.remote_ip4n,
2486 next_hop_sw_if_index=self.pg8.sw_if_index)
2488 self.nat44_add_address(self.nat_addr)
2489 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2490 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2494 pkts = self.create_stream_in(self.pg7, self.pg8)
2495 self.pg7.add_stream(pkts)
2496 self.pg_enable_capture(self.pg_interfaces)
2498 capture = self.pg8.get_capture(len(pkts))
2499 self.verify_capture_out(capture)
2502 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2503 self.pg8.add_stream(pkts)
2504 self.pg_enable_capture(self.pg_interfaces)
2506 capture = self.pg7.get_capture(len(pkts))
2507 self.verify_capture_in(capture, self.pg7)
2509 def test_static_ipless_interfaces(self):
2510 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2512 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2513 mactobinary(self.pg7.remote_mac),
2514 self.pg7.remote_ip4n,
2516 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2517 mactobinary(self.pg8.remote_mac),
2518 self.pg8.remote_ip4n,
2521 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2522 dst_address_length=32,
2523 next_hop_address=self.pg7.remote_ip4n,
2524 next_hop_sw_if_index=self.pg7.sw_if_index)
2525 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2526 dst_address_length=32,
2527 next_hop_address=self.pg8.remote_ip4n,
2528 next_hop_sw_if_index=self.pg8.sw_if_index)
2530 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2531 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2532 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2536 pkts = self.create_stream_out(self.pg8)
2537 self.pg8.add_stream(pkts)
2538 self.pg_enable_capture(self.pg_interfaces)
2540 capture = self.pg7.get_capture(len(pkts))
2541 self.verify_capture_in(capture, self.pg7)
2544 pkts = self.create_stream_in(self.pg7, self.pg8)
2545 self.pg7.add_stream(pkts)
2546 self.pg_enable_capture(self.pg_interfaces)
2548 capture = self.pg8.get_capture(len(pkts))
2549 self.verify_capture_out(capture, self.nat_addr, True)
2551 def test_static_with_port_ipless_interfaces(self):
2552 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2554 self.tcp_port_out = 30606
2555 self.udp_port_out = 30607
2556 self.icmp_id_out = 30608
2558 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2559 mactobinary(self.pg7.remote_mac),
2560 self.pg7.remote_ip4n,
2562 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2563 mactobinary(self.pg8.remote_mac),
2564 self.pg8.remote_ip4n,
2567 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2568 dst_address_length=32,
2569 next_hop_address=self.pg7.remote_ip4n,
2570 next_hop_sw_if_index=self.pg7.sw_if_index)
2571 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2572 dst_address_length=32,
2573 next_hop_address=self.pg8.remote_ip4n,
2574 next_hop_sw_if_index=self.pg8.sw_if_index)
2576 self.nat44_add_address(self.nat_addr)
2577 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2578 self.tcp_port_in, self.tcp_port_out,
2579 proto=IP_PROTOS.tcp)
2580 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2581 self.udp_port_in, self.udp_port_out,
2582 proto=IP_PROTOS.udp)
2583 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2584 self.icmp_id_in, self.icmp_id_out,
2585 proto=IP_PROTOS.icmp)
2586 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2587 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2591 pkts = self.create_stream_out(self.pg8)
2592 self.pg8.add_stream(pkts)
2593 self.pg_enable_capture(self.pg_interfaces)
2595 capture = self.pg7.get_capture(len(pkts))
2596 self.verify_capture_in(capture, self.pg7)
2599 pkts = self.create_stream_in(self.pg7, self.pg8)
2600 self.pg7.add_stream(pkts)
2601 self.pg_enable_capture(self.pg_interfaces)
2603 capture = self.pg8.get_capture(len(pkts))
2604 self.verify_capture_out(capture)
2606 def test_static_unknown_proto(self):
2607 """ 1:1 NAT translate packet with unknown protocol """
2608 nat_ip = "10.0.0.10"
2609 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2610 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2611 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2615 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2616 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2618 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2619 TCP(sport=1234, dport=1234))
2620 self.pg0.add_stream(p)
2621 self.pg_enable_capture(self.pg_interfaces)
2623 p = self.pg1.get_capture(1)
2626 self.assertEqual(packet[IP].src, nat_ip)
2627 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2628 self.assertTrue(packet.haslayer(GRE))
2629 self.assert_packet_checksums_valid(packet)
2631 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2635 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2636 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2638 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2639 TCP(sport=1234, dport=1234))
2640 self.pg1.add_stream(p)
2641 self.pg_enable_capture(self.pg_interfaces)
2643 p = self.pg0.get_capture(1)
2646 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2647 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2648 self.assertTrue(packet.haslayer(GRE))
2649 self.assert_packet_checksums_valid(packet)
2651 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2654 def test_hairpinning_static_unknown_proto(self):
2655 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2657 host = self.pg0.remote_hosts[0]
2658 server = self.pg0.remote_hosts[1]
2660 host_nat_ip = "10.0.0.10"
2661 server_nat_ip = "10.0.0.11"
2663 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2664 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2665 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2666 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2670 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2671 IP(src=host.ip4, dst=server_nat_ip) /
2673 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2674 TCP(sport=1234, dport=1234))
2675 self.pg0.add_stream(p)
2676 self.pg_enable_capture(self.pg_interfaces)
2678 p = self.pg0.get_capture(1)
2681 self.assertEqual(packet[IP].src, host_nat_ip)
2682 self.assertEqual(packet[IP].dst, server.ip4)
2683 self.assertTrue(packet.haslayer(GRE))
2684 self.assert_packet_checksums_valid(packet)
2686 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2690 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2691 IP(src=server.ip4, dst=host_nat_ip) /
2693 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2694 TCP(sport=1234, dport=1234))
2695 self.pg0.add_stream(p)
2696 self.pg_enable_capture(self.pg_interfaces)
2698 p = self.pg0.get_capture(1)
2701 self.assertEqual(packet[IP].src, server_nat_ip)
2702 self.assertEqual(packet[IP].dst, host.ip4)
2703 self.assertTrue(packet.haslayer(GRE))
2704 self.assert_packet_checksums_valid(packet)
2706 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2709 def test_output_feature(self):
2710 """ NAT44 interface output feature (in2out postrouting) """
2711 self.nat44_add_address(self.nat_addr)
2712 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2713 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2714 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2718 pkts = self.create_stream_in(self.pg0, self.pg3)
2719 self.pg0.add_stream(pkts)
2720 self.pg_enable_capture(self.pg_interfaces)
2722 capture = self.pg3.get_capture(len(pkts))
2723 self.verify_capture_out(capture)
2726 pkts = self.create_stream_out(self.pg3)
2727 self.pg3.add_stream(pkts)
2728 self.pg_enable_capture(self.pg_interfaces)
2730 capture = self.pg0.get_capture(len(pkts))
2731 self.verify_capture_in(capture, self.pg0)
2733 # from non-NAT interface to NAT inside interface
2734 pkts = self.create_stream_in(self.pg2, self.pg0)
2735 self.pg2.add_stream(pkts)
2736 self.pg_enable_capture(self.pg_interfaces)
2738 capture = self.pg0.get_capture(len(pkts))
2739 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2741 def test_output_feature_vrf_aware(self):
2742 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2743 nat_ip_vrf10 = "10.0.0.10"
2744 nat_ip_vrf20 = "10.0.0.20"
2746 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2747 dst_address_length=32,
2748 next_hop_address=self.pg3.remote_ip4n,
2749 next_hop_sw_if_index=self.pg3.sw_if_index,
2751 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2752 dst_address_length=32,
2753 next_hop_address=self.pg3.remote_ip4n,
2754 next_hop_sw_if_index=self.pg3.sw_if_index,
2757 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2758 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2759 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2760 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2761 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2765 pkts = self.create_stream_in(self.pg4, self.pg3)
2766 self.pg4.add_stream(pkts)
2767 self.pg_enable_capture(self.pg_interfaces)
2769 capture = self.pg3.get_capture(len(pkts))
2770 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2773 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2774 self.pg3.add_stream(pkts)
2775 self.pg_enable_capture(self.pg_interfaces)
2777 capture = self.pg4.get_capture(len(pkts))
2778 self.verify_capture_in(capture, self.pg4)
2781 pkts = self.create_stream_in(self.pg6, self.pg3)
2782 self.pg6.add_stream(pkts)
2783 self.pg_enable_capture(self.pg_interfaces)
2785 capture = self.pg3.get_capture(len(pkts))
2786 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2789 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2790 self.pg3.add_stream(pkts)
2791 self.pg_enable_capture(self.pg_interfaces)
2793 capture = self.pg6.get_capture(len(pkts))
2794 self.verify_capture_in(capture, self.pg6)
2796 def test_output_feature_hairpinning(self):
2797 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2798 host = self.pg0.remote_hosts[0]
2799 server = self.pg0.remote_hosts[1]
2802 server_in_port = 5678
2803 server_out_port = 8765
2805 self.nat44_add_address(self.nat_addr)
2806 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2807 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2810 # add static mapping for server
2811 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2812 server_in_port, server_out_port,
2813 proto=IP_PROTOS.tcp)
2815 # send packet from host to server
2816 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2817 IP(src=host.ip4, dst=self.nat_addr) /
2818 TCP(sport=host_in_port, dport=server_out_port))
2819 self.pg0.add_stream(p)
2820 self.pg_enable_capture(self.pg_interfaces)
2822 capture = self.pg0.get_capture(1)
2827 self.assertEqual(ip.src, self.nat_addr)
2828 self.assertEqual(ip.dst, server.ip4)
2829 self.assertNotEqual(tcp.sport, host_in_port)
2830 self.assertEqual(tcp.dport, server_in_port)
2831 self.assert_packet_checksums_valid(p)
2832 host_out_port = tcp.sport
2834 self.logger.error(ppp("Unexpected or invalid packet:", p))
2837 # send reply from server to host
2838 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2839 IP(src=server.ip4, dst=self.nat_addr) /
2840 TCP(sport=server_in_port, dport=host_out_port))
2841 self.pg0.add_stream(p)
2842 self.pg_enable_capture(self.pg_interfaces)
2844 capture = self.pg0.get_capture(1)
2849 self.assertEqual(ip.src, self.nat_addr)
2850 self.assertEqual(ip.dst, host.ip4)
2851 self.assertEqual(tcp.sport, server_out_port)
2852 self.assertEqual(tcp.dport, host_in_port)
2853 self.assert_packet_checksums_valid(p)
2855 self.logger.error(ppp("Unexpected or invalid packet:", p))
2858 def test_one_armed_nat44(self):
2859 """ One armed NAT44 """
2860 remote_host = self.pg9.remote_hosts[0]
2861 local_host = self.pg9.remote_hosts[1]
2864 self.nat44_add_address(self.nat_addr)
2865 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2866 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2870 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2871 IP(src=local_host.ip4, dst=remote_host.ip4) /
2872 TCP(sport=12345, dport=80))
2873 self.pg9.add_stream(p)
2874 self.pg_enable_capture(self.pg_interfaces)
2876 capture = self.pg9.get_capture(1)
2881 self.assertEqual(ip.src, self.nat_addr)
2882 self.assertEqual(ip.dst, remote_host.ip4)
2883 self.assertNotEqual(tcp.sport, 12345)
2884 external_port = tcp.sport
2885 self.assertEqual(tcp.dport, 80)
2886 self.assert_packet_checksums_valid(p)
2888 self.logger.error(ppp("Unexpected or invalid packet:", p))
2892 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2893 IP(src=remote_host.ip4, dst=self.nat_addr) /
2894 TCP(sport=80, dport=external_port))
2895 self.pg9.add_stream(p)
2896 self.pg_enable_capture(self.pg_interfaces)
2898 capture = self.pg9.get_capture(1)
2903 self.assertEqual(ip.src, remote_host.ip4)
2904 self.assertEqual(ip.dst, local_host.ip4)
2905 self.assertEqual(tcp.sport, 80)
2906 self.assertEqual(tcp.dport, 12345)
2907 self.assert_packet_checksums_valid(p)
2909 self.logger.error(ppp("Unexpected or invalid packet:", p))
2912 def test_del_session(self):
2913 """ Delete NAT44 session """
2914 self.nat44_add_address(self.nat_addr)
2915 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2916 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2919 pkts = self.create_stream_in(self.pg0, self.pg1)
2920 self.pg0.add_stream(pkts)
2921 self.pg_enable_capture(self.pg_interfaces)
2923 self.pg1.get_capture(len(pkts))
2925 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2926 nsessions = len(sessions)
2928 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2929 sessions[0].inside_port,
2930 sessions[0].protocol)
2931 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2932 sessions[1].outside_port,
2933 sessions[1].protocol,
2936 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2937 self.assertEqual(nsessions - len(sessions), 2)
2939 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2940 sessions[0].inside_port,
2941 sessions[0].protocol)
2943 self.verify_no_nat44_user()
2945 def test_set_get_reass(self):
2946 """ NAT44 set/get virtual fragmentation reassembly """
2947 reas_cfg1 = self.vapi.nat_get_reass()
2949 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2950 max_reass=reas_cfg1.ip4_max_reass * 2,
2951 max_frag=reas_cfg1.ip4_max_frag * 2)
2953 reas_cfg2 = self.vapi.nat_get_reass()
2955 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2956 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2957 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2959 self.vapi.nat_set_reass(drop_frag=1)
2960 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2962 def test_frag_in_order(self):
2963 """ NAT44 translate fragments arriving in order """
2964 self.nat44_add_address(self.nat_addr)
2965 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2966 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2969 data = "A" * 4 + "B" * 16 + "C" * 3
2970 self.tcp_port_in = random.randint(1025, 65535)
2972 reass = self.vapi.nat_reass_dump()
2973 reass_n_start = len(reass)
2976 pkts = self.create_stream_frag(self.pg0,
2977 self.pg1.remote_ip4,
2981 self.pg0.add_stream(pkts)
2982 self.pg_enable_capture(self.pg_interfaces)
2984 frags = self.pg1.get_capture(len(pkts))
2985 p = self.reass_frags_and_verify(frags,
2987 self.pg1.remote_ip4)
2988 self.assertEqual(p[TCP].dport, 20)
2989 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2990 self.tcp_port_out = p[TCP].sport
2991 self.assertEqual(data, p[Raw].load)
2994 pkts = self.create_stream_frag(self.pg1,
2999 self.pg1.add_stream(pkts)
3000 self.pg_enable_capture(self.pg_interfaces)
3002 frags = self.pg0.get_capture(len(pkts))
3003 p = self.reass_frags_and_verify(frags,
3004 self.pg1.remote_ip4,
3005 self.pg0.remote_ip4)
3006 self.assertEqual(p[TCP].sport, 20)
3007 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3008 self.assertEqual(data, p[Raw].load)
3010 reass = self.vapi.nat_reass_dump()
3011 reass_n_end = len(reass)
3013 self.assertEqual(reass_n_end - reass_n_start, 2)
3015 def test_reass_hairpinning(self):
3016 """ NAT44 fragments hairpinning """
3017 server = self.pg0.remote_hosts[1]
3018 host_in_port = random.randint(1025, 65535)
3019 server_in_port = random.randint(1025, 65535)
3020 server_out_port = random.randint(1025, 65535)
3021 data = "A" * 4 + "B" * 16 + "C" * 3
3023 self.nat44_add_address(self.nat_addr)
3024 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3025 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3027 # add static mapping for server
3028 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3029 server_in_port, server_out_port,
3030 proto=IP_PROTOS.tcp)
3032 # send packet from host to server
3033 pkts = self.create_stream_frag(self.pg0,
3038 self.pg0.add_stream(pkts)
3039 self.pg_enable_capture(self.pg_interfaces)
3041 frags = self.pg0.get_capture(len(pkts))
3042 p = self.reass_frags_and_verify(frags,
3045 self.assertNotEqual(p[TCP].sport, host_in_port)
3046 self.assertEqual(p[TCP].dport, server_in_port)
3047 self.assertEqual(data, p[Raw].load)
3049 def test_frag_out_of_order(self):
3050 """ NAT44 translate fragments arriving out of order """
3051 self.nat44_add_address(self.nat_addr)
3052 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3053 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3056 data = "A" * 4 + "B" * 16 + "C" * 3
3057 random.randint(1025, 65535)
3060 pkts = self.create_stream_frag(self.pg0,
3061 self.pg1.remote_ip4,
3066 self.pg0.add_stream(pkts)
3067 self.pg_enable_capture(self.pg_interfaces)
3069 frags = self.pg1.get_capture(len(pkts))
3070 p = self.reass_frags_and_verify(frags,
3072 self.pg1.remote_ip4)
3073 self.assertEqual(p[TCP].dport, 20)
3074 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3075 self.tcp_port_out = p[TCP].sport
3076 self.assertEqual(data, p[Raw].load)
3079 pkts = self.create_stream_frag(self.pg1,
3085 self.pg1.add_stream(pkts)
3086 self.pg_enable_capture(self.pg_interfaces)
3088 frags = self.pg0.get_capture(len(pkts))
3089 p = self.reass_frags_and_verify(frags,
3090 self.pg1.remote_ip4,
3091 self.pg0.remote_ip4)
3092 self.assertEqual(p[TCP].sport, 20)
3093 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3094 self.assertEqual(data, p[Raw].load)
3096 def test_port_restricted(self):
3097 """ Port restricted NAT44 (MAP-E CE) """
3098 self.nat44_add_address(self.nat_addr)
3099 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3100 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3102 self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3107 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3108 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3109 TCP(sport=4567, dport=22))
3110 self.pg0.add_stream(p)
3111 self.pg_enable_capture(self.pg_interfaces)
3113 capture = self.pg1.get_capture(1)
3118 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3119 self.assertEqual(ip.src, self.nat_addr)
3120 self.assertEqual(tcp.dport, 22)
3121 self.assertNotEqual(tcp.sport, 4567)
3122 self.assertEqual((tcp.sport >> 6) & 63, 10)
3123 self.assert_packet_checksums_valid(p)
3125 self.logger.error(ppp("Unexpected or invalid packet:", p))
3128 def test_port_range(self):
3129 """ External address port range """
3130 self.nat44_add_address(self.nat_addr)
3131 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3132 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3134 self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
3139 for port in range(0, 5):
3140 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3141 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3142 TCP(sport=1125 + port))
3144 self.pg0.add_stream(pkts)
3145 self.pg_enable_capture(self.pg_interfaces)
3147 capture = self.pg1.get_capture(3)
3150 self.assertGreaterEqual(tcp.sport, 1025)
3151 self.assertLessEqual(tcp.sport, 1027)
3153 def test_ipfix_max_frags(self):
3154 """ IPFIX logging maximum fragments pending reassembly exceeded """
3155 self.nat44_add_address(self.nat_addr)
3156 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3157 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3159 self.vapi.nat_set_reass(max_frag=0)
3160 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3161 src_address=self.pg3.local_ip4n,
3163 template_interval=10)
3164 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3165 src_port=self.ipfix_src_port)
3167 data = "A" * 4 + "B" * 16 + "C" * 3
3168 self.tcp_port_in = random.randint(1025, 65535)
3169 pkts = self.create_stream_frag(self.pg0,
3170 self.pg1.remote_ip4,
3174 self.pg0.add_stream(pkts[-1])
3175 self.pg_enable_capture(self.pg_interfaces)
3177 self.pg1.assert_nothing_captured()
3179 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3180 capture = self.pg3.get_capture(9)
3181 ipfix = IPFIXDecoder()
3182 # first load template
3184 self.assertTrue(p.haslayer(IPFIX))
3185 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3186 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3187 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3188 self.assertEqual(p[UDP].dport, 4739)
3189 self.assertEqual(p[IPFIX].observationDomainID,
3190 self.ipfix_domain_id)
3191 if p.haslayer(Template):
3192 ipfix.add_template(p.getlayer(Template))
3193 # verify events in data set
3195 if p.haslayer(Data):
3196 data = ipfix.decode_data_set(p.getlayer(Set))
3197 self.verify_ipfix_max_fragments_ip4(data, 0,
3198 self.pg0.remote_ip4n)
3200 def test_multiple_outside_vrf(self):
3201 """ Multiple outside VRF """
3205 self.pg1.unconfig_ip4()
3206 self.pg2.unconfig_ip4()
3207 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3208 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3209 self.pg1.set_table_ip4(vrf_id1)
3210 self.pg2.set_table_ip4(vrf_id2)
3211 self.pg1.config_ip4()
3212 self.pg2.config_ip4()
3213 self.pg1.resolve_arp()
3214 self.pg2.resolve_arp()
3216 self.nat44_add_address(self.nat_addr)
3217 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3218 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3220 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
3225 pkts = self.create_stream_in(self.pg0, self.pg1)
3226 self.pg0.add_stream(pkts)
3227 self.pg_enable_capture(self.pg_interfaces)
3229 capture = self.pg1.get_capture(len(pkts))
3230 self.verify_capture_out(capture, self.nat_addr)
3232 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3233 self.pg1.add_stream(pkts)
3234 self.pg_enable_capture(self.pg_interfaces)
3236 capture = self.pg0.get_capture(len(pkts))
3237 self.verify_capture_in(capture, self.pg0)
3239 self.tcp_port_in = 60303
3240 self.udp_port_in = 60304
3241 self.icmp_id_in = 60305
3244 pkts = self.create_stream_in(self.pg0, self.pg2)
3245 self.pg0.add_stream(pkts)
3246 self.pg_enable_capture(self.pg_interfaces)
3248 capture = self.pg2.get_capture(len(pkts))
3249 self.verify_capture_out(capture, self.nat_addr)
3251 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3252 self.pg2.add_stream(pkts)
3253 self.pg_enable_capture(self.pg_interfaces)
3255 capture = self.pg0.get_capture(len(pkts))
3256 self.verify_capture_in(capture, self.pg0)
3259 self.pg1.unconfig_ip4()
3260 self.pg2.unconfig_ip4()
3261 self.pg1.set_table_ip4(0)
3262 self.pg2.set_table_ip4(0)
3263 self.pg1.config_ip4()
3264 self.pg2.config_ip4()
3265 self.pg1.resolve_arp()
3266 self.pg2.resolve_arp()
3268 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3269 def test_session_timeout(self):
3270 """ NAT44 session timeouts """
3271 self.nat44_add_address(self.nat_addr)
3272 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3273 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3275 self.vapi.nat_set_timeouts(udp=5)
3279 for i in range(0, max_sessions):
3280 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3282 IP(src=src, dst=self.pg1.remote_ip4) /
3283 UDP(sport=1025, dport=53))
3285 self.pg0.add_stream(pkts)
3286 self.pg_enable_capture(self.pg_interfaces)
3288 self.pg1.get_capture(max_sessions)
3293 for i in range(0, max_sessions):
3294 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3295 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3296 IP(src=src, dst=self.pg1.remote_ip4) /
3297 UDP(sport=1026, dport=53))
3299 self.pg0.add_stream(pkts)
3300 self.pg_enable_capture(self.pg_interfaces)
3302 self.pg1.get_capture(max_sessions)
3305 users = self.vapi.nat44_user_dump()
3307 nsessions = nsessions + user.nsessions
3308 self.assertLess(nsessions, 2 * max_sessions)
3311 super(TestNAT44, self).tearDown()
3312 if not self.vpp_dead:
3313 self.logger.info(self.vapi.cli("show nat44 addresses"))
3314 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3315 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3316 self.logger.info(self.vapi.cli("show nat44 interface address"))
3317 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3318 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3319 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3320 self.logger.info(self.vapi.cli("show nat timeouts"))
3322 self.vapi.cli("show nat addr-port-assignment-alg"))
3324 self.vapi.cli("clear logging")
3327 class TestNAT44EndpointDependent(MethodHolder):
3328 """ Endpoint-Dependent mapping and filtering test cases """
3331 def setUpConstants(cls):
3332 super(TestNAT44EndpointDependent, cls).setUpConstants()
3333 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3336 def setUpClass(cls):
3337 super(TestNAT44EndpointDependent, cls).setUpClass()
3338 cls.vapi.cli("set log class nat level debug")
3340 cls.tcp_port_in = 6303
3341 cls.tcp_port_out = 6303
3342 cls.udp_port_in = 6304
3343 cls.udp_port_out = 6304
3344 cls.icmp_id_in = 6305
3345 cls.icmp_id_out = 6305
3346 cls.nat_addr = '10.0.0.3'
3347 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3348 cls.ipfix_src_port = 4739
3349 cls.ipfix_domain_id = 1
3350 cls.tcp_external_port = 80
3352 cls.create_pg_interfaces(range(7))
3353 cls.interfaces = list(cls.pg_interfaces[0:3])
3355 for i in cls.interfaces:
3360 cls.pg0.generate_remote_hosts(3)
3361 cls.pg0.configure_ipv4_neighbors()
3365 cls.pg4.generate_remote_hosts(2)
3366 cls.pg4.config_ip4()
3367 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3368 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3372 cls.pg4.resolve_arp()
3373 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3374 cls.pg4.resolve_arp()
3376 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3377 cls.vapi.ip_table_add_del(1, is_add=1)
3379 cls.pg5._local_ip4 = "10.1.1.1"
3380 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3382 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3383 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3384 socket.AF_INET, cls.pg5.remote_ip4)
3385 cls.pg5.set_table_ip4(1)
3386 cls.pg5.config_ip4()
3388 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3389 dst_address_length=32,
3391 next_hop_sw_if_index=cls.pg5.sw_if_index,
3392 next_hop_address=zero_ip4n)
3394 cls.pg6._local_ip4 = "10.1.2.1"
3395 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3397 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3398 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3399 socket.AF_INET, cls.pg6.remote_ip4)
3400 cls.pg6.set_table_ip4(1)
3401 cls.pg6.config_ip4()
3403 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3404 dst_address_length=32,
3406 next_hop_sw_if_index=cls.pg6.sw_if_index,
3407 next_hop_address=zero_ip4n)
3409 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3410 dst_address_length=16,
3411 next_hop_address=zero_ip4n,
3413 next_hop_table_id=1)
3414 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3415 dst_address_length=0,
3416 next_hop_address=zero_ip4n,
3418 next_hop_table_id=0)
3419 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3420 dst_address_length=0,
3422 next_hop_sw_if_index=cls.pg1.sw_if_index,
3423 next_hop_address=cls.pg1.local_ip4n)
3425 cls.pg5.resolve_arp()
3426 cls.pg6.resolve_arp()
3429 super(TestNAT44EndpointDependent, cls).tearDownClass()
3432 def test_dynamic(self):
3433 """ NAT44 dynamic translation test """
3435 self.nat44_add_address(self.nat_addr)
3436 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3437 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3440 nat_config = self.vapi.nat_show_config()
3441 self.assertEqual(1, nat_config.endpoint_dependent)
3444 pkts = self.create_stream_in(self.pg0, self.pg1)
3445 self.pg0.add_stream(pkts)
3446 self.pg_enable_capture(self.pg_interfaces)
3448 capture = self.pg1.get_capture(len(pkts))
3449 self.verify_capture_out(capture)
3452 pkts = self.create_stream_out(self.pg1)
3453 self.pg1.add_stream(pkts)
3454 self.pg_enable_capture(self.pg_interfaces)
3456 capture = self.pg0.get_capture(len(pkts))
3457 self.verify_capture_in(capture, self.pg0)
3459 def test_forwarding(self):
3460 """ NAT44 forwarding test """
3462 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3463 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3465 self.vapi.nat44_forwarding_enable_disable(1)
3467 real_ip = self.pg0.remote_ip4n
3468 alias_ip = self.nat_addr_n
3469 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3470 external_ip=alias_ip)
3473 # in2out - static mapping match
3475 pkts = self.create_stream_out(self.pg1)
3476 self.pg1.add_stream(pkts)
3477 self.pg_enable_capture(self.pg_interfaces)
3479 capture = self.pg0.get_capture(len(pkts))
3480 self.verify_capture_in(capture, self.pg0)
3482 pkts = self.create_stream_in(self.pg0, self.pg1)
3483 self.pg0.add_stream(pkts)
3484 self.pg_enable_capture(self.pg_interfaces)
3486 capture = self.pg1.get_capture(len(pkts))
3487 self.verify_capture_out(capture, same_port=True)
3489 # in2out - no static mapping match
3491 host0 = self.pg0.remote_hosts[0]
3492 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3494 pkts = self.create_stream_out(self.pg1,
3495 dst_ip=self.pg0.remote_ip4,
3496 use_inside_ports=True)
3497 self.pg1.add_stream(pkts)
3498 self.pg_enable_capture(self.pg_interfaces)
3500 capture = self.pg0.get_capture(len(pkts))
3501 self.verify_capture_in(capture, self.pg0)
3503 pkts = self.create_stream_in(self.pg0, self.pg1)
3504 self.pg0.add_stream(pkts)
3505 self.pg_enable_capture(self.pg_interfaces)
3507 capture = self.pg1.get_capture(len(pkts))
3508 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3511 self.pg0.remote_hosts[0] = host0
3513 user = self.pg0.remote_hosts[1]
3514 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3515 self.assertEqual(len(sessions), 3)
3516 self.assertTrue(sessions[0].ext_host_valid)
3517 self.vapi.nat44_del_session(
3518 sessions[0].inside_ip_address,
3519 sessions[0].inside_port,
3520 sessions[0].protocol,
3521 ext_host_address=sessions[0].ext_host_address,
3522 ext_host_port=sessions[0].ext_host_port)
3523 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3524 self.assertEqual(len(sessions), 2)
3527 self.vapi.nat44_forwarding_enable_disable(0)
3528 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3529 external_ip=alias_ip,
3532 def test_static_lb(self):
3533 """ NAT44 local service load balancing """
3534 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3537 server1 = self.pg0.remote_hosts[0]
3538 server2 = self.pg0.remote_hosts[1]
3540 locals = [{'addr': server1.ip4n,
3544 {'addr': server2.ip4n,
3549 self.nat44_add_address(self.nat_addr)
3550 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3553 local_num=len(locals),
3555 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3556 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3559 # from client to service
3560 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3561 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3562 TCP(sport=12345, dport=external_port))
3563 self.pg1.add_stream(p)
3564 self.pg_enable_capture(self.pg_interfaces)
3566 capture = self.pg0.get_capture(1)
3572 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3573 if ip.dst == server1.ip4:
3577 self.assertEqual(tcp.dport, local_port)
3578 self.assert_packet_checksums_valid(p)
3580 self.logger.error(ppp("Unexpected or invalid packet:", p))
3583 # from service back to client
3584 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3585 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3586 TCP(sport=local_port, dport=12345))
3587 self.pg0.add_stream(p)
3588 self.pg_enable_capture(self.pg_interfaces)
3590 capture = self.pg1.get_capture(1)
3595 self.assertEqual(ip.src, self.nat_addr)
3596 self.assertEqual(tcp.sport, external_port)
3597 self.assert_packet_checksums_valid(p)
3599 self.logger.error(ppp("Unexpected or invalid packet:", p))
3602 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3603 self.assertEqual(len(sessions), 1)
3604 self.assertTrue(sessions[0].ext_host_valid)
3605 self.vapi.nat44_del_session(
3606 sessions[0].inside_ip_address,
3607 sessions[0].inside_port,
3608 sessions[0].protocol,
3609 ext_host_address=sessions[0].ext_host_address,
3610 ext_host_port=sessions[0].ext_host_port)
3611 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3612 self.assertEqual(len(sessions), 0)
3614 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3615 def test_static_lb_multi_clients(self):
3616 """ NAT44 local service load balancing - multiple clients"""
3618 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3621 server1 = self.pg0.remote_hosts[0]
3622 server2 = self.pg0.remote_hosts[1]
3624 locals = [{'addr': server1.ip4n,
3628 {'addr': server2.ip4n,
3633 self.nat44_add_address(self.nat_addr)
3634 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3637 local_num=len(locals),
3639 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3640 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3645 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3647 for client in clients:
3648 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3649 IP(src=client, dst=self.nat_addr) /
3650 TCP(sport=12345, dport=external_port))
3652 self.pg1.add_stream(pkts)
3653 self.pg_enable_capture(self.pg_interfaces)
3655 capture = self.pg0.get_capture(len(pkts))
3657 if p[IP].dst == server1.ip4:
3661 self.assertTrue(server1_n > server2_n)
3663 def test_static_lb_2(self):
3664 """ NAT44 local service load balancing (asymmetrical rule) """
3665 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3668 server1 = self.pg0.remote_hosts[0]
3669 server2 = self.pg0.remote_hosts[1]
3671 locals = [{'addr': server1.ip4n,
3675 {'addr': server2.ip4n,
3680 self.vapi.nat44_forwarding_enable_disable(1)
3681 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3685 local_num=len(locals),
3687 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3688 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3691 # from client to service
3692 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3693 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3694 TCP(sport=12345, dport=external_port))
3695 self.pg1.add_stream(p)
3696 self.pg_enable_capture(self.pg_interfaces)
3698 capture = self.pg0.get_capture(1)
3704 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3705 if ip.dst == server1.ip4:
3709 self.assertEqual(tcp.dport, local_port)
3710 self.assert_packet_checksums_valid(p)
3712 self.logger.error(ppp("Unexpected or invalid packet:", p))
3715 # from service back to client
3716 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3717 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3718 TCP(sport=local_port, dport=12345))
3719 self.pg0.add_stream(p)
3720 self.pg_enable_capture(self.pg_interfaces)
3722 capture = self.pg1.get_capture(1)
3727 self.assertEqual(ip.src, self.nat_addr)
3728 self.assertEqual(tcp.sport, external_port)
3729 self.assert_packet_checksums_valid(p)
3731 self.logger.error(ppp("Unexpected or invalid packet:", p))
3734 # from client to server (no translation)
3735 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3736 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3737 TCP(sport=12346, dport=local_port))
3738 self.pg1.add_stream(p)
3739 self.pg_enable_capture(self.pg_interfaces)
3741 capture = self.pg0.get_capture(1)
3747 self.assertEqual(ip.dst, server1.ip4)
3748 self.assertEqual(tcp.dport, local_port)
3749 self.assert_packet_checksums_valid(p)
3751 self.logger.error(ppp("Unexpected or invalid packet:", p))
3754 # from service back to client (no translation)
3755 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3756 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3757 TCP(sport=local_port, dport=12346))
3758 self.pg0.add_stream(p)
3759 self.pg_enable_capture(self.pg_interfaces)
3761 capture = self.pg1.get_capture(1)
3766 self.assertEqual(ip.src, server1.ip4)
3767 self.assertEqual(tcp.sport, local_port)
3768 self.assert_packet_checksums_valid(p)
3770 self.logger.error(ppp("Unexpected or invalid packet:", p))
3773 def test_lb_affinity(self):
3774 """ NAT44 local service load balancing affinity """
3775 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3778 server1 = self.pg0.remote_hosts[0]
3779 server2 = self.pg0.remote_hosts[1]
3781 locals = [{'addr': server1.ip4n,
3785 {'addr': server2.ip4n,
3790 self.nat44_add_address(self.nat_addr)
3791 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3795 local_num=len(locals),
3797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3798 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3801 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3802 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3803 TCP(sport=1025, dport=external_port))
3804 self.pg1.add_stream(p)
3805 self.pg_enable_capture(self.pg_interfaces)
3807 capture = self.pg0.get_capture(1)
3808 backend = capture[0][IP].dst
3810 sessions = self.vapi.nat44_user_session_dump(
3811 socket.inet_pton(socket.AF_INET, backend), 0)
3812 self.assertEqual(len(sessions), 1)
3813 self.assertTrue(sessions[0].ext_host_valid)
3814 self.vapi.nat44_del_session(
3815 sessions[0].inside_ip_address,
3816 sessions[0].inside_port,
3817 sessions[0].protocol,
3818 ext_host_address=sessions[0].ext_host_address,
3819 ext_host_port=sessions[0].ext_host_port)
3822 for port in range(1030, 1100):
3823 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3824 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3825 TCP(sport=port, dport=external_port))
3827 self.pg1.add_stream(pkts)
3828 self.pg_enable_capture(self.pg_interfaces)
3830 capture = self.pg0.get_capture(len(pkts))
3832 self.assertEqual(p[IP].dst, backend)
3834 def test_unknown_proto(self):
3835 """ NAT44 translate packet with unknown protocol """
3836 self.nat44_add_address(self.nat_addr)
3837 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3838 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3842 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3843 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3844 TCP(sport=self.tcp_port_in, dport=20))
3845 self.pg0.add_stream(p)
3846 self.pg_enable_capture(self.pg_interfaces)
3848 p = self.pg1.get_capture(1)
3850 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3851 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3853 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3854 TCP(sport=1234, dport=1234))
3855 self.pg0.add_stream(p)
3856 self.pg_enable_capture(self.pg_interfaces)
3858 p = self.pg1.get_capture(1)
3861 self.assertEqual(packet[IP].src, self.nat_addr)
3862 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3863 self.assertTrue(packet.haslayer(GRE))
3864 self.assert_packet_checksums_valid(packet)
3866 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3870 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3871 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3873 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3874 TCP(sport=1234, dport=1234))
3875 self.pg1.add_stream(p)
3876 self.pg_enable_capture(self.pg_interfaces)
3878 p = self.pg0.get_capture(1)
3881 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3882 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3883 self.assertTrue(packet.haslayer(GRE))
3884 self.assert_packet_checksums_valid(packet)
3886 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3889 def test_hairpinning_unknown_proto(self):
3890 """ NAT44 translate packet with unknown protocol - hairpinning """
3891 host = self.pg0.remote_hosts[0]
3892 server = self.pg0.remote_hosts[1]
3894 server_out_port = 8765
3895 server_nat_ip = "10.0.0.11"
3897 self.nat44_add_address(self.nat_addr)
3898 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3899 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3902 # add static mapping for server
3903 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3906 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3907 IP(src=host.ip4, dst=server_nat_ip) /
3908 TCP(sport=host_in_port, dport=server_out_port))
3909 self.pg0.add_stream(p)
3910 self.pg_enable_capture(self.pg_interfaces)
3912 self.pg0.get_capture(1)
3914 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3915 IP(src=host.ip4, dst=server_nat_ip) /
3917 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3918 TCP(sport=1234, dport=1234))
3919 self.pg0.add_stream(p)
3920 self.pg_enable_capture(self.pg_interfaces)
3922 p = self.pg0.get_capture(1)
3925 self.assertEqual(packet[IP].src, self.nat_addr)
3926 self.assertEqual(packet[IP].dst, server.ip4)
3927 self.assertTrue(packet.haslayer(GRE))
3928 self.assert_packet_checksums_valid(packet)
3930 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3934 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3935 IP(src=server.ip4, dst=self.nat_addr) /
3937 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3938 TCP(sport=1234, dport=1234))
3939 self.pg0.add_stream(p)
3940 self.pg_enable_capture(self.pg_interfaces)
3942 p = self.pg0.get_capture(1)
3945 self.assertEqual(packet[IP].src, server_nat_ip)
3946 self.assertEqual(packet[IP].dst, host.ip4)
3947 self.assertTrue(packet.haslayer(GRE))
3948 self.assert_packet_checksums_valid(packet)
3950 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3953 def test_output_feature_and_service(self):
3954 """ NAT44 interface output feature and services """
3955 external_addr = '1.2.3.4'
3959 self.vapi.nat44_forwarding_enable_disable(1)
3960 self.nat44_add_address(self.nat_addr)
3961 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3962 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3963 local_port, external_port,
3964 proto=IP_PROTOS.tcp, out2in_only=1)
3965 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3966 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3968 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3971 # from client to service
3972 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3973 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3974 TCP(sport=12345, dport=external_port))
3975 self.pg1.add_stream(p)
3976 self.pg_enable_capture(self.pg_interfaces)
3978 capture = self.pg0.get_capture(1)
3983 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3984 self.assertEqual(tcp.dport, local_port)
3985 self.assert_packet_checksums_valid(p)
3987 self.logger.error(ppp("Unexpected or invalid packet:", p))
3990 # from service back to client
3991 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3992 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3993 TCP(sport=local_port, dport=12345))
3994 self.pg0.add_stream(p)
3995 self.pg_enable_capture(self.pg_interfaces)
3997 capture = self.pg1.get_capture(1)
4002 self.assertEqual(ip.src, external_addr)
4003 self.assertEqual(tcp.sport, external_port)
4004 self.assert_packet_checksums_valid(p)
4006 self.logger.error(ppp("Unexpected or invalid packet:", p))
4009 # from local network host to external network
4010 pkts = self.create_stream_in(self.pg0, self.pg1)
4011 self.pg0.add_stream(pkts)
4012 self.pg_enable_capture(self.pg_interfaces)
4014 capture = self.pg1.get_capture(len(pkts))
4015 self.verify_capture_out(capture)
4016 pkts = self.create_stream_in(self.pg0, self.pg1)
4017 self.pg0.add_stream(pkts)
4018 self.pg_enable_capture(self.pg_interfaces)
4020 capture = self.pg1.get_capture(len(pkts))
4021 self.verify_capture_out(capture)
4023 # from external network back to local network host
4024 pkts = self.create_stream_out(self.pg1)
4025 self.pg1.add_stream(pkts)
4026 self.pg_enable_capture(self.pg_interfaces)
4028 capture = self.pg0.get_capture(len(pkts))
4029 self.verify_capture_in(capture, self.pg0)
4031 def test_output_feature_and_service2(self):
4032 """ NAT44 interface output feature and service host direct access """
4033 self.vapi.nat44_forwarding_enable_disable(1)
4034 self.nat44_add_address(self.nat_addr)
4035 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4038 # session initiaded from service host - translate
4039 pkts = self.create_stream_in(self.pg0, self.pg1)
4040 self.pg0.add_stream(pkts)
4041 self.pg_enable_capture(self.pg_interfaces)
4043 capture = self.pg1.get_capture(len(pkts))
4044 self.verify_capture_out(capture)
4046 pkts = self.create_stream_out(self.pg1)
4047 self.pg1.add_stream(pkts)
4048 self.pg_enable_capture(self.pg_interfaces)
4050 capture = self.pg0.get_capture(len(pkts))
4051 self.verify_capture_in(capture, self.pg0)
4053 # session initiaded from remote host - do not translate
4054 self.tcp_port_in = 60303
4055 self.udp_port_in = 60304
4056 self.icmp_id_in = 60305
4057 pkts = self.create_stream_out(self.pg1,
4058 self.pg0.remote_ip4,
4059 use_inside_ports=True)
4060 self.pg1.add_stream(pkts)
4061 self.pg_enable_capture(self.pg_interfaces)
4063 capture = self.pg0.get_capture(len(pkts))
4064 self.verify_capture_in(capture, self.pg0)
4066 pkts = self.create_stream_in(self.pg0, self.pg1)
4067 self.pg0.add_stream(pkts)
4068 self.pg_enable_capture(self.pg_interfaces)
4070 capture = self.pg1.get_capture(len(pkts))
4071 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
4074 def test_output_feature_and_service3(self):
4075 """ NAT44 interface output feature and DST NAT """
4076 external_addr = '1.2.3.4'
4080 self.vapi.nat44_forwarding_enable_disable(1)
4081 self.nat44_add_address(self.nat_addr)
4082 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
4083 local_port, external_port,
4084 proto=IP_PROTOS.tcp, out2in_only=1)
4085 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4086 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4088 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4091 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4092 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4093 TCP(sport=12345, dport=external_port))
4094 self.pg0.add_stream(p)
4095 self.pg_enable_capture(self.pg_interfaces)
4097 capture = self.pg1.get_capture(1)
4102 self.assertEqual(ip.src, self.pg0.remote_ip4)
4103 self.assertEqual(tcp.sport, 12345)
4104 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4105 self.assertEqual(tcp.dport, local_port)
4106 self.assert_packet_checksums_valid(p)
4108 self.logger.error(ppp("Unexpected or invalid packet:", p))
4111 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4112 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4113 TCP(sport=local_port, dport=12345))
4114 self.pg1.add_stream(p)
4115 self.pg_enable_capture(self.pg_interfaces)
4117 capture = self.pg0.get_capture(1)
4122 self.assertEqual(ip.src, external_addr)
4123 self.assertEqual(tcp.sport, external_port)
4124 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4125 self.assertEqual(tcp.dport, 12345)
4126 self.assert_packet_checksums_valid(p)
4128 self.logger.error(ppp("Unexpected or invalid packet:", p))
4131 def test_next_src_nat(self):
4132 """ On way back forward packet to nat44-in2out node. """
4133 twice_nat_addr = '10.0.1.3'
4136 post_twice_nat_port = 0
4138 self.vapi.nat44_forwarding_enable_disable(1)
4139 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4140 self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
4141 local_port, external_port,
4142 proto=IP_PROTOS.tcp, out2in_only=1,
4143 self_twice_nat=1, vrf_id=1)
4144 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4147 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4148 IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
4149 TCP(sport=12345, dport=external_port))
4150 self.pg6.add_stream(p)
4151 self.pg_enable_capture(self.pg_interfaces)
4153 capture = self.pg6.get_capture(1)
4158 self.assertEqual(ip.src, twice_nat_addr)
4159 self.assertNotEqual(tcp.sport, 12345)
4160 post_twice_nat_port = tcp.sport
4161 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4162 self.assertEqual(tcp.dport, local_port)
4163 self.assert_packet_checksums_valid(p)
4165 self.logger.error(ppp("Unexpected or invalid packet:", p))
4168 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4169 IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
4170 TCP(sport=local_port, dport=post_twice_nat_port))
4171 self.pg6.add_stream(p)
4172 self.pg_enable_capture(self.pg_interfaces)
4174 capture = self.pg6.get_capture(1)
4179 self.assertEqual(ip.src, self.pg1.remote_ip4)
4180 self.assertEqual(tcp.sport, external_port)
4181 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4182 self.assertEqual(tcp.dport, 12345)
4183 self.assert_packet_checksums_valid(p)
4185 self.logger.error(ppp("Unexpected or invalid packet:", p))
4188 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
4190 twice_nat_addr = '10.0.1.3'
4198 port_in1 = port_in+1
4199 port_in2 = port_in+2
4204 server1 = self.pg0.remote_hosts[0]
4205 server2 = self.pg0.remote_hosts[1]
4217 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
4220 self.nat44_add_address(self.nat_addr)
4221 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4223 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4225 proto=IP_PROTOS.tcp,
4226 twice_nat=int(not self_twice_nat),
4227 self_twice_nat=int(self_twice_nat))
4229 locals = [{'addr': server1.ip4n,
4233 {'addr': server2.ip4n,
4237 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4238 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4242 not self_twice_nat),
4245 local_num=len(locals),
4247 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4248 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
4255 assert client_id is not None
4257 client = self.pg0.remote_hosts[0]
4258 elif client_id == 2:
4259 client = self.pg0.remote_hosts[1]
4261 client = pg1.remote_hosts[0]
4262 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4263 IP(src=client.ip4, dst=self.nat_addr) /
4264 TCP(sport=eh_port_out, dport=port_out))
4266 self.pg_enable_capture(self.pg_interfaces)
4268 capture = pg0.get_capture(1)
4274 if ip.dst == server1.ip4:
4280 self.assertEqual(ip.dst, server.ip4)
4282 self.assertIn(tcp.dport, [port_in1, port_in2])
4284 self.assertEqual(tcp.dport, port_in)
4286 self.assertEqual(ip.src, twice_nat_addr)
4287 self.assertNotEqual(tcp.sport, eh_port_out)
4289 self.assertEqual(ip.src, client.ip4)
4290 self.assertEqual(tcp.sport, eh_port_out)
4292 eh_port_in = tcp.sport
4293 saved_port_in = tcp.dport
4294 self.assert_packet_checksums_valid(p)
4296 self.logger.error(ppp("Unexpected or invalid packet:", p))
4299 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4300 IP(src=server.ip4, dst=eh_addr_in) /
4301 TCP(sport=saved_port_in, dport=eh_port_in))
4303 self.pg_enable_capture(self.pg_interfaces)
4305 capture = pg1.get_capture(1)
4310 self.assertEqual(ip.dst, client.ip4)
4311 self.assertEqual(ip.src, self.nat_addr)
4312 self.assertEqual(tcp.dport, eh_port_out)
4313 self.assertEqual(tcp.sport, port_out)
4314 self.assert_packet_checksums_valid(p)
4316 self.logger.error(ppp("Unexpected or invalid packet:", p))
4320 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4321 self.assertEqual(len(sessions), 1)
4322 self.assertTrue(sessions[0].ext_host_valid)
4323 self.assertTrue(sessions[0].is_twicenat)
4324 self.vapi.nat44_del_session(
4325 sessions[0].inside_ip_address,
4326 sessions[0].inside_port,
4327 sessions[0].protocol,
4328 ext_host_address=sessions[0].ext_host_nat_address,
4329 ext_host_port=sessions[0].ext_host_nat_port)
4330 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4331 self.assertEqual(len(sessions), 0)
4333 def test_twice_nat(self):
4335 self.twice_nat_common()
4337 def test_self_twice_nat_positive(self):
4338 """ Self Twice NAT44 (positive test) """
4339 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4341 def test_self_twice_nat_negative(self):
4342 """ Self Twice NAT44 (negative test) """
4343 self.twice_nat_common(self_twice_nat=True)
4345 def test_twice_nat_lb(self):
4346 """ Twice NAT44 local service load balancing """
4347 self.twice_nat_common(lb=True)
4349 def test_self_twice_nat_lb_positive(self):
4350 """ Self Twice NAT44 local service load balancing (positive test) """
4351 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4354 def test_self_twice_nat_lb_negative(self):
4355 """ Self Twice NAT44 local service load balancing (negative test) """
4356 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4359 def test_twice_nat_interface_addr(self):
4360 """ Acquire twice NAT44 addresses from interface """
4361 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4363 # no address in NAT pool
4364 adresses = self.vapi.nat44_address_dump()
4365 self.assertEqual(0, len(adresses))
4367 # configure interface address and check NAT address pool
4368 self.pg3.config_ip4()
4369 adresses = self.vapi.nat44_address_dump()
4370 self.assertEqual(1, len(adresses))
4371 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4372 self.assertEqual(adresses[0].twice_nat, 1)
4374 # remove interface address and check NAT address pool
4375 self.pg3.unconfig_ip4()
4376 adresses = self.vapi.nat44_address_dump()
4377 self.assertEqual(0, len(adresses))
4379 def test_tcp_session_close_in(self):
4380 """ Close TCP session from inside network """
4381 self.tcp_port_out = 10505
4382 self.nat44_add_address(self.nat_addr)
4383 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4387 proto=IP_PROTOS.tcp,
4389 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4390 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4393 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4394 start_sessnum = len(sessions)
4396 self.initiate_tcp_session(self.pg0, self.pg1)
4398 # FIN packet in -> out
4399 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4400 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4401 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4402 flags="FA", seq=100, ack=300))
4403 self.pg0.add_stream(p)
4404 self.pg_enable_capture(self.pg_interfaces)
4406 self.pg1.get_capture(1)
4410 # ACK packet out -> in
4411 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4412 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4413 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4414 flags="A", seq=300, ack=101))
4417 # FIN packet out -> in
4418 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4419 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4420 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4421 flags="FA", seq=300, ack=101))
4424 self.pg1.add_stream(pkts)
4425 self.pg_enable_capture(self.pg_interfaces)
4427 self.pg0.get_capture(2)
4429 # ACK packet in -> out
4430 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4431 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4432 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4433 flags="A", seq=101, ack=301))
4434 self.pg0.add_stream(p)
4435 self.pg_enable_capture(self.pg_interfaces)
4437 self.pg1.get_capture(1)
4439 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4441 self.assertEqual(len(sessions) - start_sessnum, 0)
4443 def test_tcp_session_close_out(self):
4444 """ Close TCP session from outside network """
4445 self.tcp_port_out = 10505
4446 self.nat44_add_address(self.nat_addr)
4447 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4451 proto=IP_PROTOS.tcp,
4453 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4454 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4457 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4458 start_sessnum = len(sessions)
4460 self.initiate_tcp_session(self.pg0, self.pg1)
4462 # FIN packet out -> in
4463 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4464 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4465 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4466 flags="FA", seq=100, ack=300))
4467 self.pg1.add_stream(p)
4468 self.pg_enable_capture(self.pg_interfaces)
4470 self.pg0.get_capture(1)
4472 # FIN+ACK packet in -> out
4473 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4474 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4475 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4476 flags="FA", seq=300, ack=101))
4478 self.pg0.add_stream(p)
4479 self.pg_enable_capture(self.pg_interfaces)
4481 self.pg1.get_capture(1)
4483 # ACK packet out -> in
4484 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4485 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4486 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4487 flags="A", seq=101, ack=301))
4488 self.pg1.add_stream(p)
4489 self.pg_enable_capture(self.pg_interfaces)
4491 self.pg0.get_capture(1)
4493 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4495 self.assertEqual(len(sessions) - start_sessnum, 0)
4497 def test_tcp_session_close_simultaneous(self):
4498 """ Close TCP session from inside network """
4499 self.tcp_port_out = 10505
4500 self.nat44_add_address(self.nat_addr)
4501 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4505 proto=IP_PROTOS.tcp,
4507 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4508 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4511 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4512 start_sessnum = len(sessions)
4514 self.initiate_tcp_session(self.pg0, self.pg1)
4516 # FIN packet in -> out
4517 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4518 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4519 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4520 flags="FA", seq=100, ack=300))
4521 self.pg0.add_stream(p)
4522 self.pg_enable_capture(self.pg_interfaces)
4524 self.pg1.get_capture(1)
4526 # FIN packet out -> in
4527 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4528 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4529 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4530 flags="FA", seq=300, ack=100))
4531 self.pg1.add_stream(p)
4532 self.pg_enable_capture(self.pg_interfaces)
4534 self.pg0.get_capture(1)
4536 # ACK packet in -> out
4537 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4538 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4539 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4540 flags="A", seq=101, ack=301))
4541 self.pg0.add_stream(p)
4542 self.pg_enable_capture(self.pg_interfaces)
4544 self.pg1.get_capture(1)
4546 # ACK packet out -> in
4547 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4548 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4549 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4550 flags="A", seq=301, ack=101))
4551 self.pg1.add_stream(p)
4552 self.pg_enable_capture(self.pg_interfaces)
4554 self.pg0.get_capture(1)
4556 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4558 self.assertEqual(len(sessions) - start_sessnum, 0)
4560 def test_one_armed_nat44_static(self):
4561 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
4562 remote_host = self.pg4.remote_hosts[0]
4563 local_host = self.pg4.remote_hosts[1]
4568 self.vapi.nat44_forwarding_enable_disable(1)
4569 self.nat44_add_address(self.nat_addr, twice_nat=1)
4570 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4571 local_port, external_port,
4572 proto=IP_PROTOS.tcp, out2in_only=1,
4574 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4575 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4578 # from client to service
4579 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4580 IP(src=remote_host.ip4, dst=self.nat_addr) /
4581 TCP(sport=12345, dport=external_port))
4582 self.pg4.add_stream(p)
4583 self.pg_enable_capture(self.pg_interfaces)
4585 capture = self.pg4.get_capture(1)
4590 self.assertEqual(ip.dst, local_host.ip4)
4591 self.assertEqual(ip.src, self.nat_addr)
4592 self.assertEqual(tcp.dport, local_port)
4593 self.assertNotEqual(tcp.sport, 12345)
4594 eh_port_in = tcp.sport
4595 self.assert_packet_checksums_valid(p)
4597 self.logger.error(ppp("Unexpected or invalid packet:", p))
4600 # from service back to client
4601 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4602 IP(src=local_host.ip4, dst=self.nat_addr) /
4603 TCP(sport=local_port, dport=eh_port_in))
4604 self.pg4.add_stream(p)
4605 self.pg_enable_capture(self.pg_interfaces)
4607 capture = self.pg4.get_capture(1)
4612 self.assertEqual(ip.src, self.nat_addr)
4613 self.assertEqual(ip.dst, remote_host.ip4)
4614 self.assertEqual(tcp.sport, external_port)
4615 self.assertEqual(tcp.dport, 12345)
4616 self.assert_packet_checksums_valid(p)
4618 self.logger.error(ppp("Unexpected or invalid packet:", p))
4621 def test_static_with_port_out2(self):
4622 """ 1:1 NAPT asymmetrical rule """
4627 self.vapi.nat44_forwarding_enable_disable(1)
4628 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4629 local_port, external_port,
4630 proto=IP_PROTOS.tcp, out2in_only=1)
4631 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4632 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4635 # from client to service
4636 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4637 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4638 TCP(sport=12345, dport=external_port))
4639 self.pg1.add_stream(p)
4640 self.pg_enable_capture(self.pg_interfaces)
4642 capture = self.pg0.get_capture(1)
4647 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4648 self.assertEqual(tcp.dport, local_port)
4649 self.assert_packet_checksums_valid(p)
4651 self.logger.error(ppp("Unexpected or invalid packet:", p))
4655 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4656 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4657 ICMP(type=11) / capture[0][IP])
4658 self.pg0.add_stream(p)
4659 self.pg_enable_capture(self.pg_interfaces)
4661 capture = self.pg1.get_capture(1)
4664 self.assertEqual(p[IP].src, self.nat_addr)
4666 self.assertEqual(inner.dst, self.nat_addr)
4667 self.assertEqual(inner[TCPerror].dport, external_port)
4669 self.logger.error(ppp("Unexpected or invalid packet:", p))
4672 # from service back to client
4673 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4674 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4675 TCP(sport=local_port, dport=12345))
4676 self.pg0.add_stream(p)
4677 self.pg_enable_capture(self.pg_interfaces)
4679 capture = self.pg1.get_capture(1)
4684 self.assertEqual(ip.src, self.nat_addr)
4685 self.assertEqual(tcp.sport, external_port)
4686 self.assert_packet_checksums_valid(p)
4688 self.logger.error(ppp("Unexpected or invalid packet:", p))
4692 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4693 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4694 ICMP(type=11) / capture[0][IP])
4695 self.pg1.add_stream(p)
4696 self.pg_enable_capture(self.pg_interfaces)
4698 capture = self.pg0.get_capture(1)
4701 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4703 self.assertEqual(inner.src, self.pg0.remote_ip4)
4704 self.assertEqual(inner[TCPerror].sport, local_port)
4706 self.logger.error(ppp("Unexpected or invalid packet:", p))
4709 # from client to server (no translation)
4710 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4711 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4712 TCP(sport=12346, dport=local_port))
4713 self.pg1.add_stream(p)
4714 self.pg_enable_capture(self.pg_interfaces)
4716 capture = self.pg0.get_capture(1)
4721 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4722 self.assertEqual(tcp.dport, local_port)
4723 self.assert_packet_checksums_valid(p)
4725 self.logger.error(ppp("Unexpected or invalid packet:", p))
4728 # from service back to client (no translation)
4729 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4730 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4731 TCP(sport=local_port, dport=12346))
4732 self.pg0.add_stream(p)
4733 self.pg_enable_capture(self.pg_interfaces)
4735 capture = self.pg1.get_capture(1)
4740 self.assertEqual(ip.src, self.pg0.remote_ip4)
4741 self.assertEqual(tcp.sport, local_port)
4742 self.assert_packet_checksums_valid(p)
4744 self.logger.error(ppp("Unexpected or invalid packet:", p))
4747 def test_output_feature(self):
4748 """ NAT44 interface output feature (in2out postrouting) """
4749 self.vapi.nat44_forwarding_enable_disable(1)
4750 self.nat44_add_address(self.nat_addr)
4751 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4753 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4757 pkts = self.create_stream_in(self.pg0, self.pg1)
4758 self.pg0.add_stream(pkts)
4759 self.pg_enable_capture(self.pg_interfaces)
4761 capture = self.pg1.get_capture(len(pkts))
4762 self.verify_capture_out(capture)
4765 pkts = self.create_stream_out(self.pg1)
4766 self.pg1.add_stream(pkts)
4767 self.pg_enable_capture(self.pg_interfaces)
4769 capture = self.pg0.get_capture(len(pkts))
4770 self.verify_capture_in(capture, self.pg0)
4772 def test_multiple_vrf(self):
4773 """ Multiple VRF setup """
4774 external_addr = '1.2.3.4'
4779 self.vapi.nat44_forwarding_enable_disable(1)
4780 self.nat44_add_address(self.nat_addr)
4781 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4782 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4784 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4786 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4787 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4789 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4791 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4792 local_port, external_port, vrf_id=1,
4793 proto=IP_PROTOS.tcp, out2in_only=1)
4794 self.nat44_add_static_mapping(
4795 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4796 local_port=local_port, vrf_id=0, external_port=external_port,
4797 proto=IP_PROTOS.tcp, out2in_only=1)
4799 # from client to service (both VRF1)
4800 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4801 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4802 TCP(sport=12345, dport=external_port))
4803 self.pg6.add_stream(p)
4804 self.pg_enable_capture(self.pg_interfaces)
4806 capture = self.pg5.get_capture(1)
4811 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4812 self.assertEqual(tcp.dport, local_port)
4813 self.assert_packet_checksums_valid(p)
4815 self.logger.error(ppp("Unexpected or invalid packet:", p))
4818 # from service back to client (both VRF1)
4819 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4820 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4821 TCP(sport=local_port, dport=12345))
4822 self.pg5.add_stream(p)
4823 self.pg_enable_capture(self.pg_interfaces)
4825 capture = self.pg6.get_capture(1)
4830 self.assertEqual(ip.src, external_addr)
4831 self.assertEqual(tcp.sport, external_port)
4832 self.assert_packet_checksums_valid(p)
4834 self.logger.error(ppp("Unexpected or invalid packet:", p))
4837 # dynamic NAT from VRF1 to VRF0 (output-feature)
4838 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4839 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4840 TCP(sport=2345, dport=22))
4841 self.pg5.add_stream(p)
4842 self.pg_enable_capture(self.pg_interfaces)
4844 capture = self.pg1.get_capture(1)
4849 self.assertEqual(ip.src, self.nat_addr)
4850 self.assertNotEqual(tcp.sport, 2345)
4851 self.assert_packet_checksums_valid(p)
4854 self.logger.error(ppp("Unexpected or invalid packet:", p))
4857 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4858 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4859 TCP(sport=22, dport=port))
4860 self.pg1.add_stream(p)
4861 self.pg_enable_capture(self.pg_interfaces)
4863 capture = self.pg5.get_capture(1)
4868 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4869 self.assertEqual(tcp.dport, 2345)
4870 self.assert_packet_checksums_valid(p)
4872 self.logger.error(ppp("Unexpected or invalid packet:", p))
4875 # from client VRF1 to service VRF0
4876 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4877 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4878 TCP(sport=12346, dport=external_port))
4879 self.pg6.add_stream(p)
4880 self.pg_enable_capture(self.pg_interfaces)
4882 capture = self.pg0.get_capture(1)
4887 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4888 self.assertEqual(tcp.dport, local_port)
4889 self.assert_packet_checksums_valid(p)
4891 self.logger.error(ppp("Unexpected or invalid packet:", p))
4894 # from service VRF0 back to client VRF1
4895 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4896 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4897 TCP(sport=local_port, dport=12346))
4898 self.pg0.add_stream(p)
4899 self.pg_enable_capture(self.pg_interfaces)
4901 capture = self.pg6.get_capture(1)
4906 self.assertEqual(ip.src, self.pg0.local_ip4)
4907 self.assertEqual(tcp.sport, external_port)
4908 self.assert_packet_checksums_valid(p)
4910 self.logger.error(ppp("Unexpected or invalid packet:", p))
4913 # from client VRF0 to service VRF1
4914 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4915 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4916 TCP(sport=12347, dport=external_port))
4917 self.pg0.add_stream(p)
4918 self.pg_enable_capture(self.pg_interfaces)
4920 capture = self.pg5.get_capture(1)
4925 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4926 self.assertEqual(tcp.dport, local_port)
4927 self.assert_packet_checksums_valid(p)
4929 self.logger.error(ppp("Unexpected or invalid packet:", p))
4932 # from service VRF1 back to client VRF0
4933 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4934 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4935 TCP(sport=local_port, dport=12347))
4936 self.pg5.add_stream(p)
4937 self.pg_enable_capture(self.pg_interfaces)
4939 capture = self.pg0.get_capture(1)
4944 self.assertEqual(ip.src, external_addr)
4945 self.assertEqual(tcp.sport, external_port)
4946 self.assert_packet_checksums_valid(p)
4948 self.logger.error(ppp("Unexpected or invalid packet:", p))
4951 # from client to server (both VRF1, no translation)
4952 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4953 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4954 TCP(sport=12348, dport=local_port))
4955 self.pg6.add_stream(p)
4956 self.pg_enable_capture(self.pg_interfaces)
4958 capture = self.pg5.get_capture(1)
4963 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4964 self.assertEqual(tcp.dport, local_port)
4965 self.assert_packet_checksums_valid(p)
4967 self.logger.error(ppp("Unexpected or invalid packet:", p))
4970 # from server back to client (both VRF1, no translation)
4971 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4972 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4973 TCP(sport=local_port, dport=12348))
4974 self.pg5.add_stream(p)
4975 self.pg_enable_capture(self.pg_interfaces)
4977 capture = self.pg6.get_capture(1)
4982 self.assertEqual(ip.src, self.pg5.remote_ip4)
4983 self.assertEqual(tcp.sport, local_port)
4984 self.assert_packet_checksums_valid(p)
4986 self.logger.error(ppp("Unexpected or invalid packet:", p))
4989 # from client VRF1 to server VRF0 (no translation)
4990 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4991 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4992 TCP(sport=local_port, dport=12349))
4993 self.pg0.add_stream(p)
4994 self.pg_enable_capture(self.pg_interfaces)
4996 capture = self.pg6.get_capture(1)
5001 self.assertEqual(ip.src, self.pg0.remote_ip4)
5002 self.assertEqual(tcp.sport, local_port)
5003 self.assert_packet_checksums_valid(p)
5005 self.logger.error(ppp("Unexpected or invalid packet:", p))
5008 # from server VRF0 back to client VRF1 (no translation)
5009 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5010 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
5011 TCP(sport=local_port, dport=12349))
5012 self.pg0.add_stream(p)
5013 self.pg_enable_capture(self.pg_interfaces)
5015 capture = self.pg6.get_capture(1)
5020 self.assertEqual(ip.src, self.pg0.remote_ip4)
5021 self.assertEqual(tcp.sport, local_port)
5022 self.assert_packet_checksums_valid(p)
5024 self.logger.error(ppp("Unexpected or invalid packet:", p))
5027 # from client VRF0 to server VRF1 (no translation)
5028 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5029 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
5030 TCP(sport=12344, dport=local_port))
5031 self.pg0.add_stream(p)
5032 self.pg_enable_capture(self.pg_interfaces)
5034 capture = self.pg5.get_capture(1)
5039 self.assertEqual(ip.dst, self.pg5.remote_ip4)
5040 self.assertEqual(tcp.dport, local_port)
5041 self.assert_packet_checksums_valid(p)
5043 self.logger.error(ppp("Unexpected or invalid packet:", p))
5046 # from server VRF1 back to client VRF0 (no translation)
5047 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
5048 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
5049 TCP(sport=local_port, dport=12344))
5050 self.pg5.add_stream(p)
5051 self.pg_enable_capture(self.pg_interfaces)
5053 capture = self.pg0.get_capture(1)
5058 self.assertEqual(ip.src, self.pg5.remote_ip4)
5059 self.assertEqual(tcp.sport, local_port)
5060 self.assert_packet_checksums_valid(p)
5062 self.logger.error(ppp("Unexpected or invalid packet:", p))
5065 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5066 def test_session_timeout(self):
5067 """ NAT44 session timeouts """
5068 self.nat44_add_address(self.nat_addr)
5069 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5070 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5072 self.vapi.nat_set_timeouts(icmp=5)
5076 for i in range(0, max_sessions):
5077 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5078 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5079 IP(src=src, dst=self.pg1.remote_ip4) /
5080 ICMP(id=1025, type='echo-request'))
5082 self.pg0.add_stream(pkts)
5083 self.pg_enable_capture(self.pg_interfaces)
5085 self.pg1.get_capture(max_sessions)
5090 for i in range(0, max_sessions):
5091 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5092 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5093 IP(src=src, dst=self.pg1.remote_ip4) /
5094 ICMP(id=1026, type='echo-request'))
5096 self.pg0.add_stream(pkts)
5097 self.pg_enable_capture(self.pg_interfaces)
5099 self.pg1.get_capture(max_sessions)
5102 users = self.vapi.nat44_user_dump()
5104 nsessions = nsessions + user.nsessions
5105 self.assertLess(nsessions, 2 * max_sessions)
5107 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5108 def test_session_limit_per_user(self):
5109 """ Maximum sessions per user limit """
5110 self.nat44_add_address(self.nat_addr)
5111 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5112 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5114 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5115 src_address=self.pg2.local_ip4n,
5117 template_interval=10)
5119 # get maximum number of translations per user
5120 nat44_config = self.vapi.nat_show_config()
5123 for port in range(0, nat44_config.max_translations_per_user):
5124 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5125 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5126 UDP(sport=1025 + port, dport=1025 + port))
5129 self.pg0.add_stream(pkts)
5130 self.pg_enable_capture(self.pg_interfaces)
5132 capture = self.pg1.get_capture(len(pkts))
5134 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5135 src_port=self.ipfix_src_port)
5137 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5138 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5139 UDP(sport=3001, dport=3002))
5140 self.pg0.add_stream(p)
5141 self.pg_enable_capture(self.pg_interfaces)
5143 capture = self.pg1.assert_nothing_captured()
5145 # verify IPFIX logging
5146 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5148 capture = self.pg2.get_capture(10)
5149 ipfix = IPFIXDecoder()
5150 # first load template
5152 self.assertTrue(p.haslayer(IPFIX))
5153 if p.haslayer(Template):
5154 ipfix.add_template(p.getlayer(Template))
5155 # verify events in data set
5157 if p.haslayer(Data):
5158 data = ipfix.decode_data_set(p.getlayer(Set))
5159 self.verify_ipfix_max_entries_per_user(
5161 nat44_config.max_translations_per_user,
5162 self.pg0.remote_ip4n)
5165 super(TestNAT44EndpointDependent, self).tearDown()
5166 if not self.vpp_dead:
5167 self.logger.info(self.vapi.cli("show nat44 addresses"))
5168 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5169 self.logger.info(self.vapi.cli("show nat44 static mappings"))
5170 self.logger.info(self.vapi.cli("show nat44 interface address"))
5171 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
5172 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
5173 self.logger.info(self.vapi.cli("show nat timeouts"))
5175 self.vapi.cli("clear logging")
5178 class TestNAT44Out2InDPO(MethodHolder):
5179 """ NAT44 Test Cases using out2in DPO """
5182 def setUpConstants(cls):
5183 super(TestNAT44Out2InDPO, cls).setUpConstants()
5184 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
5187 def setUpClass(cls):
5188 super(TestNAT44Out2InDPO, cls).setUpClass()
5189 cls.vapi.cli("set log class nat level debug")
5192 cls.tcp_port_in = 6303
5193 cls.tcp_port_out = 6303
5194 cls.udp_port_in = 6304
5195 cls.udp_port_out = 6304
5196 cls.icmp_id_in = 6305
5197 cls.icmp_id_out = 6305
5198 cls.nat_addr = '10.0.0.3'
5199 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5200 cls.dst_ip4 = '192.168.70.1'
5202 cls.create_pg_interfaces(range(2))
5205 cls.pg0.config_ip4()
5206 cls.pg0.resolve_arp()
5209 cls.pg1.config_ip6()
5210 cls.pg1.resolve_ndp()
5212 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
5213 dst_address_length=0,
5214 next_hop_address=cls.pg1.remote_ip6n,
5215 next_hop_sw_if_index=cls.pg1.sw_if_index)
5218 super(TestNAT44Out2InDPO, cls).tearDownClass()
5221 def configure_xlat(self):
5222 self.dst_ip6_pfx = '1:2:3::'
5223 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5225 self.dst_ip6_pfx_len = 96
5226 self.src_ip6_pfx = '4:5:6::'
5227 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5229 self.src_ip6_pfx_len = 96
5230 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
5231 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
5232 '\x00\x00\x00\x00', 0, is_translation=1,
5235 def test_464xlat_ce(self):
5236 """ Test 464XLAT CE with NAT44 """
5238 nat_config = self.vapi.nat_show_config()
5239 self.assertEqual(1, nat_config.out2in_dpo)
5241 self.configure_xlat()
5243 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5244 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
5246 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5247 self.dst_ip6_pfx_len)
5248 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
5249 self.src_ip6_pfx_len)
5252 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5253 self.pg0.add_stream(pkts)
5254 self.pg_enable_capture(self.pg_interfaces)
5256 capture = self.pg1.get_capture(len(pkts))
5257 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
5260 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
5262 self.pg1.add_stream(pkts)
5263 self.pg_enable_capture(self.pg_interfaces)
5265 capture = self.pg0.get_capture(len(pkts))
5266 self.verify_capture_in(capture, self.pg0)
5268 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
5270 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
5271 self.nat_addr_n, is_add=0)
5273 def test_464xlat_ce_no_nat(self):
5274 """ Test 464XLAT CE without NAT44 """
5276 self.configure_xlat()
5278 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5279 self.dst_ip6_pfx_len)
5280 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
5281 self.src_ip6_pfx_len)
5283 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5284 self.pg0.add_stream(pkts)
5285 self.pg_enable_capture(self.pg_interfaces)
5287 capture = self.pg1.get_capture(len(pkts))
5288 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
5289 nat_ip=out_dst_ip6, same_port=True)
5291 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
5292 self.pg1.add_stream(pkts)
5293 self.pg_enable_capture(self.pg_interfaces)
5295 capture = self.pg0.get_capture(len(pkts))
5296 self.verify_capture_in(capture, self.pg0)
5299 class TestDeterministicNAT(MethodHolder):
5300 """ Deterministic NAT Test Cases """
5303 def setUpConstants(cls):
5304 super(TestDeterministicNAT, cls).setUpConstants()
5305 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
5308 def setUpClass(cls):
5309 super(TestDeterministicNAT, cls).setUpClass()
5310 cls.vapi.cli("set log class nat level debug")
5313 cls.tcp_port_in = 6303
5314 cls.tcp_external_port = 6303
5315 cls.udp_port_in = 6304
5316 cls.udp_external_port = 6304
5317 cls.icmp_id_in = 6305
5318 cls.nat_addr = '10.0.0.3'
5320 cls.create_pg_interfaces(range(3))
5321 cls.interfaces = list(cls.pg_interfaces)
5323 for i in cls.interfaces:
5328 cls.pg0.generate_remote_hosts(2)
5329 cls.pg0.configure_ipv4_neighbors()
5332 super(TestDeterministicNAT, cls).tearDownClass()
5335 def create_stream_in(self, in_if, out_if, ttl=64):
5337 Create packet stream for inside network
5339 :param in_if: Inside interface
5340 :param out_if: Outside interface
5341 :param ttl: TTL of generated packets
5345 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5346 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5347 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
5351 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5352 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5353 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
5357 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5358 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5359 ICMP(id=self.icmp_id_in, type='echo-request'))
5364 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5366 Create packet stream for outside network
5368 :param out_if: Outside interface
5369 :param dst_ip: Destination IP address (Default use global NAT address)
5370 :param ttl: TTL of generated packets
5373 dst_ip = self.nat_addr
5376 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5377 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5378 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
5382 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5383 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5384 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
5388 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5389 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5390 ICMP(id=self.icmp_external_id, type='echo-reply'))
5395 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
5397 Verify captured packets on outside network
5399 :param capture: Captured packets
5400 :param nat_ip: Translated IP address (Default use global NAT address)
5401 :param same_port: Sorce port number is not translated (Default False)
5402 :param packet_num: Expected number of packets (Default 3)
5405 nat_ip = self.nat_addr
5406 self.assertEqual(packet_num, len(capture))
5407 for packet in capture:
5409 self.assertEqual(packet[IP].src, nat_ip)
5410 if packet.haslayer(TCP):
5411 self.tcp_port_out = packet[TCP].sport
5412 elif packet.haslayer(UDP):
5413 self.udp_port_out = packet[UDP].sport
5415 self.icmp_external_id = packet[ICMP].id
5417 self.logger.error(ppp("Unexpected or invalid packet "
5418 "(outside network):", packet))
5421 def test_deterministic_mode(self):
5422 """ NAT plugin run deterministic mode """
5423 in_addr = '172.16.255.0'
5424 out_addr = '172.17.255.50'
5425 in_addr_t = '172.16.255.20'
5426 in_addr_n = socket.inet_aton(in_addr)
5427 out_addr_n = socket.inet_aton(out_addr)
5428 in_addr_t_n = socket.inet_aton(in_addr_t)
5432 nat_config = self.vapi.nat_show_config()
5433 self.assertEqual(1, nat_config.deterministic)
5435 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
5437 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5438 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5439 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5440 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
5442 deterministic_mappings = self.vapi.nat_det_map_dump()
5443 self.assertEqual(len(deterministic_mappings), 1)
5444 dsm = deterministic_mappings[0]
5445 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5446 self.assertEqual(in_plen, dsm.in_plen)
5447 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5448 self.assertEqual(out_plen, dsm.out_plen)
5450 self.clear_nat_det()
5451 deterministic_mappings = self.vapi.nat_det_map_dump()
5452 self.assertEqual(len(deterministic_mappings), 0)
5454 def test_set_timeouts(self):
5455 """ Set deterministic NAT timeouts """
5456 timeouts_before = self.vapi.nat_get_timeouts()
5458 self.vapi.nat_set_timeouts(timeouts_before.udp + 10,
5459 timeouts_before.tcp_established + 10,
5460 timeouts_before.tcp_transitory + 10,
5461 timeouts_before.icmp + 10)
5463 timeouts_after = self.vapi.nat_get_timeouts()
5465 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
5466 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
5467 self.assertNotEqual(timeouts_before.tcp_established,
5468 timeouts_after.tcp_established)
5469 self.assertNotEqual(timeouts_before.tcp_transitory,
5470 timeouts_after.tcp_transitory)
5472 def test_det_in(self):
5473 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
5475 nat_ip = "10.0.0.10"
5477 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5479 socket.inet_aton(nat_ip),
5481 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5482 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5486 pkts = self.create_stream_in(self.pg0, self.pg1)
5487 self.pg0.add_stream(pkts)
5488 self.pg_enable_capture(self.pg_interfaces)
5490 capture = self.pg1.get_capture(len(pkts))
5491 self.verify_capture_out(capture, nat_ip)
5494 pkts = self.create_stream_out(self.pg1, nat_ip)
5495 self.pg1.add_stream(pkts)
5496 self.pg_enable_capture(self.pg_interfaces)
5498 capture = self.pg0.get_capture(len(pkts))
5499 self.verify_capture_in(capture, self.pg0)
5502 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5503 self.assertEqual(len(sessions), 3)
5507 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5508 self.assertEqual(s.in_port, self.tcp_port_in)
5509 self.assertEqual(s.out_port, self.tcp_port_out)
5510 self.assertEqual(s.ext_port, self.tcp_external_port)
5514 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5515 self.assertEqual(s.in_port, self.udp_port_in)
5516 self.assertEqual(s.out_port, self.udp_port_out)
5517 self.assertEqual(s.ext_port, self.udp_external_port)
5521 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5522 self.assertEqual(s.in_port, self.icmp_id_in)
5523 self.assertEqual(s.out_port, self.icmp_external_id)
5525 def test_multiple_users(self):
5526 """ Deterministic NAT multiple users """
5528 nat_ip = "10.0.0.10"
5530 external_port = 6303
5532 host0 = self.pg0.remote_hosts[0]
5533 host1 = self.pg0.remote_hosts[1]
5535 self.vapi.nat_det_add_del_map(host0.ip4n,
5537 socket.inet_aton(nat_ip),
5539 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5540 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5544 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5545 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5546 TCP(sport=port_in, dport=external_port))
5547 self.pg0.add_stream(p)
5548 self.pg_enable_capture(self.pg_interfaces)
5550 capture = self.pg1.get_capture(1)
5555 self.assertEqual(ip.src, nat_ip)
5556 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5557 self.assertEqual(tcp.dport, external_port)
5558 port_out0 = tcp.sport
5560 self.logger.error(ppp("Unexpected or invalid packet:", p))
5564 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5565 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5566 TCP(sport=port_in, dport=external_port))
5567 self.pg0.add_stream(p)
5568 self.pg_enable_capture(self.pg_interfaces)
5570 capture = self.pg1.get_capture(1)
5575 self.assertEqual(ip.src, nat_ip)
5576 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5577 self.assertEqual(tcp.dport, external_port)
5578 port_out1 = tcp.sport
5580 self.logger.error(ppp("Unexpected or invalid packet:", p))
5583 dms = self.vapi.nat_det_map_dump()
5584 self.assertEqual(1, len(dms))
5585 self.assertEqual(2, dms[0].ses_num)
5588 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5589 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5590 TCP(sport=external_port, dport=port_out0))
5591 self.pg1.add_stream(p)
5592 self.pg_enable_capture(self.pg_interfaces)
5594 capture = self.pg0.get_capture(1)
5599 self.assertEqual(ip.src, self.pg1.remote_ip4)
5600 self.assertEqual(ip.dst, host0.ip4)
5601 self.assertEqual(tcp.dport, port_in)
5602 self.assertEqual(tcp.sport, external_port)
5604 self.logger.error(ppp("Unexpected or invalid packet:", p))
5608 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5609 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5610 TCP(sport=external_port, dport=port_out1))
5611 self.pg1.add_stream(p)
5612 self.pg_enable_capture(self.pg_interfaces)
5614 capture = self.pg0.get_capture(1)
5619 self.assertEqual(ip.src, self.pg1.remote_ip4)
5620 self.assertEqual(ip.dst, host1.ip4)
5621 self.assertEqual(tcp.dport, port_in)
5622 self.assertEqual(tcp.sport, external_port)
5624 self.logger.error(ppp("Unexpected or invalid packet", p))
5627 # session close api test
5628 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5630 self.pg1.remote_ip4n,
5632 dms = self.vapi.nat_det_map_dump()
5633 self.assertEqual(dms[0].ses_num, 1)
5635 self.vapi.nat_det_close_session_in(host0.ip4n,
5637 self.pg1.remote_ip4n,
5639 dms = self.vapi.nat_det_map_dump()
5640 self.assertEqual(dms[0].ses_num, 0)
5642 def test_tcp_session_close_detection_in(self):
5643 """ Deterministic NAT TCP session close from inside network """
5644 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5646 socket.inet_aton(self.nat_addr),
5648 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5649 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5652 self.initiate_tcp_session(self.pg0, self.pg1)
5654 # close the session from inside
5656 # FIN packet in -> out
5657 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5658 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5659 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5661 self.pg0.add_stream(p)
5662 self.pg_enable_capture(self.pg_interfaces)
5664 self.pg1.get_capture(1)
5668 # ACK packet out -> in
5669 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5670 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5671 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5675 # FIN packet out -> in
5676 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5677 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5678 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5682 self.pg1.add_stream(pkts)
5683 self.pg_enable_capture(self.pg_interfaces)
5685 self.pg0.get_capture(2)
5687 # ACK packet in -> out
5688 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5689 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5690 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5692 self.pg0.add_stream(p)
5693 self.pg_enable_capture(self.pg_interfaces)
5695 self.pg1.get_capture(1)
5697 # Check if deterministic NAT44 closed the session
5698 dms = self.vapi.nat_det_map_dump()
5699 self.assertEqual(0, dms[0].ses_num)
5701 self.logger.error("TCP session termination failed")
5704 def test_tcp_session_close_detection_out(self):
5705 """ Deterministic NAT TCP session close from outside network """
5706 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5708 socket.inet_aton(self.nat_addr),
5710 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5711 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5714 self.initiate_tcp_session(self.pg0, self.pg1)
5716 # close the session from outside
5718 # FIN packet out -> in
5719 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5720 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5721 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5723 self.pg1.add_stream(p)
5724 self.pg_enable_capture(self.pg_interfaces)
5726 self.pg0.get_capture(1)
5730 # ACK packet in -> out
5731 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5732 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5733 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5737 # ACK packet in -> out
5738 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5739 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5740 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5744 self.pg0.add_stream(pkts)
5745 self.pg_enable_capture(self.pg_interfaces)
5747 self.pg1.get_capture(2)
5749 # ACK packet out -> in
5750 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5751 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5752 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5754 self.pg1.add_stream(p)
5755 self.pg_enable_capture(self.pg_interfaces)
5757 self.pg0.get_capture(1)
5759 # Check if deterministic NAT44 closed the session
5760 dms = self.vapi.nat_det_map_dump()
5761 self.assertEqual(0, dms[0].ses_num)
5763 self.logger.error("TCP session termination failed")
5766 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5767 def test_session_timeout(self):
5768 """ Deterministic NAT session timeouts """
5769 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5771 socket.inet_aton(self.nat_addr),
5773 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5774 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5777 self.initiate_tcp_session(self.pg0, self.pg1)
5778 self.vapi.nat_set_timeouts(5, 5, 5, 5)
5779 pkts = self.create_stream_in(self.pg0, self.pg1)
5780 self.pg0.add_stream(pkts)
5781 self.pg_enable_capture(self.pg_interfaces)
5783 capture = self.pg1.get_capture(len(pkts))
5786 dms = self.vapi.nat_det_map_dump()
5787 self.assertEqual(0, dms[0].ses_num)
5789 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5790 def test_session_limit_per_user(self):
5791 """ Deterministic NAT maximum sessions per user limit """
5792 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5794 socket.inet_aton(self.nat_addr),
5796 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5797 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5799 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5800 src_address=self.pg2.local_ip4n,
5802 template_interval=10)
5803 self.vapi.nat_ipfix()
5806 for port in range(1025, 2025):
5807 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5808 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5809 UDP(sport=port, dport=port))
5812 self.pg0.add_stream(pkts)
5813 self.pg_enable_capture(self.pg_interfaces)
5815 capture = self.pg1.get_capture(len(pkts))
5817 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5818 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5819 UDP(sport=3001, dport=3002))
5820 self.pg0.add_stream(p)
5821 self.pg_enable_capture(self.pg_interfaces)
5823 capture = self.pg1.assert_nothing_captured()
5825 # verify ICMP error packet
5826 capture = self.pg0.get_capture(1)
5828 self.assertTrue(p.haslayer(ICMP))
5830 self.assertEqual(icmp.type, 3)
5831 self.assertEqual(icmp.code, 1)
5832 self.assertTrue(icmp.haslayer(IPerror))
5833 inner_ip = icmp[IPerror]
5834 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5835 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5837 dms = self.vapi.nat_det_map_dump()
5839 self.assertEqual(1000, dms[0].ses_num)
5841 # verify IPFIX logging
5842 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5844 capture = self.pg2.get_capture(2)
5845 ipfix = IPFIXDecoder()
5846 # first load template
5848 self.assertTrue(p.haslayer(IPFIX))
5849 if p.haslayer(Template):
5850 ipfix.add_template(p.getlayer(Template))
5851 # verify events in data set
5853 if p.haslayer(Data):
5854 data = ipfix.decode_data_set(p.getlayer(Set))
5855 self.verify_ipfix_max_entries_per_user(data,
5857 self.pg0.remote_ip4n)
5859 def clear_nat_det(self):
5861 Clear deterministic NAT configuration.
5863 self.vapi.nat_ipfix(enable=0)
5864 self.vapi.nat_set_timeouts()
5865 deterministic_mappings = self.vapi.nat_det_map_dump()
5866 for dsm in deterministic_mappings:
5867 self.vapi.nat_det_add_del_map(dsm.in_addr,
5873 interfaces = self.vapi.nat44_interface_dump()
5874 for intf in interfaces:
5875 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5880 super(TestDeterministicNAT, self).tearDown()
5881 if not self.vpp_dead:
5882 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5883 self.logger.info(self.vapi.cli("show nat timeouts"))
5885 self.vapi.cli("show nat44 deterministic mappings"))
5887 self.vapi.cli("show nat44 deterministic sessions"))
5888 self.clear_nat_det()
5891 class TestNAT64(MethodHolder):
5892 """ NAT64 Test Cases """
5895 def setUpConstants(cls):
5896 super(TestNAT64, cls).setUpConstants()
5897 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5898 "nat64 st hash buckets 256", "}"])
5901 def setUpClass(cls):
5902 super(TestNAT64, cls).setUpClass()
5905 cls.tcp_port_in = 6303
5906 cls.tcp_port_out = 6303
5907 cls.udp_port_in = 6304
5908 cls.udp_port_out = 6304
5909 cls.icmp_id_in = 6305
5910 cls.icmp_id_out = 6305
5911 cls.nat_addr = '10.0.0.3'
5912 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5914 cls.vrf1_nat_addr = '10.0.10.3'
5915 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5917 cls.ipfix_src_port = 4739
5918 cls.ipfix_domain_id = 1
5920 cls.create_pg_interfaces(range(6))
5921 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5922 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5923 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5925 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5927 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5929 cls.pg0.generate_remote_hosts(2)
5931 for i in cls.ip6_interfaces:
5934 i.configure_ipv6_neighbors()
5936 for i in cls.ip4_interfaces:
5942 cls.pg3.config_ip4()
5943 cls.pg3.resolve_arp()
5944 cls.pg3.config_ip6()
5945 cls.pg3.configure_ipv6_neighbors()
5948 cls.pg5.config_ip6()
5951 super(TestNAT64, cls).tearDownClass()
5954 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5955 """ NAT64 inside interface handles Neighbor Advertisement """
5957 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5960 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5961 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5962 ICMPv6EchoRequest())
5964 self.pg5.add_stream(pkts)
5965 self.pg_enable_capture(self.pg_interfaces)
5968 # Wait for Neighbor Solicitation
5969 capture = self.pg5.get_capture(len(pkts))
5970 self.assertEqual(1, len(capture))
5973 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5974 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
5975 tgt = packet[ICMPv6ND_NS].tgt
5977 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5980 # Send Neighbor Advertisement
5981 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5982 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5983 ICMPv6ND_NA(tgt=tgt) /
5984 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5986 self.pg5.add_stream(pkts)
5987 self.pg_enable_capture(self.pg_interfaces)
5990 # Try to send ping again
5992 self.pg5.add_stream(pkts)
5993 self.pg_enable_capture(self.pg_interfaces)
5996 # Wait for ping reply
5997 capture = self.pg5.get_capture(len(pkts))
5998 self.assertEqual(1, len(capture))
6001 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
6002 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
6003 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
6005 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6008 def test_pool(self):
6009 """ Add/delete address to NAT64 pool """
6010 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
6012 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
6014 addresses = self.vapi.nat64_pool_addr_dump()
6015 self.assertEqual(len(addresses), 1)
6016 self.assertEqual(addresses[0].address, nat_addr)
6018 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
6020 addresses = self.vapi.nat64_pool_addr_dump()
6021 self.assertEqual(len(addresses), 0)
6023 def test_interface(self):
6024 """ Enable/disable NAT64 feature on the interface """
6025 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6026 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6028 interfaces = self.vapi.nat64_interface_dump()
6029 self.assertEqual(len(interfaces), 2)
6032 for intf in interfaces:
6033 if intf.sw_if_index == self.pg0.sw_if_index:
6034 self.assertEqual(intf.is_inside, 1)
6036 elif intf.sw_if_index == self.pg1.sw_if_index:
6037 self.assertEqual(intf.is_inside, 0)
6039 self.assertTrue(pg0_found)
6040 self.assertTrue(pg1_found)
6042 features = self.vapi.cli("show interface features pg0")
6043 self.assertNotEqual(features.find('nat64-in2out'), -1)
6044 features = self.vapi.cli("show interface features pg1")
6045 self.assertNotEqual(features.find('nat64-out2in'), -1)
6047 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
6048 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
6050 interfaces = self.vapi.nat64_interface_dump()
6051 self.assertEqual(len(interfaces), 0)
6053 def test_static_bib(self):
6054 """ Add/delete static BIB entry """
6055 in_addr = socket.inet_pton(socket.AF_INET6,
6056 '2001:db8:85a3::8a2e:370:7334')
6057 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
6060 proto = IP_PROTOS.tcp
6062 self.vapi.nat64_add_del_static_bib(in_addr,
6067 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
6072 self.assertEqual(bibe.i_addr, in_addr)
6073 self.assertEqual(bibe.o_addr, out_addr)
6074 self.assertEqual(bibe.i_port, in_port)
6075 self.assertEqual(bibe.o_port, out_port)
6076 self.assertEqual(static_bib_num, 1)
6078 self.vapi.nat64_add_del_static_bib(in_addr,
6084 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
6089 self.assertEqual(static_bib_num, 0)
6091 def test_set_timeouts(self):
6092 """ Set NAT64 timeouts """
6093 # verify default values
6094 timeouts = self.vapi.nat_get_timeouts()
6095 self.assertEqual(timeouts.udp, 300)
6096 self.assertEqual(timeouts.icmp, 60)
6097 self.assertEqual(timeouts.tcp_transitory, 240)
6098 self.assertEqual(timeouts.tcp_established, 7440)
6100 # set and verify custom values
6101 self.vapi.nat_set_timeouts(udp=200, icmp=30, tcp_transitory=250,
6102 tcp_established=7450)
6103 timeouts = self.vapi.nat_get_timeouts()
6104 self.assertEqual(timeouts.udp, 200)
6105 self.assertEqual(timeouts.icmp, 30)
6106 self.assertEqual(timeouts.tcp_transitory, 250)
6107 self.assertEqual(timeouts.tcp_established, 7450)
6109 def test_dynamic(self):
6110 """ NAT64 dynamic translation test """
6111 self.tcp_port_in = 6303
6112 self.udp_port_in = 6304
6113 self.icmp_id_in = 6305
6115 ses_num_start = self.nat64_get_ses_num()
6117 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6119 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6120 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6123 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6124 self.pg0.add_stream(pkts)
6125 self.pg_enable_capture(self.pg_interfaces)
6127 capture = self.pg1.get_capture(len(pkts))
6128 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6129 dst_ip=self.pg1.remote_ip4)
6132 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6133 self.pg1.add_stream(pkts)
6134 self.pg_enable_capture(self.pg_interfaces)
6136 capture = self.pg0.get_capture(len(pkts))
6137 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6138 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6141 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6142 self.pg0.add_stream(pkts)
6143 self.pg_enable_capture(self.pg_interfaces)
6145 capture = self.pg1.get_capture(len(pkts))
6146 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6147 dst_ip=self.pg1.remote_ip4)
6150 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6151 self.pg1.add_stream(pkts)
6152 self.pg_enable_capture(self.pg_interfaces)
6154 capture = self.pg0.get_capture(len(pkts))
6155 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6157 ses_num_end = self.nat64_get_ses_num()
6159 self.assertEqual(ses_num_end - ses_num_start, 3)
6161 # tenant with specific VRF
6162 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6163 self.vrf1_nat_addr_n,
6164 vrf_id=self.vrf1_id)
6165 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6167 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
6168 self.pg2.add_stream(pkts)
6169 self.pg_enable_capture(self.pg_interfaces)
6171 capture = self.pg1.get_capture(len(pkts))
6172 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6173 dst_ip=self.pg1.remote_ip4)
6175 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6176 self.pg1.add_stream(pkts)
6177 self.pg_enable_capture(self.pg_interfaces)
6179 capture = self.pg2.get_capture(len(pkts))
6180 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
6182 def test_static(self):
6183 """ NAT64 static translation test """
6184 self.tcp_port_in = 60303
6185 self.udp_port_in = 60304
6186 self.icmp_id_in = 60305
6187 self.tcp_port_out = 60303
6188 self.udp_port_out = 60304
6189 self.icmp_id_out = 60305
6191 ses_num_start = self.nat64_get_ses_num()
6193 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6195 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6196 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6198 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6203 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6208 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6215 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6216 self.pg0.add_stream(pkts)
6217 self.pg_enable_capture(self.pg_interfaces)
6219 capture = self.pg1.get_capture(len(pkts))
6220 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6221 dst_ip=self.pg1.remote_ip4, same_port=True)
6224 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6225 self.pg1.add_stream(pkts)
6226 self.pg_enable_capture(self.pg_interfaces)
6228 capture = self.pg0.get_capture(len(pkts))
6229 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6230 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6232 ses_num_end = self.nat64_get_ses_num()
6234 self.assertEqual(ses_num_end - ses_num_start, 3)
6236 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6237 def test_session_timeout(self):
6238 """ NAT64 session timeout """
6239 self.icmp_id_in = 1234
6240 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6242 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6243 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6244 self.vapi.nat_set_timeouts(icmp=5, tcp_transitory=5, tcp_established=5)
6246 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6247 self.pg0.add_stream(pkts)
6248 self.pg_enable_capture(self.pg_interfaces)
6250 capture = self.pg1.get_capture(len(pkts))
6252 ses_num_before_timeout = self.nat64_get_ses_num()
6256 # ICMP and TCP session after timeout
6257 ses_num_after_timeout = self.nat64_get_ses_num()
6258 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
6260 def test_icmp_error(self):
6261 """ NAT64 ICMP Error message translation """
6262 self.tcp_port_in = 6303
6263 self.udp_port_in = 6304
6264 self.icmp_id_in = 6305
6266 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6268 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6269 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6271 # send some packets to create sessions
6272 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6273 self.pg0.add_stream(pkts)
6274 self.pg_enable_capture(self.pg_interfaces)
6276 capture_ip4 = self.pg1.get_capture(len(pkts))
6277 self.verify_capture_out(capture_ip4,
6278 nat_ip=self.nat_addr,
6279 dst_ip=self.pg1.remote_ip4)
6281 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6282 self.pg1.add_stream(pkts)
6283 self.pg_enable_capture(self.pg_interfaces)
6285 capture_ip6 = self.pg0.get_capture(len(pkts))
6286 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6287 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
6288 self.pg0.remote_ip6)
6291 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6292 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
6293 ICMPv6DestUnreach(code=1) /
6294 packet[IPv6] for packet in capture_ip6]
6295 self.pg0.add_stream(pkts)
6296 self.pg_enable_capture(self.pg_interfaces)
6298 capture = self.pg1.get_capture(len(pkts))
6299 for packet in capture:
6301 self.assertEqual(packet[IP].src, self.nat_addr)
6302 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6303 self.assertEqual(packet[ICMP].type, 3)
6304 self.assertEqual(packet[ICMP].code, 13)
6305 inner = packet[IPerror]
6306 self.assertEqual(inner.src, self.pg1.remote_ip4)
6307 self.assertEqual(inner.dst, self.nat_addr)
6308 self.assert_packet_checksums_valid(packet)
6309 if inner.haslayer(TCPerror):
6310 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6311 elif inner.haslayer(UDPerror):
6312 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
6314 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6316 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6320 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6321 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6322 ICMP(type=3, code=13) /
6323 packet[IP] for packet in capture_ip4]
6324 self.pg1.add_stream(pkts)
6325 self.pg_enable_capture(self.pg_interfaces)
6327 capture = self.pg0.get_capture(len(pkts))
6328 for packet in capture:
6330 self.assertEqual(packet[IPv6].src, ip.src)
6331 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6332 icmp = packet[ICMPv6DestUnreach]
6333 self.assertEqual(icmp.code, 1)
6334 inner = icmp[IPerror6]
6335 self.assertEqual(inner.src, self.pg0.remote_ip6)
6336 self.assertEqual(inner.dst, ip.src)
6337 self.assert_icmpv6_checksum_valid(packet)
6338 if inner.haslayer(TCPerror):
6339 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6340 elif inner.haslayer(UDPerror):
6341 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6343 self.assertEqual(inner[ICMPv6EchoRequest].id,
6346 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6349 def test_hairpinning(self):
6350 """ NAT64 hairpinning """
6352 client = self.pg0.remote_hosts[0]
6353 server = self.pg0.remote_hosts[1]
6354 server_tcp_in_port = 22
6355 server_tcp_out_port = 4022
6356 server_udp_in_port = 23
6357 server_udp_out_port = 4023
6358 client_tcp_in_port = 1234
6359 client_udp_in_port = 1235
6360 client_tcp_out_port = 0
6361 client_udp_out_port = 0
6362 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6363 nat_addr_ip6 = ip.src
6365 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6367 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6368 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6370 self.vapi.nat64_add_del_static_bib(server.ip6n,
6373 server_tcp_out_port,
6375 self.vapi.nat64_add_del_static_bib(server.ip6n,
6378 server_udp_out_port,
6383 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6384 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6385 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6387 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6388 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6389 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6391 self.pg0.add_stream(pkts)
6392 self.pg_enable_capture(self.pg_interfaces)
6394 capture = self.pg0.get_capture(len(pkts))
6395 for packet in capture:
6397 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6398 self.assertEqual(packet[IPv6].dst, server.ip6)
6399 self.assert_packet_checksums_valid(packet)
6400 if packet.haslayer(TCP):
6401 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6402 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6403 client_tcp_out_port = packet[TCP].sport
6405 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6406 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6407 client_udp_out_port = packet[UDP].sport
6409 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6414 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6415 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6416 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6418 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6419 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6420 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6422 self.pg0.add_stream(pkts)
6423 self.pg_enable_capture(self.pg_interfaces)
6425 capture = self.pg0.get_capture(len(pkts))
6426 for packet in capture:
6428 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6429 self.assertEqual(packet[IPv6].dst, client.ip6)
6430 self.assert_packet_checksums_valid(packet)
6431 if packet.haslayer(TCP):
6432 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6433 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6435 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6436 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6438 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6443 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6444 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6445 ICMPv6DestUnreach(code=1) /
6446 packet[IPv6] for packet in capture]
6447 self.pg0.add_stream(pkts)
6448 self.pg_enable_capture(self.pg_interfaces)
6450 capture = self.pg0.get_capture(len(pkts))
6451 for packet in capture:
6453 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6454 self.assertEqual(packet[IPv6].dst, server.ip6)
6455 icmp = packet[ICMPv6DestUnreach]
6456 self.assertEqual(icmp.code, 1)
6457 inner = icmp[IPerror6]
6458 self.assertEqual(inner.src, server.ip6)
6459 self.assertEqual(inner.dst, nat_addr_ip6)
6460 self.assert_packet_checksums_valid(packet)
6461 if inner.haslayer(TCPerror):
6462 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6463 self.assertEqual(inner[TCPerror].dport,
6464 client_tcp_out_port)
6466 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6467 self.assertEqual(inner[UDPerror].dport,
6468 client_udp_out_port)
6470 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6473 def test_prefix(self):
6474 """ NAT64 Network-Specific Prefix """
6476 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6478 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6479 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6480 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6481 self.vrf1_nat_addr_n,
6482 vrf_id=self.vrf1_id)
6483 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6486 global_pref64 = "2001:db8::"
6487 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6488 global_pref64_len = 32
6489 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6491 prefix = self.vapi.nat64_prefix_dump()
6492 self.assertEqual(len(prefix), 1)
6493 self.assertEqual(prefix[0].prefix, global_pref64_n)
6494 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6495 self.assertEqual(prefix[0].vrf_id, 0)
6497 # Add tenant specific prefix
6498 vrf1_pref64 = "2001:db8:122:300::"
6499 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6500 vrf1_pref64_len = 56
6501 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6503 vrf_id=self.vrf1_id)
6504 prefix = self.vapi.nat64_prefix_dump()
6505 self.assertEqual(len(prefix), 2)
6508 pkts = self.create_stream_in_ip6(self.pg0,
6511 plen=global_pref64_len)
6512 self.pg0.add_stream(pkts)
6513 self.pg_enable_capture(self.pg_interfaces)
6515 capture = self.pg1.get_capture(len(pkts))
6516 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6517 dst_ip=self.pg1.remote_ip4)
6519 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6520 self.pg1.add_stream(pkts)
6521 self.pg_enable_capture(self.pg_interfaces)
6523 capture = self.pg0.get_capture(len(pkts))
6524 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6527 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6529 # Tenant specific prefix
6530 pkts = self.create_stream_in_ip6(self.pg2,
6533 plen=vrf1_pref64_len)
6534 self.pg2.add_stream(pkts)
6535 self.pg_enable_capture(self.pg_interfaces)
6537 capture = self.pg1.get_capture(len(pkts))
6538 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6539 dst_ip=self.pg1.remote_ip4)
6541 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6542 self.pg1.add_stream(pkts)
6543 self.pg_enable_capture(self.pg_interfaces)
6545 capture = self.pg2.get_capture(len(pkts))
6546 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6549 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
6551 def test_unknown_proto(self):
6552 """ NAT64 translate packet with unknown protocol """
6554 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6556 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6557 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6558 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6561 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6562 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6563 TCP(sport=self.tcp_port_in, dport=20))
6564 self.pg0.add_stream(p)
6565 self.pg_enable_capture(self.pg_interfaces)
6567 p = self.pg1.get_capture(1)
6569 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6570 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6572 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6573 TCP(sport=1234, dport=1234))
6574 self.pg0.add_stream(p)
6575 self.pg_enable_capture(self.pg_interfaces)
6577 p = self.pg1.get_capture(1)
6580 self.assertEqual(packet[IP].src, self.nat_addr)
6581 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6582 self.assertTrue(packet.haslayer(GRE))
6583 self.assert_packet_checksums_valid(packet)
6585 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6589 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6590 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6592 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6593 TCP(sport=1234, dport=1234))
6594 self.pg1.add_stream(p)
6595 self.pg_enable_capture(self.pg_interfaces)
6597 p = self.pg0.get_capture(1)
6600 self.assertEqual(packet[IPv6].src, remote_ip6)
6601 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6602 self.assertEqual(packet[IPv6].nh, 47)
6604 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6607 def test_hairpinning_unknown_proto(self):
6608 """ NAT64 translate packet with unknown protocol - hairpinning """
6610 client = self.pg0.remote_hosts[0]
6611 server = self.pg0.remote_hosts[1]
6612 server_tcp_in_port = 22
6613 server_tcp_out_port = 4022
6614 client_tcp_in_port = 1234
6615 client_tcp_out_port = 1235
6616 server_nat_ip = "10.0.0.100"
6617 client_nat_ip = "10.0.0.110"
6618 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6619 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6620 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6621 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6623 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6625 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6626 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6628 self.vapi.nat64_add_del_static_bib(server.ip6n,
6631 server_tcp_out_port,
6634 self.vapi.nat64_add_del_static_bib(server.ip6n,
6640 self.vapi.nat64_add_del_static_bib(client.ip6n,
6643 client_tcp_out_port,
6647 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6648 IPv6(src=client.ip6, dst=server_nat_ip6) /
6649 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6650 self.pg0.add_stream(p)
6651 self.pg_enable_capture(self.pg_interfaces)
6653 p = self.pg0.get_capture(1)
6655 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6656 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6658 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6659 TCP(sport=1234, dport=1234))
6660 self.pg0.add_stream(p)
6661 self.pg_enable_capture(self.pg_interfaces)
6663 p = self.pg0.get_capture(1)
6666 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6667 self.assertEqual(packet[IPv6].dst, server.ip6)
6668 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6670 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6674 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6675 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6677 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6678 TCP(sport=1234, dport=1234))
6679 self.pg0.add_stream(p)
6680 self.pg_enable_capture(self.pg_interfaces)
6682 p = self.pg0.get_capture(1)
6685 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6686 self.assertEqual(packet[IPv6].dst, client.ip6)
6687 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6689 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6692 def test_one_armed_nat64(self):
6693 """ One armed NAT64 """
6695 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6699 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6701 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6702 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6705 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6706 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6707 TCP(sport=12345, dport=80))
6708 self.pg3.add_stream(p)
6709 self.pg_enable_capture(self.pg_interfaces)
6711 capture = self.pg3.get_capture(1)
6716 self.assertEqual(ip.src, self.nat_addr)
6717 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6718 self.assertNotEqual(tcp.sport, 12345)
6719 external_port = tcp.sport
6720 self.assertEqual(tcp.dport, 80)
6721 self.assert_packet_checksums_valid(p)
6723 self.logger.error(ppp("Unexpected or invalid packet:", p))
6727 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6728 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6729 TCP(sport=80, dport=external_port))
6730 self.pg3.add_stream(p)
6731 self.pg_enable_capture(self.pg_interfaces)
6733 capture = self.pg3.get_capture(1)
6738 self.assertEqual(ip.src, remote_host_ip6)
6739 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6740 self.assertEqual(tcp.sport, 80)
6741 self.assertEqual(tcp.dport, 12345)
6742 self.assert_packet_checksums_valid(p)
6744 self.logger.error(ppp("Unexpected or invalid packet:", p))
6747 def test_frag_in_order(self):
6748 """ NAT64 translate fragments arriving in order """
6749 self.tcp_port_in = random.randint(1025, 65535)
6751 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6753 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6754 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6756 reass = self.vapi.nat_reass_dump()
6757 reass_n_start = len(reass)
6761 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6762 self.tcp_port_in, 20, data)
6763 self.pg0.add_stream(pkts)
6764 self.pg_enable_capture(self.pg_interfaces)
6766 frags = self.pg1.get_capture(len(pkts))
6767 p = self.reass_frags_and_verify(frags,
6769 self.pg1.remote_ip4)
6770 self.assertEqual(p[TCP].dport, 20)
6771 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6772 self.tcp_port_out = p[TCP].sport
6773 self.assertEqual(data, p[Raw].load)
6776 data = "A" * 4 + "b" * 16 + "C" * 3
6777 pkts = self.create_stream_frag(self.pg1,
6782 self.pg1.add_stream(pkts)
6783 self.pg_enable_capture(self.pg_interfaces)
6785 frags = self.pg0.get_capture(len(pkts))
6786 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6787 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6788 self.assertEqual(p[TCP].sport, 20)
6789 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6790 self.assertEqual(data, p[Raw].load)
6792 reass = self.vapi.nat_reass_dump()
6793 reass_n_end = len(reass)
6795 self.assertEqual(reass_n_end - reass_n_start, 2)
6797 def test_reass_hairpinning(self):
6798 """ NAT64 fragments hairpinning """
6800 server = self.pg0.remote_hosts[1]
6801 server_in_port = random.randint(1025, 65535)
6802 server_out_port = random.randint(1025, 65535)
6803 client_in_port = random.randint(1025, 65535)
6804 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6805 nat_addr_ip6 = ip.src
6807 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6809 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6810 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6812 # add static BIB entry for server
6813 self.vapi.nat64_add_del_static_bib(server.ip6n,
6819 # send packet from host to server
6820 pkts = self.create_stream_frag_ip6(self.pg0,
6825 self.pg0.add_stream(pkts)
6826 self.pg_enable_capture(self.pg_interfaces)
6828 frags = self.pg0.get_capture(len(pkts))
6829 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6830 self.assertNotEqual(p[TCP].sport, client_in_port)
6831 self.assertEqual(p[TCP].dport, server_in_port)
6832 self.assertEqual(data, p[Raw].load)
6834 def test_frag_out_of_order(self):
6835 """ NAT64 translate fragments arriving out of order """
6836 self.tcp_port_in = random.randint(1025, 65535)
6838 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6840 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6841 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6845 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6846 self.tcp_port_in, 20, data)
6848 self.pg0.add_stream(pkts)
6849 self.pg_enable_capture(self.pg_interfaces)
6851 frags = self.pg1.get_capture(len(pkts))
6852 p = self.reass_frags_and_verify(frags,
6854 self.pg1.remote_ip4)
6855 self.assertEqual(p[TCP].dport, 20)
6856 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6857 self.tcp_port_out = p[TCP].sport
6858 self.assertEqual(data, p[Raw].load)
6861 data = "A" * 4 + "B" * 16 + "C" * 3
6862 pkts = self.create_stream_frag(self.pg1,
6868 self.pg1.add_stream(pkts)
6869 self.pg_enable_capture(self.pg_interfaces)
6871 frags = self.pg0.get_capture(len(pkts))
6872 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6873 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6874 self.assertEqual(p[TCP].sport, 20)
6875 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6876 self.assertEqual(data, p[Raw].load)
6878 def test_interface_addr(self):
6879 """ Acquire NAT64 pool addresses from interface """
6880 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6882 # no address in NAT64 pool
6883 adresses = self.vapi.nat44_address_dump()
6884 self.assertEqual(0, len(adresses))
6886 # configure interface address and check NAT64 address pool
6887 self.pg4.config_ip4()
6888 addresses = self.vapi.nat64_pool_addr_dump()
6889 self.assertEqual(len(addresses), 1)
6890 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6892 # remove interface address and check NAT64 address pool
6893 self.pg4.unconfig_ip4()
6894 addresses = self.vapi.nat64_pool_addr_dump()
6895 self.assertEqual(0, len(adresses))
6897 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6898 def test_ipfix_max_bibs_sessions(self):
6899 """ IPFIX logging maximum session and BIB entries exceeded """
6902 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6906 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6908 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6909 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6913 for i in range(0, max_bibs):
6914 src = "fd01:aa::%x" % (i)
6915 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6916 IPv6(src=src, dst=remote_host_ip6) /
6917 TCP(sport=12345, dport=80))
6919 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6920 IPv6(src=src, dst=remote_host_ip6) /
6921 TCP(sport=12345, dport=22))
6923 self.pg0.add_stream(pkts)
6924 self.pg_enable_capture(self.pg_interfaces)
6926 self.pg1.get_capture(max_sessions)
6928 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6929 src_address=self.pg3.local_ip4n,
6931 template_interval=10)
6932 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6933 src_port=self.ipfix_src_port)
6935 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6936 IPv6(src=src, dst=remote_host_ip6) /
6937 TCP(sport=12345, dport=25))
6938 self.pg0.add_stream(p)
6939 self.pg_enable_capture(self.pg_interfaces)
6941 self.pg1.assert_nothing_captured()
6943 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6944 capture = self.pg3.get_capture(9)
6945 ipfix = IPFIXDecoder()
6946 # first load template
6948 self.assertTrue(p.haslayer(IPFIX))
6949 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6950 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6951 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6952 self.assertEqual(p[UDP].dport, 4739)
6953 self.assertEqual(p[IPFIX].observationDomainID,
6954 self.ipfix_domain_id)
6955 if p.haslayer(Template):
6956 ipfix.add_template(p.getlayer(Template))
6957 # verify events in data set
6959 if p.haslayer(Data):
6960 data = ipfix.decode_data_set(p.getlayer(Set))
6961 self.verify_ipfix_max_sessions(data, max_sessions)
6963 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6964 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6965 TCP(sport=12345, dport=80))
6966 self.pg0.add_stream(p)
6967 self.pg_enable_capture(self.pg_interfaces)
6969 self.pg1.assert_nothing_captured()
6971 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6972 capture = self.pg3.get_capture(1)
6973 # verify events in data set
6975 self.assertTrue(p.haslayer(IPFIX))
6976 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6977 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6978 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6979 self.assertEqual(p[UDP].dport, 4739)
6980 self.assertEqual(p[IPFIX].observationDomainID,
6981 self.ipfix_domain_id)
6982 if p.haslayer(Data):
6983 data = ipfix.decode_data_set(p.getlayer(Set))
6984 self.verify_ipfix_max_bibs(data, max_bibs)
6986 def test_ipfix_max_frags(self):
6987 """ IPFIX logging maximum fragments pending reassembly exceeded """
6988 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6990 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6991 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6992 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6993 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6994 src_address=self.pg3.local_ip4n,
6996 template_interval=10)
6997 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6998 src_port=self.ipfix_src_port)
7001 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
7002 self.tcp_port_in, 20, data)
7003 self.pg0.add_stream(pkts[-1])
7004 self.pg_enable_capture(self.pg_interfaces)
7006 self.pg1.assert_nothing_captured()
7008 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7009 capture = self.pg3.get_capture(9)
7010 ipfix = IPFIXDecoder()
7011 # first load template
7013 self.assertTrue(p.haslayer(IPFIX))
7014 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7015 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7016 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7017 self.assertEqual(p[UDP].dport, 4739)
7018 self.assertEqual(p[IPFIX].observationDomainID,
7019 self.ipfix_domain_id)
7020 if p.haslayer(Template):
7021 ipfix.add_template(p.getlayer(Template))
7022 # verify events in data set
7024 if p.haslayer(Data):
7025 data = ipfix.decode_data_set(p.getlayer(Set))
7026 self.verify_ipfix_max_fragments_ip6(data, 0,
7027 self.pg0.remote_ip6n)
7029 def test_ipfix_bib_ses(self):
7030 """ IPFIX logging NAT64 BIB/session create and delete events """
7031 self.tcp_port_in = random.randint(1025, 65535)
7032 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
7036 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7038 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
7039 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7040 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
7041 src_address=self.pg3.local_ip4n,
7043 template_interval=10)
7044 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
7045 src_port=self.ipfix_src_port)
7048 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
7049 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
7050 TCP(sport=self.tcp_port_in, dport=25))
7051 self.pg0.add_stream(p)
7052 self.pg_enable_capture(self.pg_interfaces)
7054 p = self.pg1.get_capture(1)
7055 self.tcp_port_out = p[0][TCP].sport
7056 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7057 capture = self.pg3.get_capture(10)
7058 ipfix = IPFIXDecoder()
7059 # first load template
7061 self.assertTrue(p.haslayer(IPFIX))
7062 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7063 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7064 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7065 self.assertEqual(p[UDP].dport, 4739)
7066 self.assertEqual(p[IPFIX].observationDomainID,
7067 self.ipfix_domain_id)
7068 if p.haslayer(Template):
7069 ipfix.add_template(p.getlayer(Template))
7070 # verify events in data set
7072 if p.haslayer(Data):
7073 data = ipfix.decode_data_set(p.getlayer(Set))
7074 if ord(data[0][230]) == 10:
7075 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
7076 elif ord(data[0][230]) == 6:
7077 self.verify_ipfix_nat64_ses(data,
7079 self.pg0.remote_ip6n,
7080 self.pg1.remote_ip4,
7083 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7086 self.pg_enable_capture(self.pg_interfaces)
7087 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7090 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7091 capture = self.pg3.get_capture(2)
7092 # verify events in data set
7094 self.assertTrue(p.haslayer(IPFIX))
7095 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7096 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7097 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7098 self.assertEqual(p[UDP].dport, 4739)
7099 self.assertEqual(p[IPFIX].observationDomainID,
7100 self.ipfix_domain_id)
7101 if p.haslayer(Data):
7102 data = ipfix.decode_data_set(p.getlayer(Set))
7103 if ord(data[0][230]) == 11:
7104 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
7105 elif ord(data[0][230]) == 7:
7106 self.verify_ipfix_nat64_ses(data,
7108 self.pg0.remote_ip6n,
7109 self.pg1.remote_ip4,
7112 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7114 def nat64_get_ses_num(self):
7116 Return number of active NAT64 sessions.
7118 st = self.vapi.nat64_st_dump()
7121 def clear_nat64(self):
7123 Clear NAT64 configuration.
7125 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
7126 domain_id=self.ipfix_domain_id)
7127 self.ipfix_src_port = 4739
7128 self.ipfix_domain_id = 1
7130 self.vapi.nat_set_timeouts()
7132 interfaces = self.vapi.nat64_interface_dump()
7133 for intf in interfaces:
7134 if intf.is_inside > 1:
7135 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7138 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7142 bib = self.vapi.nat64_bib_dump(255)
7145 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
7153 adresses = self.vapi.nat64_pool_addr_dump()
7154 for addr in adresses:
7155 self.vapi.nat64_add_del_pool_addr_range(addr.address,
7160 prefixes = self.vapi.nat64_prefix_dump()
7161 for prefix in prefixes:
7162 self.vapi.nat64_add_del_prefix(prefix.prefix,
7164 vrf_id=prefix.vrf_id,
7168 super(TestNAT64, self).tearDown()
7169 if not self.vpp_dead:
7170 self.logger.info(self.vapi.cli("show nat64 pool"))
7171 self.logger.info(self.vapi.cli("show nat64 interfaces"))
7172 self.logger.info(self.vapi.cli("show nat64 prefix"))
7173 self.logger.info(self.vapi.cli("show nat64 bib all"))
7174 self.logger.info(self.vapi.cli("show nat64 session table all"))
7175 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
7179 class TestDSlite(MethodHolder):
7180 """ DS-Lite Test Cases """
7183 def setUpClass(cls):
7184 super(TestDSlite, cls).setUpClass()
7187 cls.nat_addr = '10.0.0.3'
7188 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
7190 cls.create_pg_interfaces(range(2))
7192 cls.pg0.config_ip4()
7193 cls.pg0.resolve_arp()
7195 cls.pg1.config_ip6()
7196 cls.pg1.generate_remote_hosts(2)
7197 cls.pg1.configure_ipv6_neighbors()
7200 super(TestDSlite, cls).tearDownClass()
7203 def test_dslite(self):
7204 """ Test DS-Lite """
7205 nat_config = self.vapi.nat_show_config()
7206 self.assertEqual(0, nat_config.dslite_ce)
7208 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
7210 aftr_ip4 = '192.0.0.1'
7211 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7212 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7213 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7214 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7217 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7218 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
7219 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7220 UDP(sport=20000, dport=10000))
7221 self.pg1.add_stream(p)
7222 self.pg_enable_capture(self.pg_interfaces)
7224 capture = self.pg0.get_capture(1)
7225 capture = capture[0]
7226 self.assertFalse(capture.haslayer(IPv6))
7227 self.assertEqual(capture[IP].src, self.nat_addr)
7228 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7229 self.assertNotEqual(capture[UDP].sport, 20000)
7230 self.assertEqual(capture[UDP].dport, 10000)
7231 self.assert_packet_checksums_valid(capture)
7232 out_port = capture[UDP].sport
7234 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7235 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7236 UDP(sport=10000, dport=out_port))
7237 self.pg0.add_stream(p)
7238 self.pg_enable_capture(self.pg_interfaces)
7240 capture = self.pg1.get_capture(1)
7241 capture = capture[0]
7242 self.assertEqual(capture[IPv6].src, aftr_ip6)
7243 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7244 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7245 self.assertEqual(capture[IP].dst, '192.168.1.1')
7246 self.assertEqual(capture[UDP].sport, 10000)
7247 self.assertEqual(capture[UDP].dport, 20000)
7248 self.assert_packet_checksums_valid(capture)
7251 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7252 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7253 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7254 TCP(sport=20001, dport=10001))
7255 self.pg1.add_stream(p)
7256 self.pg_enable_capture(self.pg_interfaces)
7258 capture = self.pg0.get_capture(1)
7259 capture = capture[0]
7260 self.assertFalse(capture.haslayer(IPv6))
7261 self.assertEqual(capture[IP].src, self.nat_addr)
7262 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7263 self.assertNotEqual(capture[TCP].sport, 20001)
7264 self.assertEqual(capture[TCP].dport, 10001)
7265 self.assert_packet_checksums_valid(capture)
7266 out_port = capture[TCP].sport
7268 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7269 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7270 TCP(sport=10001, dport=out_port))
7271 self.pg0.add_stream(p)
7272 self.pg_enable_capture(self.pg_interfaces)
7274 capture = self.pg1.get_capture(1)
7275 capture = capture[0]
7276 self.assertEqual(capture[IPv6].src, aftr_ip6)
7277 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7278 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7279 self.assertEqual(capture[IP].dst, '192.168.1.1')
7280 self.assertEqual(capture[TCP].sport, 10001)
7281 self.assertEqual(capture[TCP].dport, 20001)
7282 self.assert_packet_checksums_valid(capture)
7285 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7286 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7287 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7288 ICMP(id=4000, type='echo-request'))
7289 self.pg1.add_stream(p)
7290 self.pg_enable_capture(self.pg_interfaces)
7292 capture = self.pg0.get_capture(1)
7293 capture = capture[0]
7294 self.assertFalse(capture.haslayer(IPv6))
7295 self.assertEqual(capture[IP].src, self.nat_addr)
7296 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7297 self.assertNotEqual(capture[ICMP].id, 4000)
7298 self.assert_packet_checksums_valid(capture)
7299 out_id = capture[ICMP].id
7301 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7302 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7303 ICMP(id=out_id, type='echo-reply'))
7304 self.pg0.add_stream(p)
7305 self.pg_enable_capture(self.pg_interfaces)
7307 capture = self.pg1.get_capture(1)
7308 capture = capture[0]
7309 self.assertEqual(capture[IPv6].src, aftr_ip6)
7310 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7311 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7312 self.assertEqual(capture[IP].dst, '192.168.1.1')
7313 self.assertEqual(capture[ICMP].id, 4000)
7314 self.assert_packet_checksums_valid(capture)
7316 # ping DS-Lite AFTR tunnel endpoint address
7317 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7318 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7319 ICMPv6EchoRequest())
7320 self.pg1.add_stream(p)
7321 self.pg_enable_capture(self.pg_interfaces)
7323 capture = self.pg1.get_capture(1)
7324 self.assertEqual(1, len(capture))
7325 capture = capture[0]
7326 self.assertEqual(capture[IPv6].src, aftr_ip6)
7327 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7328 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7331 super(TestDSlite, self).tearDown()
7332 if not self.vpp_dead:
7333 self.logger.info(self.vapi.cli("show dslite pool"))
7335 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7336 self.logger.info(self.vapi.cli("show dslite sessions"))
7339 class TestDSliteCE(MethodHolder):
7340 """ DS-Lite CE Test Cases """
7343 def setUpConstants(cls):
7344 super(TestDSliteCE, cls).setUpConstants()
7345 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
7348 def setUpClass(cls):
7349 super(TestDSliteCE, cls).setUpClass()
7352 cls.create_pg_interfaces(range(2))
7354 cls.pg0.config_ip4()
7355 cls.pg0.resolve_arp()
7357 cls.pg1.config_ip6()
7358 cls.pg1.generate_remote_hosts(1)
7359 cls.pg1.configure_ipv6_neighbors()
7362 super(TestDSliteCE, cls).tearDownClass()
7365 def test_dslite_ce(self):
7366 """ Test DS-Lite CE """
7368 nat_config = self.vapi.nat_show_config()
7369 self.assertEqual(1, nat_config.dslite_ce)
7371 b4_ip4 = '192.0.0.2'
7372 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7373 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7374 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7375 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7377 aftr_ip4 = '192.0.0.1'
7378 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7379 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7380 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7381 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7383 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7384 dst_address_length=128,
7385 next_hop_address=self.pg1.remote_ip6n,
7386 next_hop_sw_if_index=self.pg1.sw_if_index,
7390 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7391 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7392 UDP(sport=10000, dport=20000))
7393 self.pg0.add_stream(p)
7394 self.pg_enable_capture(self.pg_interfaces)
7396 capture = self.pg1.get_capture(1)
7397 capture = capture[0]
7398 self.assertEqual(capture[IPv6].src, b4_ip6)
7399 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7400 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7401 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7402 self.assertEqual(capture[UDP].sport, 10000)
7403 self.assertEqual(capture[UDP].dport, 20000)
7404 self.assert_packet_checksums_valid(capture)
7407 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7408 IPv6(dst=b4_ip6, src=aftr_ip6) /
7409 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7410 UDP(sport=20000, dport=10000))
7411 self.pg1.add_stream(p)
7412 self.pg_enable_capture(self.pg_interfaces)
7414 capture = self.pg0.get_capture(1)
7415 capture = capture[0]
7416 self.assertFalse(capture.haslayer(IPv6))
7417 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7418 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7419 self.assertEqual(capture[UDP].sport, 20000)
7420 self.assertEqual(capture[UDP].dport, 10000)
7421 self.assert_packet_checksums_valid(capture)
7423 # ping DS-Lite B4 tunnel endpoint address
7424 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7425 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7426 ICMPv6EchoRequest())
7427 self.pg1.add_stream(p)
7428 self.pg_enable_capture(self.pg_interfaces)
7430 capture = self.pg1.get_capture(1)
7431 self.assertEqual(1, len(capture))
7432 capture = capture[0]
7433 self.assertEqual(capture[IPv6].src, b4_ip6)
7434 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7435 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7438 super(TestDSliteCE, self).tearDown()
7439 if not self.vpp_dead:
7441 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7443 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7446 class TestNAT66(MethodHolder):
7447 """ NAT66 Test Cases """
7450 def setUpClass(cls):
7451 super(TestNAT66, cls).setUpClass()
7454 cls.nat_addr = 'fd01:ff::2'
7455 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7457 cls.create_pg_interfaces(range(2))
7458 cls.interfaces = list(cls.pg_interfaces)
7460 for i in cls.interfaces:
7463 i.configure_ipv6_neighbors()
7466 super(TestNAT66, cls).tearDownClass()
7469 def test_static(self):
7470 """ 1:1 NAT66 test """
7471 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7472 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7473 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7478 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7479 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7482 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7483 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7486 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7487 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7488 ICMPv6EchoRequest())
7490 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7491 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7492 GRE() / IP() / TCP())
7494 self.pg0.add_stream(pkts)
7495 self.pg_enable_capture(self.pg_interfaces)
7497 capture = self.pg1.get_capture(len(pkts))
7498 for packet in capture:
7500 self.assertEqual(packet[IPv6].src, self.nat_addr)
7501 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7502 self.assert_packet_checksums_valid(packet)
7504 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7509 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7510 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7513 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7514 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7517 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7518 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7521 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7522 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7523 GRE() / IP() / TCP())
7525 self.pg1.add_stream(pkts)
7526 self.pg_enable_capture(self.pg_interfaces)
7528 capture = self.pg0.get_capture(len(pkts))
7529 for packet in capture:
7531 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7532 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7533 self.assert_packet_checksums_valid(packet)
7535 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7538 sm = self.vapi.nat66_static_mapping_dump()
7539 self.assertEqual(len(sm), 1)
7540 self.assertEqual(sm[0].total_pkts, 8)
7542 def test_check_no_translate(self):
7543 """ NAT66 translate only when egress interface is outside interface """
7544 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7545 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7546 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7550 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7551 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7553 self.pg0.add_stream([p])
7554 self.pg_enable_capture(self.pg_interfaces)
7556 capture = self.pg1.get_capture(1)
7559 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7560 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7562 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7565 def clear_nat66(self):
7567 Clear NAT66 configuration.
7569 interfaces = self.vapi.nat66_interface_dump()
7570 for intf in interfaces:
7571 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7575 static_mappings = self.vapi.nat66_static_mapping_dump()
7576 for sm in static_mappings:
7577 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7578 sm.external_ip_address,
7583 super(TestNAT66, self).tearDown()
7584 if not self.vpp_dead:
7585 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7586 self.logger.info(self.vapi.cli("show nat66 static mappings"))
7590 if __name__ == '__main__':
7591 unittest.main(testRunner=VppTestRunner)