7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
21 class MethodHolder(VppTestCase):
22 """ NAT create capture and verify method holder """
26 super(MethodHolder, cls).setUpClass()
29 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Verify the IPv4 header checksum carried by a packet

    The packet is serialized and re-parsed, its IP checksum field is
    dropped so that it gets recomputed when the packet is built again,
    and the recomputed value is compared with the one found in ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    # Removing the field makes the next build compute a fresh checksum
    del rebuilt['IP'].chksum
    recomputed = rebuilt.__class__(str(rebuilt))['IP'].chksum
    self.assertEqual(recomputed, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Verify the TCP checksum carried by a packet

    The packet is serialized and re-parsed, its TCP checksum field is
    dropped so that it gets recomputed when the packet is built again,
    and the recomputed value is compared with the one found in ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    # Removing the field makes the next build compute a fresh checksum
    del rebuilt['TCP'].chksum
    recomputed = rebuilt.__class__(str(rebuilt))['TCP'].chksum
    self.assertEqual(recomputed, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Verify the UDP checksum carried by a packet

    The packet is serialized and re-parsed, its UDP checksum field is
    dropped so that it gets recomputed when the packet is built again,
    and the recomputed value is compared with the one found in ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    # Removing the field makes the next build compute a fresh checksum
    del rebuilt['UDP'].chksum
    recomputed = rebuilt.__class__(str(rebuilt))['UDP'].chksum
    self.assertEqual(recomputed, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check checksums of the packet embedded in an ICMP error message

    Every embedded layer that is present (IP, TCP, UDP, ICMP) is verified
    independently: the packet is re-parsed from its serialized form, the
    checksum of that layer is dropped so that it gets recomputed, and the
    result is compared with the value carried by ``pkt``.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    for layer in (IPerror, TCPerror, UDPerror, ICMPerror):
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP checksum of zero is not verified
        if layer is UDPerror and pkt[layer].chksum == 0:
            continue
        # Always start from a fresh copy of the captured packet so that no
        # branch depends on (or fails without) the work of a previous one
        new = pkt.__class__(str(pkt))
        del new[layer].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer].chksum, pkt[layer].chksum)
def check_icmp_checksum(self, pkt):
    """
    Verify the ICMP checksum of an IPv4 packet

    When the packet is an ICMP error (it carries an embedded IP header)
    the checksums of the embedded packet are verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    # Removing the field makes the next build compute a fresh checksum
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Verify the ICMPv6 checksum of an IPv6 packet

    Destination unreachable, echo request and echo reply messages are
    handled.  For destination unreachable messages the checksums of the
    embedded packet are verified too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    new = pkt.__class__(str(pkt))
    for layer in (ICMPv6DestUnreach, ICMPv6EchoRequest, ICMPv6EchoReply):
        if not pkt.haslayer(layer):
            continue
        # Removing the field makes the next build compute a fresh checksum
        del new[layer].cksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer].cksum, pkt[layer].cksum)
        if layer is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    One TCP, one UDP and one ICMP echo request packet are generated, all
    sent from the inside host towards the host behind ``out_if``.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List with the TCP, UDP and ICMP packet (in this order)
    """
    def l2_l3():
        # Common Ethernet/IPv4 headers of every generated packet
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl))

    return [
        l2_l3() / TCP(sport=self.tcp_port_in, dport=20),
        l2_l3() / UDP(sport=self.udp_port_in, dport=20),
        l2_l3() / ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The IPv4 address is embedded as described by RFC 6052: its four octets
    are written right after the prefix, skipping octet 8 of the IPv6
    address (the reserved "u" octet).  Prefix lengths other than 32, 40,
    48, 56, 64 and 96 leave the prefix untouched.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # bytearray holds integer octets on both Python 2 and Python 3
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    if plen in (32, 40, 48, 56, 64, 96):
        # First four octet positions after the prefix, never octet 8
        slots = [pos for pos in range(plen // 8, 16) if pos != 8][:4]
        for pos, octet in zip(slots, ip4_n):
            pref_n[pos] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    One TCP, one UDP and one ICMPv6 echo request packet are generated.
    The destination is the IPv4 address of the outside host embedded in
    an IPv6 address: under ``64:ff9b::`` when no prefix is given, under
    the given NAT64 prefix otherwise.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix
    :param plen: NAT64 prefix length
    :returns: List with the TCP, UDP and ICMPv6 packet (in this order)
    """
    if pref is not None:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
    else:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])

    def l2_l3():
        # Common Ethernet/IPv6 headers of every generated packet
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim))

    return [
        l2_l3() / TCP(sport=self.tcp_port_in, dport=20),
        l2_l3() / UDP(sport=self.udp_port_in, dport=20),
        l2_l3() / ICMPv6EchoRequest(id=self.icmp_id_in),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    One TCP, one UDP and one ICMP echo reply packet are generated, sent
    by the outside host to the translated (outside) ports/identifier.

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :returns: List with the TCP, UDP and ICMP packet (in this order)
    """
    target = self.nat_addr if dst_ip is None else dst_ip

    def l2_l3():
        # Common Ethernet/IPv4 headers of every generated packet
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=target, ttl=ttl))

    return [
        l2_l3() / TCP(dport=self.tcp_port_out, sport=20),
        l2_l3() / UDP(dport=self.udp_port_out, sport=20),
        l2_l3() / ICMP(id=self.icmp_id_out, type='echo-reply'),
    ]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    Besides the checks, the source port / ICMP identifier seen in each
    packet is remembered as the corresponding ``*_out`` attribute so that
    a later outside stream can address the created sessions.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    expected_src = self.nat_addr if nat_ip is None else nat_ip
    # The translated port has to match the inside one only when the port
    # is expected to be preserved, otherwise it must differ
    compare = self.assertEqual if same_port else self.assertNotEqual

    def verify(packet):
        self.check_ip_checksum(packet)
        self.assertEqual(packet[IP].src, expected_src)
        if dst_ip is not None:
            self.assertEqual(packet[IP].dst, dst_ip)
        if packet.haslayer(TCP):
            compare(packet[TCP].sport, self.tcp_port_in)
            self.tcp_port_out = packet[TCP].sport
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            compare(packet[UDP].sport, self.udp_port_in)
            self.udp_port_out = packet[UDP].sport
        else:
            compare(packet[ICMP].id, self.icmp_id_in)
            self.icmp_id_out = packet[ICMP].id
            self.check_icmp_checksum(packet)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            verify(packet)
        except:
            # Dump the offending packet before propagating the failure
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    Each packet must be addressed to the inside host and carry the inside
    port (TCP/UDP) or identifier (ICMP).

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    def verify(packet):
        self.check_ip_checksum(packet)
        self.assertEqual(packet[IP].dst, in_if.remote_ip4)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].dport, self.udp_port_in)
        else:
            self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            self.check_icmp_checksum(packet)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            verify(packet)
        except:
            # Dump the offending packet before propagating the failure
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    Each packet must carry the expected addresses and the inside port
    (TCP/UDP) or identifier (ICMPv6 echo reply), with a valid checksum.

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    def verify(packet):
        self.assertEqual(packet[IPv6].src, src_ip)
        self.assertEqual(packet[IPv6].dst, dst_ip)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].dport, self.udp_port_in)
            self.check_udp_checksum(packet)
        else:
            self.assertEqual(packet[ICMPv6EchoReply].id,
                             self.icmp_id_in)
            self.check_icmpv6_checksum(packet)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            verify(packet)
        except:
            # Dump the offending packet before propagating the failure
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packets that don't have to be translated

    Addresses and source port / ICMP identifier must be the ones the
    packets were sent with.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    def verify(packet):
        self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
        self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].sport, self.tcp_port_in)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].sport, self.udp_port_in)
        else:
            self.assertEqual(packet[ICMP].id, self.icmp_id_in)

    for packet in capture:
        try:
            verify(packet)
        except:
            # Dump the offending packet before propagating the failure
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    Each packet must be an ICMP error of the expected type whose embedded
    packet is addressed to the outside port (TCP/UDP) or carries the
    outside identifier (ICMP).

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    expected_src = self.nat_addr if src_ip is None else src_ip

    def verify(packet):
        self.assertEqual(packet[IP].src, expected_src)
        self.assertTrue(packet.haslayer(ICMP))
        icmp = packet[ICMP]
        self.assertEqual(icmp.type, icmp_type)
        self.assertTrue(icmp.haslayer(IPerror))
        inner_ip = icmp[IPerror]
        if inner_ip.haslayer(TCPerror):
            self.assertEqual(inner_ip[TCPerror].dport,
                             self.tcp_port_out)
        elif inner_ip.haslayer(UDPerror):
            self.assertEqual(inner_ip[UDPerror].dport,
                             self.udp_port_out)
        else:
            self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            verify(packet)
        except:
            # Dump the offending packet before propagating the failure
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    Each packet must be an ICMP error of the expected type, addressed to
    the inside host, whose embedded packet was sent from the inside port
    (TCP/UDP) or carries the inside identifier (ICMP).

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    def verify(packet):
        self.assertEqual(packet[IP].dst, in_if.remote_ip4)
        self.assertTrue(packet.haslayer(ICMP))
        icmp = packet[ICMP]
        self.assertEqual(icmp.type, icmp_type)
        self.assertTrue(icmp.haslayer(IPerror))
        inner_ip = icmp[IPerror]
        if inner_ip.haslayer(TCPerror):
            self.assertEqual(inner_ip[TCPerror].sport,
                             self.tcp_port_in)
        elif inner_ip.haslayer(UDPerror):
            self.assertEqual(inner_ip[UDPerror].sport,
                             self.udp_port_in)
        else:
            self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            verify(packet)
        except:
            # Dump the offending packet before propagating the failure
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Six records are expected: a session create and a session delete event
    for each of the ICMP, TCP and UDP sessions.  Records are indexed by
    IPFIX information element identifier.

    :param data: Decoded IPFIX data records
    """
    # Inside / outside port (or ICMP identifier) per IP protocol number
    ports = {
        IP_PROTOS.icmp: (self.icmp_id_in, self.icmp_id_out),
        IP_PROTOS.tcp: (self.tcp_port_in, self.tcp_port_out),
        IP_PROTOS.udp: (self.udp_port_in, self.udp_port_out),
    }
    created = 0
    deleted = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        event = ord(record[230])
        self.assertIn(event, [4, 5])
        if event == 4:
            created += 1
        else:
            deleted += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        proto = ord(record[4])
        if proto not in ports:
            self.fail("Invalid protocol")
        port_in, port_out = ports[proto]
        self.assertEqual(struct.pack("!H", port_in), record[7])
        self.assertEqual(struct.pack("!H", port_out), record[227])
    self.assertEqual(3, created)
    self.assertEqual(3, deleted)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Exactly one record is expected, reporting NAT event 3 for pool 0.
    Records are indexed by IPFIX information element identifier.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    (record,) = data
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
522 class TestNAT44(MethodHolder):
523 """ NAT44 Test Cases """
527 super(TestNAT44, cls).setUpClass()
530 cls.tcp_port_in = 6303
531 cls.tcp_port_out = 6303
532 cls.udp_port_in = 6304
533 cls.udp_port_out = 6304
534 cls.icmp_id_in = 6305
535 cls.icmp_id_out = 6305
536 cls.nat_addr = '10.0.0.3'
537 cls.ipfix_src_port = 4739
538 cls.ipfix_domain_id = 1
540 cls.create_pg_interfaces(range(9))
541 cls.interfaces = list(cls.pg_interfaces[0:4])
543 for i in cls.interfaces:
548 cls.pg0.generate_remote_hosts(3)
549 cls.pg0.configure_ipv4_neighbors()
551 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
553 cls.pg4._local_ip4 = "172.16.255.1"
554 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
555 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
556 cls.pg4.set_table_ip4(10)
557 cls.pg5._local_ip4 = "172.17.255.3"
558 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
559 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
560 cls.pg5.set_table_ip4(10)
561 cls.pg6._local_ip4 = "172.16.255.1"
562 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
563 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
564 cls.pg6.set_table_ip4(20)
565 for i in cls.overlapping_interfaces:
574 super(TestNAT44, cls).tearDownClass()
577 def clear_nat44(self):
579 Clear NAT44 configuration.
581 # I found no elegant way to do this
582 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
583 dst_address_length=32,
584 next_hop_address=self.pg7.remote_ip4n,
585 next_hop_sw_if_index=self.pg7.sw_if_index,
587 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
588 dst_address_length=32,
589 next_hop_address=self.pg8.remote_ip4n,
590 next_hop_sw_if_index=self.pg8.sw_if_index,
593 for intf in [self.pg7, self.pg8]:
594 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
596 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
601 if self.pg7.has_ip4_config:
602 self.pg7.unconfig_ip4()
604 interfaces = self.vapi.nat44_interface_addr_dump()
605 for intf in interfaces:
606 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
608 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
609 domain_id=self.ipfix_domain_id)
610 self.ipfix_src_port = 4739
611 self.ipfix_domain_id = 1
613 interfaces = self.vapi.nat44_interface_dump()
614 for intf in interfaces:
615 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
619 interfaces = self.vapi.nat44_interface_output_feature_dump()
620 for intf in interfaces:
621 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
625 static_mappings = self.vapi.nat44_static_mapping_dump()
626 for sm in static_mappings:
627 self.vapi.nat44_add_del_static_mapping(
629 sm.external_ip_address,
630 local_port=sm.local_port,
631 external_port=sm.external_port,
632 addr_only=sm.addr_only,
634 protocol=sm.protocol,
637 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
638 for lb_sm in lb_static_mappings:
639 self.vapi.nat44_add_del_lb_static_mapping(
646 adresses = self.vapi.nat44_address_dump()
647 for addr in adresses:
648 self.vapi.nat44_add_del_address_range(addr.ip_address,
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                             local_port=0, external_port=0, vrf_id=0,
                             is_add=1, external_sw_if_index=0xFFFFFFFF,
                             proto=0):
    """
    Add/delete NAT44 static mapping

    The mapping is address-only unless both a local and an external port
    are given.

    :param local_ip: Local IP address
    :param external_ip: External IP address
    :param local_port: Local port number (Optional)
    :param external_port: External port number (Optional)
    :param vrf_id: VRF ID (Default 0)
    :param is_add: 1 if add, 0 if delete (Default add)
    :param external_sw_if_index: External interface instead of IP address
    :param proto: IP protocol (Mandatory if port specified)
    """
    addr_only = 0 if (local_port and external_port) else 1
    # The API takes addresses in packed binary form
    self.vapi.nat44_add_del_static_mapping(
        socket.inet_pton(socket.AF_INET, local_ip),
        socket.inet_pton(socket.AF_INET, external_ip),
        external_sw_if_index,
        local_port,
        external_port,
        addr_only,
        vrf_id,
        proto,
        is_add)
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
    """
    Add/delete NAT44 address

    The address is configured as a range of one (first == last).

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed to the API (Default 0xFFFFFFFF)
    """
    # The API takes addresses in packed binary form
    packed = socket.inet_pton(socket.AF_INET, ip)
    self.vapi.nat44_add_del_address_range(packed, packed, is_add,
                                          vrf_id=vrf_id)
def test_dynamic(self):
    """ NAT44 dynamic translation test """
    inside, outside = self.pg0, self.pg1

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # in2out
    to_outside = self.create_stream_in(inside, outside)
    self.verify_capture_out(exchange(inside, to_outside, outside))

    # out2in
    to_inside = self.create_stream_out(outside)
    self.verify_capture_in(exchange(outside, to_inside, inside), inside)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """ NAT44 handling of client packets with TTL=1 """
    client_if, server_if = self.pg0, self.pg1

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(client_if.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(server_if.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    stream = self.create_stream_in(client_if, server_if, ttl=1)
    client_if.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    errors = client_if.get_capture(len(stream))
    self.verify_capture_in_with_icmp_errors(errors, client_if)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
    """ NAT44 handling of server packets with TTL=1 """
    client_if, server_if = self.pg0, self.pg1

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(client_if.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(server_if.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    sessions = self.create_stream_in(client_if, server_if)
    self.verify_capture_out(exchange(client_if, sessions, server_if))

    # Server side - generate traffic
    expiring = self.create_stream_out(server_if, ttl=1)

    # Server side - verify ICMP type 11 packets
    errors = exchange(server_if, expiring, server_if)
    self.verify_capture_out_with_icmp_errors(errors,
                                             src_ip=server_if.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ NAT44 handling of error responses to client packets with TTL=2 """
    client_if, server_if = self.pg0, self.pg1

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(client_if.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(server_if.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    stream = self.create_stream_in(client_if, server_if, ttl=2)
    translated = exchange(client_if, stream, server_if)

    # Server side - simulate ICMP type 11 response
    responses = []
    for packet in translated:
        responses.append(
            Ether(dst=server_if.local_mac, src=server_if.remote_mac) /
            IP(src=server_if.remote_ip4, dst=self.nat_addr) /
            ICMP(type=11) / packet[IP])

    # Client side - verify ICMP type 11 packets
    errors = exchange(server_if, responses, client_if)
    self.verify_capture_in_with_icmp_errors(errors, client_if)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ NAT44 handling of error responses to server packets with TTL=2 """
    client_if, server_if = self.pg0, self.pg1

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(client_if.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(server_if.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    sessions = self.create_stream_in(client_if, server_if)
    self.verify_capture_out(exchange(client_if, sessions, server_if))

    # Server side - generate traffic
    stream = self.create_stream_out(server_if, ttl=2)
    delivered = exchange(server_if, stream, client_if)

    # Client side - simulate ICMP type 11 response
    responses = []
    for packet in delivered:
        responses.append(
            Ether(dst=client_if.local_mac, src=client_if.remote_mac) /
            IP(src=client_if.remote_ip4, dst=server_if.remote_ip4) /
            ICMP(type=11) / packet[IP])

    # Server side - verify ICMP type 11 packets
    errors = exchange(client_if, responses, server_if)
    self.verify_capture_out_with_icmp_errors(errors)
def test_ping_out_interface_from_outside(self):
    """ Ping NAT44 out interface from outside network """
    outside = self.pg1

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # Echo request addressed to the outside interface itself
    request = (Ether(dst=outside.local_mac, src=outside.remote_mac) /
               IP(src=outside.remote_ip4, dst=outside.local_ip4) /
               ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [request]
    outside.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = outside.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    reply = capture[0]
    try:
        self.assertEqual(reply[IP].src, outside.local_ip4)
        self.assertEqual(reply[IP].dst, outside.remote_ip4)
        # NOTE(review): the request is sent with icmp_id_out but the reply
        # is compared with icmp_id_in - confirm both are meant to be equal
        self.assertEqual(reply[ICMP].id, self.icmp_id_in)
        self.assertEqual(reply[ICMP].type, 0)  # echo reply
    except:
        # Dump the offending packet before propagating the failure
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", reply))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network """
    inside, outside = self.pg0, self.pg1

    def exchange_one(tx_if, packet, rx_if):
        # Send a single packet on tx_if and return the capture on rx_if
        tx_if.add_stream(packet)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    self.nat44_add_static_mapping(inside.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # out2in
    request = (Ether(dst=outside.local_mac, src=outside.remote_mac) /
               IP(src=outside.remote_ip4, dst=self.nat_addr, ttl=64) /
               ICMP(id=self.icmp_id_out, type='echo-request'))
    capture = exchange_one(outside, request, inside)
    self.verify_capture_in(capture, inside, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out
    reply = (Ether(dst=inside.local_mac, src=inside.remote_mac) /
             IP(src=inside.remote_ip4, dst=outside.remote_ip4, ttl=64) /
             ICMP(id=self.icmp_id_in, type='echo-reply'))
    capture = exchange_one(inside, reply, outside)
    self.verify_capture_out(capture, same_port=True, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_static_in(self):
    """ 1:1 NAT initialized from inside network """
    inside, outside = self.pg0, self.pg1

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    nat_ip = "10.0.0.10"
    # Address-only mapping: ports and ICMP identifier are not translated
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(inside.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # in2out
    to_outside = self.create_stream_in(inside, outside)
    capture = exchange(inside, to_outside, outside)
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    to_inside = self.create_stream_out(outside, nat_ip)
    capture = exchange(outside, to_inside, inside)
    self.verify_capture_in(capture, inside)
def test_static_out(self):
    """ 1:1 NAT initialized from outside network """
    inside, outside = self.pg0, self.pg1

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    nat_ip = "10.0.0.20"
    # Address-only mapping: ports and ICMP identifier are not translated
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(inside.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # out2in
    to_inside = self.create_stream_out(outside, nat_ip)
    capture = exchange(outside, to_inside, inside)
    self.verify_capture_in(capture, inside)

    # in2out
    to_outside = self.create_stream_in(inside, outside)
    capture = exchange(inside, to_outside, outside)
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ 1:1 NAPT initialized from inside network """
    inside, outside = self.pg0, self.pg1

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.nat44_add_address(self.nat_addr)
    # One port mapping per protocol: (local port, external port, protocol)
    mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for local_port, external_port, proto in mappings:
        self.nat44_add_static_mapping(inside.remote_ip4, self.nat_addr,
                                      local_port, external_port,
                                      proto=proto)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # in2out
    to_outside = self.create_stream_in(inside, outside)
    self.verify_capture_out(exchange(inside, to_outside, outside))

    # out2in
    to_inside = self.create_stream_out(outside)
    self.verify_capture_in(exchange(outside, to_inside, inside), inside)
def test_static_with_port_out(self):
    """ 1:1 NAPT initialized from outside network """
    inside, outside = self.pg0, self.pg1

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.nat44_add_address(self.nat_addr)
    # One port mapping per protocol: (local port, external port, protocol)
    mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for local_port, external_port, proto in mappings:
        self.nat44_add_static_mapping(inside.remote_ip4, self.nat_addr,
                                      local_port, external_port,
                                      proto=proto)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # out2in
    to_inside = self.create_stream_out(outside)
    self.verify_capture_in(exchange(outside, to_inside, inside), inside)

    # in2out
    to_outside = self.create_stream_in(inside, outside)
    self.verify_capture_out(exchange(inside, to_outside, outside))
def test_static_vrf_aware(self):
    """ 1:1 NAT VRF awareness """
    outside = self.pg3

    def send(tx_if, stream):
        # Inject the stream on tx_if and run the packet generator
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # Both mappings live in VRF 10
    for local_ip, nat_ip in ((self.pg4.remote_ip4, nat_ip1),
                             (self.pg0.remote_ip4, nat_ip2)):
        self.nat44_add_static_mapping(local_ip, nat_ip,
                                      vrf_id=10)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF match NAT44 static mapping VRF
    matching = self.create_stream_in(self.pg4, outside)
    send(self.pg4, matching)
    capture = outside.get_capture(len(matching))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match NAT44 static mapping VRF (packets
    # are dropped)
    mismatching = self.create_stream_in(self.pg0, outside)
    send(self.pg0, mismatching)
    outside.assert_nothing_captured()
1050 def test_static_lb(self):
1051 """ NAT44 local service load balancing """
1052 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1055 server1 = self.pg0.remote_hosts[0]
1056 server2 = self.pg0.remote_hosts[1]
1058 locals = [{'addr': server1.ip4n,
1061 {'addr': server2.ip4n,
1065 self.nat44_add_address(self.nat_addr)
1066 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1069 local_num=len(locals),
1071 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1072 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1075 # from client to service
1076 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1077 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1078 TCP(sport=12345, dport=external_port))
1079 self.pg1.add_stream(p)
1080 self.pg_enable_capture(self.pg_interfaces)
1082 capture = self.pg0.get_capture(1)
1088 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1089 if ip.dst == server1.ip4:
1093 self.assertEqual(tcp.dport, local_port)
1094 self.check_tcp_checksum(p)
1095 self.check_ip_checksum(p)
1097 self.logger.error(ppp("Unexpected or invalid packet:", p))
1100 # from service back to client
1101 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1102 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1103 TCP(sport=local_port, dport=12345))
1104 self.pg0.add_stream(p)
1105 self.pg_enable_capture(self.pg_interfaces)
1107 capture = self.pg1.get_capture(1)
1112 self.assertEqual(ip.src, self.nat_addr)
1113 self.assertEqual(tcp.sport, external_port)
1114 self.check_tcp_checksum(p)
1115 self.check_ip_checksum(p)
1117 self.logger.error(ppp("Unexpected or invalid packet:", p))
1123 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1125 for client in clients:
1126 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1127 IP(src=client, dst=self.nat_addr) /
1128 TCP(sport=12345, dport=external_port))
1130 self.pg1.add_stream(pkts)
1131 self.pg_enable_capture(self.pg_interfaces)
1133 capture = self.pg0.get_capture(len(pkts))
1135 if p[IP].dst == server1.ip4:
1139 self.assertTrue(server1_n > server2_n)
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces """
    inside1, inside2, plain, outside = (self.pg0, self.pg1,
                                        self.pg2, self.pg3)

    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what arrives on rx_if
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(inside1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(inside2.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # between two NAT44 inside interfaces (no translation)
    stream = self.create_stream_in(inside1, inside2)
    capture = exchange(inside1, stream, inside2)
    self.verify_capture_no_translation(capture, inside1, inside2)

    # from NAT44 inside to interface without NAT44 feature (no translation)
    stream = self.create_stream_in(inside1, plain)
    capture = exchange(inside1, stream, plain)
    self.verify_capture_no_translation(capture, inside1, plain)

    # in2out 1st interface
    stream = self.create_stream_in(inside1, outside)
    self.verify_capture_out(exchange(inside1, stream, outside))

    # out2in 1st interface
    stream = self.create_stream_out(outside)
    self.verify_capture_in(exchange(outside, stream, inside1), inside1)

    # in2out 2nd interface
    stream = self.create_stream_in(inside2, outside)
    self.verify_capture_out(exchange(inside2, stream, outside))

    # out2in 2nd interface
    stream = self.create_stream_out(outside)
    self.verify_capture_in(exchange(outside, stream, inside2), inside2)
1198 def test_inside_overlapping_interfaces(self):
1199 """ NAT44 multiple inside interfaces with overlapping address space """
# Scenario: the NAT44 feature is enabled on pg3, pg4, pg5 and pg6, and a
# static mapping from pg6's remote address to static_nat_ip is added.
# Traffic is exchanged between the interfaces and the NAT44 user/session
# dumps are checked afterwards.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (including the tails of two multi-line calls below) are not visible in
# this listing -- confirm against the full file before editing.
1201 static_nat_ip = "10.0.0.10"
1202 self.nat44_add_address(self.nat_addr)
1203 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1205 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1206 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1207 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1208 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1211 # between NAT44 inside interfaces with same VRF (no translation)
1212 pkts = self.create_stream_in(self.pg4, self.pg5)
1213 self.pg4.add_stream(pkts)
1214 self.pg_enable_capture(self.pg_interfaces)
1216 capture = self.pg5.get_capture(len(pkts))
1217 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1219 # between NAT44 inside interfaces with different VRF (hairpinning)
# A single TCP packet from the pg4 host to the static NAT address must
# come out on pg6 with source self.nat_addr, a changed source port, and
# the pg6 host as destination.
1220 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1221 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1222 TCP(sport=1234, dport=5678))
1223 self.pg4.add_stream(p)
1224 self.pg_enable_capture(self.pg_interfaces)
1226 capture = self.pg6.get_capture(1)
1231 self.assertEqual(ip.src, self.nat_addr)
1232 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1233 self.assertNotEqual(tcp.sport, 1234)
1234 self.assertEqual(tcp.dport, 5678)
1236 self.logger.error(ppp("Unexpected or invalid packet:", p))
1239 # in2out 1st interface
1240 pkts = self.create_stream_in(self.pg4, self.pg3)
1241 self.pg4.add_stream(pkts)
1242 self.pg_enable_capture(self.pg_interfaces)
1244 capture = self.pg3.get_capture(len(pkts))
1245 self.verify_capture_out(capture)
1247 # out2in 1st interface
1248 pkts = self.create_stream_out(self.pg3)
1249 self.pg3.add_stream(pkts)
1250 self.pg_enable_capture(self.pg_interfaces)
1252 capture = self.pg4.get_capture(len(pkts))
1253 self.verify_capture_in(capture, self.pg4)
1255 # in2out 2nd interface
1256 pkts = self.create_stream_in(self.pg5, self.pg3)
1257 self.pg5.add_stream(pkts)
1258 self.pg_enable_capture(self.pg_interfaces)
1260 capture = self.pg3.get_capture(len(pkts))
1261 self.verify_capture_out(capture)
1263 # out2in 2nd interface
1264 pkts = self.create_stream_out(self.pg3)
1265 self.pg3.add_stream(pkts)
1266 self.pg_enable_capture(self.pg_interfaces)
1268 capture = self.pg5.get_capture(len(pkts))
1269 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 host: exactly three non-static sessions are
# expected and are checked by index in the order TCP, UDP, ICMP.
# NOTE(review): the second dump argument (10) is presumably a VRF id --
# TODO confirm.
1272 addresses = self.vapi.nat44_address_dump()
1273 self.assertEqual(len(addresses), 1)
1274 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1275 self.assertEqual(len(sessions), 3)
1276 for session in sessions:
1277 self.assertFalse(session.is_static)
1278 self.assertEqual(session.inside_ip_address[0:4],
1279 self.pg5.remote_ip4n)
1280 self.assertEqual(session.outside_ip_address,
1281 addresses[0].ip_address)
1282 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1283 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1284 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1285 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1286 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1287 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1288 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1289 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1290 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1292 # in2out 3rd interface
# pg6 traffic is checked against static_nat_ip instead of the pool
# address (extra arguments passed to verify_capture_out).
1293 pkts = self.create_stream_in(self.pg6, self.pg3)
1294 self.pg6.add_stream(pkts)
1295 self.pg_enable_capture(self.pg_interfaces)
1297 capture = self.pg3.get_capture(len(pkts))
1298 self.verify_capture_out(capture, static_nat_ip, True)
1300 # out2in 3rd interface
1301 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1302 self.pg3.add_stream(pkts)
1303 self.pg_enable_capture(self.pg_interfaces)
1305 capture = self.pg6.get_capture(len(pkts))
1306 self.verify_capture_in(capture, self.pg6)
1308 # general user and session dump verifications
1309 users = self.vapi.nat44_user_dump()
1310 self.assertTrue(len(users) >= 3)
1311 addresses = self.vapi.nat44_address_dump()
1312 self.assertEqual(len(addresses), 1)
# NOTE(review): the loop header that binds `user` is not visible in this
# listing (presumably an iteration over `users`) -- TODO confirm.
1314 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1316 for session in sessions:
1317 self.assertEqual(user.ip_address, session.inside_ip_address)
# chained comparison: total_bytes > total_pkts and total_pkts > 0
1318 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1319 self.assertTrue(session.protocol in
1320 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of the pg4 host: at least four, all dynamic (not static) and
# all translated to the single pool address.
1324 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1325 self.assertTrue(len(sessions) >= 4)
1326 for session in sessions:
1327 self.assertFalse(session.is_static)
1328 self.assertEqual(session.inside_ip_address[0:4],
1329 self.pg4.remote_ip4n)
1330 self.assertEqual(session.outside_ip_address,
1331 addresses[0].ip_address)
# Sessions of the pg6 host: at least three, all static, with the outside
# address equal to static_nat_ip.
1334 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1335 self.assertTrue(len(sessions) >= 3)
1336 for session in sessions:
1337 self.assertTrue(session.is_static)
1338 self.assertEqual(session.inside_ip_address[0:4],
1339 self.pg6.remote_ip4n)
# NOTE(review): this comparison relies on map() returning lists (Python 2
# semantics); under Python 3 two map iterators would never compare equal.
1340 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1341 map(int, static_nat_ip.split('.')))
1342 self.assertTrue(session.inside_port in
1343 [self.tcp_port_in, self.udp_port_in,
1346 def test_hairpinning(self):
1347 """ NAT44 hairpinning - 1:1 NAPT """
1349 host = self.pg0.remote_hosts[0]
1350 server = self.pg0.remote_hosts[1]
1353 server_in_port = 5678
1354 server_out_port = 8765
1356 self.nat44_add_address(self.nat_addr)
1357 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1358 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1360 # add static mapping for server
1361 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1362 server_in_port, server_out_port,
1363 proto=IP_PROTOS.tcp)
1365 # send packet from host to server
1366 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1367 IP(src=host.ip4, dst=self.nat_addr) /
1368 TCP(sport=host_in_port, dport=server_out_port))
1369 self.pg0.add_stream(p)
1370 self.pg_enable_capture(self.pg_interfaces)
1372 capture = self.pg0.get_capture(1)
1377 self.assertEqual(ip.src, self.nat_addr)
1378 self.assertEqual(ip.dst, server.ip4)
1379 self.assertNotEqual(tcp.sport, host_in_port)
1380 self.assertEqual(tcp.dport, server_in_port)
1381 self.check_tcp_checksum(p)
1382 host_out_port = tcp.sport
1384 self.logger.error(ppp("Unexpected or invalid packet:", p))
1387 # send reply from server to host
1388 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1389 IP(src=server.ip4, dst=self.nat_addr) /
1390 TCP(sport=server_in_port, dport=host_out_port))
1391 self.pg0.add_stream(p)
1392 self.pg_enable_capture(self.pg_interfaces)
1394 capture = self.pg0.get_capture(1)
1399 self.assertEqual(ip.src, self.nat_addr)
1400 self.assertEqual(ip.dst, host.ip4)
1401 self.assertEqual(tcp.sport, server_out_port)
1402 self.assertEqual(tcp.dport, host_in_port)
1403 self.check_tcp_checksum(p)
1405 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1408 def test_hairpinning2(self):
1409 """ NAT44 hairpinning - 1:1 NAT"""
# Scenario: host, server1 and server2 are all remote hosts on pg0; both
# servers get address-only static mappings (10.0.0.10 and 10.0.0.11).
# Four traffic rounds follow (host -> server1, server1 -> host,
# server2 -> server1, server1 -> server2), each built from one TCP, one
# UDP and one ICMP packet.
# NOTE(review): the embedded line numbers have gaps; in particular the
# statements that populate `pkts` are not visible in this listing.
1411 server1_nat_ip = "10.0.0.10"
1412 server2_nat_ip = "10.0.0.11"
1413 host = self.pg0.remote_hosts[0]
1414 server1 = self.pg0.remote_hosts[1]
1415 server2 = self.pg0.remote_hosts[2]
1416 server_tcp_port = 22
1417 server_udp_port = 20
1419 self.nat44_add_address(self.nat_addr)
1420 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1421 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1424 # add static mapping for servers
1425 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1426 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host to server1: requests addressed to server1's external address
1430 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1431 IP(src=host.ip4, dst=server1_nat_ip) /
1432 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1434 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1435 IP(src=host.ip4, dst=server1_nat_ip) /
1436 UDP(sport=self.udp_port_in, dport=server_udp_port))
1438 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1439 IP(src=host.ip4, dst=server1_nat_ip) /
1440 ICMP(id=self.icmp_id_in, type='echo-request'))
1442 self.pg0.add_stream(pkts)
1443 self.pg_enable_capture(self.pg_interfaces)
1445 capture = self.pg0.get_capture(len(pkts))
# Expected: source rewritten to self.nat_addr, destination to server1's
# real address; the translated source port / ICMP id must differ from the
# inside value and is stored on self for building the replies.
1446 for packet in capture:
1448 self.assertEqual(packet[IP].src, self.nat_addr)
1449 self.assertEqual(packet[IP].dst, server1.ip4)
1450 if packet.haslayer(TCP):
1451 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1452 self.assertEqual(packet[TCP].dport, server_tcp_port)
1453 self.tcp_port_out = packet[TCP].sport
1454 self.check_tcp_checksum(packet)
1455 elif packet.haslayer(UDP):
1456 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1457 self.assertEqual(packet[UDP].dport, server_udp_port)
1458 self.udp_port_out = packet[UDP].sport
1460 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1461 self.icmp_id_out = packet[ICMP].id
1463 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 to host: replies sent to self.nat_addr on the stored outside
# ports / ICMP id
1468 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1469 IP(src=server1.ip4, dst=self.nat_addr) /
1470 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1472 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1473 IP(src=server1.ip4, dst=self.nat_addr) /
1474 UDP(sport=server_udp_port, dport=self.udp_port_out))
1476 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1477 IP(src=server1.ip4, dst=self.nat_addr) /
1478 ICMP(id=self.icmp_id_out, type='echo-reply'))
1480 self.pg0.add_stream(pkts)
1481 self.pg_enable_capture(self.pg_interfaces)
1483 capture = self.pg0.get_capture(len(pkts))
# Expected: the reply appears to come from server1's external address and
# carries the host's original inside ports / ICMP id again.
1484 for packet in capture:
1486 self.assertEqual(packet[IP].src, server1_nat_ip)
1487 self.assertEqual(packet[IP].dst, host.ip4)
1488 if packet.haslayer(TCP):
1489 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1490 self.assertEqual(packet[TCP].sport, server_tcp_port)
1491 self.check_tcp_checksum(packet)
1492 elif packet.haslayer(UDP):
1493 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1494 self.assertEqual(packet[UDP].sport, server_udp_port)
1496 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1498 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1501 # server2 to server1
1503 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1504 IP(src=server2.ip4, dst=server1_nat_ip) /
1505 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1507 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1508 IP(src=server2.ip4, dst=server1_nat_ip) /
1509 UDP(sport=self.udp_port_in, dport=server_udp_port))
1511 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1512 IP(src=server2.ip4, dst=server1_nat_ip) /
1513 ICMP(id=self.icmp_id_in, type='echo-request'))
1515 self.pg0.add_stream(pkts)
1516 self.pg_enable_capture(self.pg_interfaces)
1518 capture = self.pg0.get_capture(len(pkts))
# Expected: source becomes server2's external address while ports and the
# ICMP id stay equal to the inside values (asserted with assertEqual, in
# contrast to the host -> server1 round above).
1519 for packet in capture:
1521 self.assertEqual(packet[IP].src, server2_nat_ip)
1522 self.assertEqual(packet[IP].dst, server1.ip4)
1523 if packet.haslayer(TCP):
1524 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1525 self.assertEqual(packet[TCP].dport, server_tcp_port)
1526 self.tcp_port_out = packet[TCP].sport
1527 self.check_tcp_checksum(packet)
1528 elif packet.haslayer(UDP):
1529 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1530 self.assertEqual(packet[UDP].dport, server_udp_port)
1531 self.udp_port_out = packet[UDP].sport
1533 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1534 self.icmp_id_out = packet[ICMP].id
1536 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1539 # server1 to server2
1541 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1542 IP(src=server1.ip4, dst=server2_nat_ip) /
1543 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1545 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1546 IP(src=server1.ip4, dst=server2_nat_ip) /
1547 UDP(sport=server_udp_port, dport=self.udp_port_out))
1549 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550 IP(src=server1.ip4, dst=server2_nat_ip) /
1551 ICMP(id=self.icmp_id_out, type='echo-reply'))
1553 self.pg0.add_stream(pkts)
1554 self.pg_enable_capture(self.pg_interfaces)
1556 capture = self.pg0.get_capture(len(pkts))
# Expected: the reply appears to come from server1's external address and
# is delivered to server2's real address with unchanged ports / ICMP id.
1557 for packet in capture:
1559 self.assertEqual(packet[IP].src, server1_nat_ip)
1560 self.assertEqual(packet[IP].dst, server2.ip4)
1561 if packet.haslayer(TCP):
1562 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1563 self.assertEqual(packet[TCP].sport, server_tcp_port)
1564 self.check_tcp_checksum(packet)
1565 elif packet.haslayer(UDP):
1566 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1567 self.assertEqual(packet[UDP].sport, server_udp_port)
1569 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1571 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1574 def test_max_translations_per_user(self):
1575 """ MAX translations per user - recycle the least recently used """
# Scenario: one inside host (pg0) sends more TCP flows, each with a
# distinct source port, than the configured per-user translation limit.
# All packets are still expected on pg1.
# NOTE(review): the embedded line numbers have gaps; the statements that
# create and fill `pkts` are not visible in this listing.
1577 self.nat44_add_address(self.nat_addr)
1578 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1579 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1582 # get maximum number of translations per user
1583 nat44_config = self.vapi.nat_show_config()
1585 # send more than maximum number of translations per user packets
1586 pkts_num = nat44_config.max_translations_per_user + 5
# source ports run from 1025 upwards, one per packet
1588 for port in range(0, pkts_num):
1589 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1590 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1591 TCP(sport=1025 + port))
1593 self.pg0.add_stream(pkts)
1594 self.pg_enable_capture(self.pg_interfaces)
1597 # verify number of translated packet
# only the packet count is checked; the capture contents are not inspected
1598 self.pg1.get_capture(pkts_num)
1600 def test_interface_addr(self):
1601 """ Acquire NAT44 addresses from interface """
# Scenario: pg7 is registered as a source of NAT44 pool addresses. The
# pool must be empty while pg7 has no IPv4 address, contain exactly pg7's
# local address once it is configured, and be empty again after removal.
# NOTE(review): the local name `adresses` is a misspelling of `addresses`;
# it is used consistently within this method.
1602 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1604 # no address in NAT pool
1605 adresses = self.vapi.nat44_address_dump()
1606 self.assertEqual(0, len(adresses))
1608 # configure interface address and check NAT address pool
1609 self.pg7.config_ip4()
1610 adresses = self.vapi.nat44_address_dump()
1611 self.assertEqual(1, len(adresses))
# only the first four bytes of the dumped address are compared
1612 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1614 # remove interface address and check NAT address pool
1615 self.pg7.unconfig_ip4()
1616 adresses = self.vapi.nat44_address_dump()
1617 self.assertEqual(0, len(adresses))
1619 def test_interface_addr_static_mapping(self):
1620 """ Static mapping with addresses from interface """
# Scenario: a static mapping is created against pg7's interface index
# rather than a fixed external address. The dump must first report the
# interface index, then (once pg7 has an address) the resolved external
# address with the interface index reset to 0xFFFFFFFF, and finally no
# mapping after the address is removed.
# NOTE(review): at least one argument line of the nat44_add_static_mapping
# call below is not visible in this listing (gap in the embedded
# numbering).
1621 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1622 self.nat44_add_static_mapping(
1624 external_sw_if_index=self.pg7.sw_if_index)
1626 # static mappings with external interface
1627 static_mappings = self.vapi.nat44_static_mapping_dump()
1628 self.assertEqual(1, len(static_mappings))
1629 self.assertEqual(self.pg7.sw_if_index,
1630 static_mappings[0].external_sw_if_index)
1632 # configure interface address and check static mappings
1633 self.pg7.config_ip4()
1634 static_mappings = self.vapi.nat44_static_mapping_dump()
1635 self.assertEqual(1, len(static_mappings))
1636 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1637 self.pg7.local_ip4n)
# 0xFFFFFFFF is presumably the "no interface" sentinel -- TODO confirm
1638 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1640 # remove interface address and check static mappings
1641 self.pg7.unconfig_ip4()
1642 static_mappings = self.vapi.nat44_static_mapping_dump()
1643 self.assertEqual(0, len(static_mappings))
1645 def test_ipfix_nat44_sess(self):
1646 """ IPFIX logging NAT44 session created/delted """
# Scenario: an IPFIX exporter towards pg3's remote host is configured on a
# non-default collector port, inside -> outside traffic creates NAT44
# sessions, the pool address is then removed, and the flushed IPFIX
# packets captured on pg3 are decoded and verified.
# NOTE(review): "delted" in the docstring and the local name
# `colector_port` are misspellings ("deleted", "collector_port").
# NOTE(review): the embedded line numbers have gaps; the loop headers that
# bind `p` over the capture are not visible in this listing.
1647 self.ipfix_domain_id = 10
1648 self.ipfix_src_port = 20202
1649 colector_port = 30303
# teach scapy to dissect UDP datagrams to port 30303 as IPFIX; the literal
# repeats the value of colector_port
1650 bind_layers(UDP, IPFIX, dport=30303)
1651 self.nat44_add_address(self.nat_addr)
1652 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1653 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1655 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1656 src_address=self.pg3.local_ip4n,
1658 template_interval=10,
1659 collector_port=colector_port)
1660 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1661 src_port=self.ipfix_src_port)
1663 pkts = self.create_stream_in(self.pg0, self.pg1)
1664 self.pg0.add_stream(pkts)
1665 self.pg_enable_capture(self.pg_interfaces)
1667 capture = self.pg1.get_capture(len(pkts))
1668 self.verify_capture_out(capture)
# removing the pool address presumably tears the sessions down so that
# delete events are exported as well -- TODO confirm
1669 self.nat44_add_address(self.nat_addr, is_add=0)
1670 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# exactly three IPFIX packets are expected on the collector interface
1671 capture = self.pg3.get_capture(3)
1672 ipfix = IPFIXDecoder()
1673 # first load template
1675 self.assertTrue(p.haslayer(IPFIX))
1676 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1677 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1678 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1679 self.assertEqual(p[UDP].dport, colector_port)
1680 self.assertEqual(p[IPFIX].observationDomainID,
1681 self.ipfix_domain_id)
1682 if p.haslayer(Template):
1683 ipfix.add_template(p.getlayer(Template))
1684 # verify events in data set
1686 if p.haslayer(Data):
1687 data = ipfix.decode_data_set(p.getlayer(Set))
1688 self.verify_ipfix_nat44_ses(data)
1690 def test_ipfix_addr_exhausted(self):
1691 """ IPFIX logging NAT addresses exhausted """
# Scenario: NAT44 is enabled on pg0/pg1 but no pool address is added in
# the visible lines, so an inside packet cannot be translated; nothing is
# expected on pg1 and the flushed IPFIX packets captured on pg3 are
# decoded and verified.
# NOTE(review): self.ipfix_domain_id and self.ipfix_src_port are read here
# but not assigned in the visible lines of this method; presumably they
# are initialised elsewhere -- TODO confirm.
# NOTE(review): the embedded line numbers have gaps; the transport layer
# of the test packet and the loop headers binding `p` over the capture
# are not visible in this listing.
1692 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1693 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1695 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1696 src_address=self.pg3.local_ip4n,
1698 template_interval=10)
1699 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1700 src_port=self.ipfix_src_port)
1702 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1703 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1705 self.pg0.add_stream(p)
1706 self.pg_enable_capture(self.pg_interfaces)
# zero packets expected on the outside interface
1708 capture = self.pg1.get_capture(0)
1709 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1710 capture = self.pg3.get_capture(3)
1711 ipfix = IPFIXDecoder()
1712 # first load template
1714 self.assertTrue(p.haslayer(IPFIX))
1715 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1716 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1717 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# no collector_port is passed to set_ipfix_exporter in the visible lines;
# 4739 is presumably the exporter's default port -- TODO confirm
1718 self.assertEqual(p[UDP].dport, 4739)
1719 self.assertEqual(p[IPFIX].observationDomainID,
1720 self.ipfix_domain_id)
1721 if p.haslayer(Template):
1722 ipfix.add_template(p.getlayer(Template))
1723 # verify events in data set
1725 if p.haslayer(Data):
1726 data = ipfix.decode_data_set(p.getlayer(Set))
1727 self.verify_ipfix_addr_exhausted(data)
1729 def test_pool_addr_fib(self):
1730 """ NAT44 add pool addresses to FIB """
1731 static_addr = '10.0.0.10'
1732 self.nat44_add_address(self.nat_addr)
1733 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1734 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1736 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1739 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1740 ARP(op=ARP.who_has, pdst=self.nat_addr,
1741 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1742 self.pg1.add_stream(p)
1743 self.pg_enable_capture(self.pg_interfaces)
1745 capture = self.pg1.get_capture(1)
1746 self.assertTrue(capture[0].haslayer(ARP))
1747 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1750 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1751 ARP(op=ARP.who_has, pdst=static_addr,
1752 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1753 self.pg1.add_stream(p)
1754 self.pg_enable_capture(self.pg_interfaces)
1756 capture = self.pg1.get_capture(1)
1757 self.assertTrue(capture[0].haslayer(ARP))
1758 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1760 # send ARP to non-NAT44 interface
1761 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1762 ARP(op=ARP.who_has, pdst=self.nat_addr,
1763 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1764 self.pg2.add_stream(p)
1765 self.pg_enable_capture(self.pg_interfaces)
1767 capture = self.pg1.get_capture(0)
1769 # remove addresses and verify
1770 self.nat44_add_address(self.nat_addr, is_add=0)
1771 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1774 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1775 ARP(op=ARP.who_has, pdst=self.nat_addr,
1776 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1777 self.pg1.add_stream(p)
1778 self.pg_enable_capture(self.pg_interfaces)
1780 capture = self.pg1.get_capture(0)
1782 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1783 ARP(op=ARP.who_has, pdst=static_addr,
1784 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1785 self.pg1.add_stream(p)
1786 self.pg_enable_capture(self.pg_interfaces)
1788 capture = self.pg1.get_capture(0)
1790 def test_vrf_mode(self):
1791 """ NAT44 tenant VRF aware address pool mode """
# Scenario: pg0 and pg1 are moved into two different IPv4 tables and each
# table gets its own pool address. Traffic from pg0 must be translated to
# nat_ip1 and traffic from pg1 to nat_ip2.
# NOTE(review): the assignments of vrf_id1 and vrf_id2 are not visible in
# this listing (gap in the embedded numbering).
1795 nat_ip1 = "10.0.0.10"
1796 nat_ip2 = "10.0.0.11"
# the addresses are unconfigured before the table change and configured
# again afterwards
1798 self.pg0.unconfig_ip4()
1799 self.pg1.unconfig_ip4()
1800 self.pg0.set_table_ip4(vrf_id1)
1801 self.pg1.set_table_ip4(vrf_id2)
1802 self.pg0.config_ip4()
1803 self.pg1.config_ip4()
1805 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1806 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1807 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1808 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1809 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# first table: pg0 -> pg2 must use nat_ip1
1813 pkts = self.create_stream_in(self.pg0, self.pg2)
1814 self.pg0.add_stream(pkts)
1815 self.pg_enable_capture(self.pg_interfaces)
1817 capture = self.pg2.get_capture(len(pkts))
1818 self.verify_capture_out(capture, nat_ip1)
# second table: pg1 -> pg2 must use nat_ip2
1821 pkts = self.create_stream_in(self.pg1, self.pg2)
1822 self.pg1.add_stream(pkts)
1823 self.pg_enable_capture(self.pg_interfaces)
1825 capture = self.pg2.get_capture(len(pkts))
1826 self.verify_capture_out(capture, nat_ip2)
1828 def test_vrf_feature_independent(self):
1829 """ NAT44 tenant VRF independent address pool mode """
# Scenario: two pool addresses are added without a vrf_id argument;
# traffic from both pg0 and pg1 is expected to be translated to the first
# one (nat_ip1).
# NOTE(review): the tail of the pg2 feature call is not visible in this
# listing (gap in the embedded numbering).
1831 nat_ip1 = "10.0.0.10"
1832 nat_ip2 = "10.0.0.11"
1834 self.nat44_add_address(nat_ip1)
1835 self.nat44_add_address(nat_ip2)
1836 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1837 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1838 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
1842 pkts = self.create_stream_in(self.pg0, self.pg2)
1843 self.pg0.add_stream(pkts)
1844 self.pg_enable_capture(self.pg_interfaces)
1846 capture = self.pg2.get_capture(len(pkts))
1847 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: same pool address expected as for pg0
1850 pkts = self.create_stream_in(self.pg1, self.pg2)
1851 self.pg1.add_stream(pkts)
1852 self.pg_enable_capture(self.pg_interfaces)
1854 capture = self.pg2.get_capture(len(pkts))
1855 self.verify_capture_out(capture, nat_ip1)
1857 def test_dynamic_ipless_interfaces(self):
1858 """ NAT44 interfaces without configured IP address """
# Scenario: pg7 and pg8 get a neighbor entry and a /32 route for their
# remote hosts instead of an interface address; dynamic NAT44 translation
# is then checked in both directions between them.
# NOTE(review): the tails of the ip_neighbor_add_del calls and of the pg8
# feature call are not visible in this listing (gaps in the embedded
# numbering).
1860 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1861 self.pg7.remote_mac,
1862 self.pg7.remote_ip4n,
1864 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1865 self.pg8.remote_mac,
1866 self.pg8.remote_ip4n,
# host routes pointing directly at the remote hosts
1869 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1870 dst_address_length=32,
1871 next_hop_address=self.pg7.remote_ip4n,
1872 next_hop_sw_if_index=self.pg7.sw_if_index)
1873 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1874 dst_address_length=32,
1875 next_hop_address=self.pg8.remote_ip4n,
1876 next_hop_sw_if_index=self.pg8.sw_if_index)
1878 self.nat44_add_address(self.nat_addr)
1879 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1880 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8, verified as outbound (translated) traffic
1884 pkts = self.create_stream_in(self.pg7, self.pg8)
1885 self.pg7.add_stream(pkts)
1886 self.pg_enable_capture(self.pg_interfaces)
1888 capture = self.pg8.get_capture(len(pkts))
1889 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the pool address
1892 pkts = self.create_stream_out(self.pg8, self.nat_addr)
1893 self.pg8.add_stream(pkts)
1894 self.pg_enable_capture(self.pg_interfaces)
1896 capture = self.pg7.get_capture(len(pkts))
1897 self.verify_capture_in(capture, self.pg7)
1899 def test_static_ipless_interfaces(self):
1900 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Scenario: same address-less setup as the dynamic variant (neighbor
# entries plus /32 routes on pg7 and pg8), but with an address-only static
# mapping pg7 host <-> self.nat_addr and no pool address added in the
# visible lines. The outside -> inside direction is exercised first.
# NOTE(review): the tails of the ip_neighbor_add_del calls and of the pg8
# feature call are not visible in this listing (gaps in the embedded
# numbering).
1902 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1903 self.pg7.remote_mac,
1904 self.pg7.remote_ip4n,
1906 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1907 self.pg8.remote_mac,
1908 self.pg8.remote_ip4n,
1911 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1912 dst_address_length=32,
1913 next_hop_address=self.pg7.remote_ip4n,
1914 next_hop_sw_if_index=self.pg7.sw_if_index)
1915 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1916 dst_address_length=32,
1917 next_hop_address=self.pg8.remote_ip4n,
1918 next_hop_sw_if_index=self.pg8.sw_if_index)
1920 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1921 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1922 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7, verified as inbound traffic for pg7
1926 pkts = self.create_stream_out(self.pg8)
1927 self.pg8.add_stream(pkts)
1928 self.pg_enable_capture(self.pg_interfaces)
1930 capture = self.pg7.get_capture(len(pkts))
1931 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8, verified against self.nat_addr with the extra flag set
1934 pkts = self.create_stream_in(self.pg7, self.pg8)
1935 self.pg7.add_stream(pkts)
1936 self.pg_enable_capture(self.pg_interfaces)
1938 capture = self.pg8.get_capture(len(pkts))
1939 self.verify_capture_out(capture, self.nat_addr, True)
1941 def test_static_with_port_ipless_interfaces(self):
1942 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Scenario: same address-less setup (neighbor entries plus /32 routes on
# pg7 and pg8), with three per-protocol static mappings (TCP, UDP, ICMP)
# from the pg7 host to self.nat_addr using the fixed outside ports / id
# assigned below.
# NOTE(review): the tails of the ip_neighbor_add_del calls and of the pg8
# feature call are not visible in this listing (gaps in the embedded
# numbering).
# fixed outside ports / ICMP id used by the static mappings
1944 self.tcp_port_out = 30606
1945 self.udp_port_out = 30607
1946 self.icmp_id_out = 30608
1948 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1949 self.pg7.remote_mac,
1950 self.pg7.remote_ip4n,
1952 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1953 self.pg8.remote_mac,
1954 self.pg8.remote_ip4n,
1957 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1958 dst_address_length=32,
1959 next_hop_address=self.pg7.remote_ip4n,
1960 next_hop_sw_if_index=self.pg7.sw_if_index)
1961 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1962 dst_address_length=32,
1963 next_hop_address=self.pg8.remote_ip4n,
1964 next_hop_sw_if_index=self.pg8.sw_if_index)
1966 self.nat44_add_address(self.nat_addr)
1967 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1968 self.tcp_port_in, self.tcp_port_out,
1969 proto=IP_PROTOS.tcp)
1970 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1971 self.udp_port_in, self.udp_port_out,
1972 proto=IP_PROTOS.udp)
1973 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1974 self.icmp_id_in, self.icmp_id_out,
1975 proto=IP_PROTOS.icmp)
1976 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1977 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7, verified as inbound traffic for pg7
1981 pkts = self.create_stream_out(self.pg8)
1982 self.pg8.add_stream(pkts)
1983 self.pg_enable_capture(self.pg_interfaces)
1985 capture = self.pg7.get_capture(len(pkts))
1986 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8, verified as outbound (translated) traffic
1989 pkts = self.create_stream_in(self.pg7, self.pg8)
1990 self.pg7.add_stream(pkts)
1991 self.pg_enable_capture(self.pg_interfaces)
1993 capture = self.pg8.get_capture(len(pkts))
1994 self.verify_capture_out(capture)
1996 def test_static_unknown_proto(self):
1997 """ 1:1 NAT translate packet with unknown protocol """
# Scenario: the pg0 host has an address-only static mapping to nat_ip. A
# packet carrying an inner IP/TCP payload is sent in each direction and
# the captured packet is expected to have translated outer addresses,
# still contain a GRE layer, and carry a valid IP checksum.
# NOTE(review): the embedded line numbers have gaps; the layer between the
# outer and inner IP headers (presumably GRE, given the haslayer(GRE)
# assertions) and the statement binding `packet` are not visible in this
# listing.
1998 nat_ip = "10.0.0.10"
1999 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2000 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2001 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# inside -> outside (pg0 to pg1)
2005 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2006 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2008 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2009 TCP(sport=1234, dport=1234))
2010 self.pg0.add_stream(p)
2011 self.pg_enable_capture(self.pg_interfaces)
# `p` is rebound from the sent packet to the capture result here
2013 p = self.pg1.get_capture(1)
2016 self.assertEqual(packet[IP].src, nat_ip)
2017 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2018 self.assertTrue(packet.haslayer(GRE))
2019 self.check_ip_checksum(packet)
2021 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# outside -> inside (pg1 to the static NAT address)
2025 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2026 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2028 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2029 TCP(sport=1234, dport=1234))
2030 self.pg1.add_stream(p)
2031 self.pg_enable_capture(self.pg_interfaces)
2033 p = self.pg0.get_capture(1)
2036 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2037 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2038 self.assertTrue(packet.haslayer(GRE))
2039 self.check_ip_checksum(packet)
2041 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2044 def test_hairpinning_static_unknown_proto(self):
2045 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Scenario: host and server are both remote hosts on pg0 and each has an
# address-only static mapping. A packet with an inner IP/TCP payload is
# sent host -> server's external address and then server -> host's
# external address; both must come back out of pg0 with both outer
# addresses translated.
# NOTE(review): the embedded line numbers have gaps; the layer between the
# outer and inner IP headers (presumably GRE, given the haslayer(GRE)
# assertions) and the statement binding `packet` are not visible in this
# listing.
2047 host = self.pg0.remote_hosts[0]
2048 server = self.pg0.remote_hosts[1]
2050 host_nat_ip = "10.0.0.10"
2051 server_nat_ip = "10.0.0.11"
2053 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2054 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2055 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2056 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host to server
2060 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2061 IP(src=host.ip4, dst=server_nat_ip) /
2063 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2064 TCP(sport=1234, dport=1234))
2065 self.pg0.add_stream(p)
2066 self.pg_enable_capture(self.pg_interfaces)
# `p` is rebound from the sent packet to the capture result here
2068 p = self.pg0.get_capture(1)
2071 self.assertEqual(packet[IP].src, host_nat_ip)
2072 self.assertEqual(packet[IP].dst, server.ip4)
2073 self.assertTrue(packet.haslayer(GRE))
2074 self.check_ip_checksum(packet)
2076 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server to host
2080 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2081 IP(src=server.ip4, dst=host_nat_ip) /
2083 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2084 TCP(sport=1234, dport=1234))
2085 self.pg0.add_stream(p)
2086 self.pg_enable_capture(self.pg_interfaces)
2088 p = self.pg0.get_capture(1)
2091 self.assertEqual(packet[IP].src, server_nat_ip)
2092 self.assertEqual(packet[IP].dst, host.ip4)
2093 self.assertTrue(packet.haslayer(GRE))
2094 self.check_ip_checksum(packet)
2096 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2099 def test_unknown_proto(self):
2100 """ NAT44 translate packet with unknown protocol """
# self.nat_addr is registered through nat44_add_address and the NAT44 feature
# is configured on pg0 and pg1.
2101 self.nat44_add_address(self.nat_addr)
2102 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2103 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# a TCP packet pg0 -> pg1 goes first; only its arrival on pg1 is checked
2107 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2108 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2109 TCP(sport=self.tcp_port_in, dport=20))
2110 self.pg0.add_stream(p)
2111 self.pg_enable_capture(self.pg_interfaces)
2113 p = self.pg1.get_capture(1)
# pg0 -> pg1: the packet whose translation is verified below
2115 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2116 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2118 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2119 TCP(sport=1234, dport=1234))
2120 self.pg0.add_stream(p)
2121 self.pg_enable_capture(self.pg_interfaces)
2123 p = self.pg1.get_capture(1)
# source must be rewritten to self.nat_addr, destination stays pg1's remote
# address; a GRE layer must be present and the IP checksum must recompute
2126 self.assertEqual(packet[IP].src, self.nat_addr)
2127 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2128 self.assertTrue(packet.haslayer(GRE))
2129 self.check_ip_checksum(packet)
2131 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: addressed to self.nat_addr
2135 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2136 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2138 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2139 TCP(sport=1234, dport=1234))
2140 self.pg1.add_stream(p)
2141 self.pg_enable_capture(self.pg_interfaces)
2143 p = self.pg0.get_capture(1)
# destination must be translated back to pg0's remote address
2146 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2147 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2148 self.assertTrue(packet.haslayer(GRE))
2149 self.check_ip_checksum(packet)
2151 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2154 def test_hairpinning_unknown_proto(self):
2155 """ NAT44 translate packet with unknown protocol - hairpinning """
# host and server both sit behind pg0; the server is reachable through a
# static mapping (10.0.0.11) while the host relies on self.nat_addr.
2156 host = self.pg0.remote_hosts[0]
2157 server = self.pg0.remote_hosts[1]
# NOTE(review): server_in_port is not referenced by any visible line of this
# method - confirm whether it is still needed
2160 server_in_port = 5678
2161 server_out_port = 8765
2162 server_nat_ip = "10.0.0.11"
2164 self.nat44_add_address(self.nat_addr)
2165 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2166 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2169 # add static mapping for server
2170 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# a TCP packet host -> server goes first; only its arrival on pg0 is checked
2173 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2174 IP(src=host.ip4, dst=server_nat_ip) /
2175 TCP(sport=host_in_port, dport=server_out_port))
2176 self.pg0.add_stream(p)
2177 self.pg_enable_capture(self.pg_interfaces)
2179 capture = self.pg0.get_capture(1)
# host -> server: the packet whose translation is verified below
2181 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2182 IP(src=host.ip4, dst=server_nat_ip) /
2184 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2185 TCP(sport=1234, dport=1234))
2186 self.pg0.add_stream(p)
2187 self.pg_enable_capture(self.pg_interfaces)
2189 p = self.pg0.get_capture(1)
# expected rewrite: source = self.nat_addr, destination = server's real
# address
2192 self.assertEqual(packet[IP].src, self.nat_addr)
2193 self.assertEqual(packet[IP].dst, server.ip4)
2194 self.assertTrue(packet.haslayer(GRE))
2195 self.check_ip_checksum(packet)
2197 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host: addressed to self.nat_addr
2201 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2202 IP(src=server.ip4, dst=self.nat_addr) /
2204 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2205 TCP(sport=1234, dport=1234))
2206 self.pg0.add_stream(p)
2207 self.pg_enable_capture(self.pg_interfaces)
2209 p = self.pg0.get_capture(1)
# expected rewrite: source = server's NAT address, destination = host's real
# address
2212 self.assertEqual(packet[IP].src, server_nat_ip)
2213 self.assertEqual(packet[IP].dst, host.ip4)
2214 self.assertTrue(packet.haslayer(GRE))
2215 self.check_ip_checksum(packet)
2217 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2220 def test_output_feature(self):
2221 """ NAT44 interface output feature (in2out postrouting) """
# The output feature is configured on pg0, pg1 and pg3; traffic is then run
# pg0 -> pg3, pg3 -> pg0 and pg2 -> pg0.
2222 self.nat44_add_address(self.nat_addr)
2223 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2224 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2225 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3: the capture on pg3 is checked by verify_capture_out
2229 pkts = self.create_stream_in(self.pg0, self.pg3)
2230 self.pg0.add_stream(pkts)
2231 self.pg_enable_capture(self.pg_interfaces)
2233 capture = self.pg3.get_capture(len(pkts))
2234 self.verify_capture_out(capture)
# pg3 -> pg0: the capture on pg0 is checked by verify_capture_in
2237 pkts = self.create_stream_out(self.pg3)
2238 self.pg3.add_stream(pkts)
2239 self.pg_enable_capture(self.pg_interfaces)
2241 capture = self.pg0.get_capture(len(pkts))
2242 self.verify_capture_in(capture, self.pg0)
2244 # from non-NAT interface to NAT inside interface
2245 pkts = self.create_stream_in(self.pg2, self.pg0)
2246 self.pg2.add_stream(pkts)
2247 self.pg_enable_capture(self.pg_interfaces)
2249 capture = self.pg0.get_capture(len(pkts))
# pg2 traffic must reach pg0 without being translated
2250 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2252 def test_output_feature_vrf_aware(self):
2253 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# One NAT address per VRF (10.0.0.10 for vrf 10, 10.0.0.20 for vrf 20); pg4
# and pg6 are each exercised against pg3 in both directions.
2254 nat_ip_vrf10 = "10.0.0.10"
2255 nat_ip_vrf20 = "10.0.0.20"
# two /32 routes to pg3's remote address via pg3
# NOTE(review): presumably one per VRF table - confirm against the arguments
# that close each call
2257 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2258 dst_address_length=32,
2259 next_hop_address=self.pg3.remote_ip4n,
2260 next_hop_sw_if_index=self.pg3.sw_if_index,
2262 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2263 dst_address_length=32,
2264 next_hop_address=self.pg3.remote_ip4n,
2265 next_hop_sw_if_index=self.pg3.sw_if_index,
2268 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2269 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2270 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2271 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2272 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3: must leave with the vrf 10 NAT address
2276 pkts = self.create_stream_in(self.pg4, self.pg3)
2277 self.pg4.add_stream(pkts)
2278 self.pg_enable_capture(self.pg_interfaces)
2280 capture = self.pg3.get_capture(len(pkts))
2281 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4: addressed to the vrf 10 NAT address
2284 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2285 self.pg3.add_stream(pkts)
2286 self.pg_enable_capture(self.pg_interfaces)
2288 capture = self.pg4.get_capture(len(pkts))
2289 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3: must leave with the vrf 20 NAT address
2292 pkts = self.create_stream_in(self.pg6, self.pg3)
2293 self.pg6.add_stream(pkts)
2294 self.pg_enable_capture(self.pg_interfaces)
2296 capture = self.pg3.get_capture(len(pkts))
2297 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6: addressed to the vrf 20 NAT address
2300 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2301 self.pg3.add_stream(pkts)
2302 self.pg_enable_capture(self.pg_interfaces)
2304 capture = self.pg6.get_capture(len(pkts))
2305 self.verify_capture_in(capture, self.pg6)
2307 def test_output_feature_hairpinning(self):
2308 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2309 host = self.pg0.remote_hosts[0]
2310 server = self.pg0.remote_hosts[1]
2313 server_in_port = 5678
2314 server_out_port = 8765
2316 self.nat44_add_address(self.nat_addr)
2317 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2318 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2321 # add static mapping for server
2322 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2323 server_in_port, server_out_port,
2324 proto=IP_PROTOS.tcp)
2326 # send packet from host to server
2327 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2328 IP(src=host.ip4, dst=self.nat_addr) /
2329 TCP(sport=host_in_port, dport=server_out_port))
2330 self.pg0.add_stream(p)
2331 self.pg_enable_capture(self.pg_interfaces)
2333 capture = self.pg0.get_capture(1)
2338 self.assertEqual(ip.src, self.nat_addr)
2339 self.assertEqual(ip.dst, server.ip4)
2340 self.assertNotEqual(tcp.sport, host_in_port)
2341 self.assertEqual(tcp.dport, server_in_port)
2342 self.check_tcp_checksum(p)
2343 host_out_port = tcp.sport
2345 self.logger.error(ppp("Unexpected or invalid packet:", p))
2348 # send reply from server to host
2349 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2350 IP(src=server.ip4, dst=self.nat_addr) /
2351 TCP(sport=server_in_port, dport=host_out_port))
2352 self.pg0.add_stream(p)
2353 self.pg_enable_capture(self.pg_interfaces)
2355 capture = self.pg0.get_capture(1)
2360 self.assertEqual(ip.src, self.nat_addr)
2361 self.assertEqual(ip.dst, host.ip4)
2362 self.assertEqual(tcp.sport, server_out_port)
2363 self.assertEqual(tcp.dport, host_in_port)
2364 self.check_tcp_checksum(p)
2366 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2370 super(TestNAT44, self).tearDown()
2371 if not self.vpp_dead:
2372 self.logger.info(self.vapi.cli("show nat44 verbose"))
2376 class TestDeterministicNAT(MethodHolder):
2377 """ Deterministic NAT Test Cases """
def setUpConstants(cls):
    # Run the parent hook first, then append the plugin section spelled out
    # by the literal below: nat { deterministic }
    super(TestDeterministicNAT, cls).setUpConstants()
    nat_section = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_section)
2385 def setUpClass(cls):
2386 super(TestDeterministicNAT, cls).setUpClass()
# fixed ports, ICMP id and NAT address shared by the tests of this class
2389 cls.tcp_port_in = 6303
2390 cls.tcp_external_port = 6303
2391 cls.udp_port_in = 6304
2392 cls.udp_external_port = 6304
2393 cls.icmp_id_in = 6305
2394 cls.nat_addr = '10.0.0.3'
# three packet-generator interfaces are created (range(3))
2396 cls.create_pg_interfaces(range(3))
2397 cls.interfaces = list(cls.pg_interfaces)
2399 for i in cls.interfaces:
# two remote hosts on pg0; test_multiple_users uses them as host0 and host1
2404 cls.pg0.generate_remote_hosts(2)
2405 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): tearDownClass is invoked from inside setUpClass - presumably
# the clean-up path taken when the setup above fails; confirm
2408 super(TestDeterministicNAT, cls).tearDownClass()
2411 def create_stream_in(self, in_if, out_if, ttl=64):
2413 Create packet stream for inside network
2415 :param in_if: Inside interface
2416 :param out_if: Outside interface
2417 :param ttl: TTL of generated packets
# Every packet goes from in_if's remote address to out_if's remote address
# with the requested TTL.
# TCP: self.tcp_port_in -> self.tcp_external_port
2421 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2422 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2423 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP: self.udp_port_in -> self.udp_external_port
2427 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2428 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2429 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request carrying self.icmp_id_in
2433 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2434 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2435 ICMP(id=self.icmp_id_in, type='echo-request'))
2440 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2442 Create packet stream for outside network
2444 :param out_if: Outside interface
2445 :param dst_ip: Destination IP address (Default use global NAT address)
2446 :param ttl: TTL of generated packets
# default destination: the class-wide NAT address
2449 dst_ip = self.nat_addr
# The packets below are addressed to self.tcp_port_out, self.udp_port_out and
# self.icmp_external_id; verify_capture_out sets those from a previous
# capture, so it has to run before this method is used.
# TCP
2452 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2453 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2454 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP
2458 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2459 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2460 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply
2464 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2465 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2466 ICMP(id=self.icmp_external_id, type='echo-reply'))
2471 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2473 Verify captured packets on outside network
2475 :param capture: Captured packets
2476 :param nat_ip: Translated IP address (Default use global NAT address)
2477 :param same_port: Sorce port number is not translated (Default False)
2478 :param packet_num: Expected number of packets (Default 3)
# NOTE(review): the signature has no same_port parameter, so the same_port
# entry above is stale (and "Sorce" should read "Source").
# default expectation: the class-wide NAT address
2481 nat_ip = self.nat_addr
2482 self.assertEqual(packet_num, len(capture))
2483 for packet in capture:
# every captured packet must carry the NAT address as its source
2485 self.assertEqual(packet[IP].src, nat_ip)
# Side effect: the translated TCP/UDP source port and ICMP id are stored on
# self; create_stream_out and test_det_in read them afterwards.
2486 if packet.haslayer(TCP):
2487 self.tcp_port_out = packet[TCP].sport
2488 elif packet.haslayer(UDP):
2489 self.udp_port_out = packet[UDP].sport
2491 self.icmp_external_id = packet[ICMP].id
2493 self.logger.error(ppp("Unexpected or invalid packet "
2494 "(outside network):", packet))
2497 def initiate_tcp_session(self, in_if, out_if):
2499 Initiates TCP session
2501 :param in_if: Inside interface
2502 :param out_if: Outside interface
# Three packets are exchanged (in->out, out->in, in->out); each one is only
# checked for arrival on the opposite interface.
2505 # SYN packet in->out
2506 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2507 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2508 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2511 self.pg_enable_capture(self.pg_interfaces)
2513 capture = out_if.get_capture(1)
# Side effect: the TCP source port of p is stored as self.tcp_port_out; the
# out->in packet below and the session-close tests address it.
2515 self.tcp_port_out = p[TCP].sport
2517 # SYN + ACK packet out->in
2518 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2519 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2520 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2522 out_if.add_stream(p)
2523 self.pg_enable_capture(self.pg_interfaces)
2525 in_if.get_capture(1)
2527 # ACK packet in->out
2528 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2529 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2530 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2533 self.pg_enable_capture(self.pg_interfaces)
2535 out_if.get_capture(1)
2538 self.logger.error("TCP 3 way handshake failed")
2541 def verify_ipfix_max_entries_per_user(self, data):
2543 Verify IPFIX maximum entries per user exceeded event
2545 :param data: Decoded IPFIX data records
# exactly one record is expected
2547 self.assertEqual(1, len(data))
# record is indexed by the numeric keys 230, 466 and 8
# NOTE(review): presumably IPFIX information-element ids - confirm
2550 self.assertEqual(ord(record[230]), 13)
2551 # natQuotaExceededEvent
2552 self.assertEqual('\x03\x00\x00\x00', record[466])
# key 8 must equal pg0.remote_ip4n
2554 self.assertEqual(self.pg0.remote_ip4n, record[8])
2556 def test_deterministic_mode(self):
2557 """ NAT plugin run deterministic mode """
# in_addr / out_addr are the two sides of the mapping; in_addr_t is the inside
# address used for the forward / reverse lookups.
2558 in_addr = '172.16.255.0'
2559 out_addr = '172.17.255.50'
2560 in_addr_t = '172.16.255.20'
# packed forms, as passed to the API calls below
2561 in_addr_n = socket.inet_aton(in_addr)
2562 out_addr_n = socket.inet_aton(out_addr)
2563 in_addr_t_n = socket.inet_aton(in_addr_t)
# the plugin must report that it runs in deterministic mode
2567 nat_config = self.vapi.nat_show_config()
2568 self.assertEqual(1, nat_config.deterministic)
2570 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# forward lookup must yield out_addr; the reverse lookup with the returned
# out_port_hi must yield in_addr_t again (only the first 4 bytes of each
# address field are compared)
2572 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2573 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2574 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2575 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# the dump must contain exactly the mapping added above
2577 deterministic_mappings = self.vapi.nat_det_map_dump()
2578 self.assertEqual(len(deterministic_mappings), 1)
2579 dsm = deterministic_mappings[0]
2580 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2581 self.assertEqual(in_plen, dsm.in_plen)
2582 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2583 self.assertEqual(out_plen, dsm.out_plen)
# after clear_nat_det the dump must be empty
2585 self.clear_nat_det()
2586 deterministic_mappings = self.vapi.nat_det_map_dump()
2587 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Read the current timeouts, raise each of them by the same amount and
    # check that a second read reports a different value for every field.
    bump = 10
    before = self.vapi.nat_det_get_timeouts()

    raised_udp = before.udp + bump
    raised_tcp_established = before.tcp_established + bump
    raised_tcp_transitory = before.tcp_transitory + bump
    raised_icmp = before.icmp + bump
    self.vapi.nat_det_set_timeouts(raised_udp,
                                   raised_tcp_established,
                                   raised_tcp_transitory,
                                   raised_icmp)

    after = self.vapi.nat_det_get_timeouts()

    # same comparison order as before: udp, icmp, then the two TCP timers
    for field in ("udp", "icmp", "tcp_established", "tcp_transitory"):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
2607 def test_det_in(self):
2608 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
# pg0's remote address is mapped onto nat_ip; traffic is run pg0 -> pg1 and
# back, then the resulting sessions are inspected.
2610 nat_ip = "10.0.0.10"
2612 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2614 socket.inet_aton(nat_ip),
2616 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2617 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: verify_capture_out also records the translated ports / ICMP id
2621 pkts = self.create_stream_in(self.pg0, self.pg1)
2622 self.pg0.add_stream(pkts)
2623 self.pg_enable_capture(self.pg_interfaces)
2625 capture = self.pg1.get_capture(len(pkts))
2626 self.verify_capture_out(capture, nat_ip)
# pg1 -> pg0: addressed to nat_ip
2629 pkts = self.create_stream_out(self.pg1, nat_ip)
2630 self.pg1.add_stream(pkts)
2631 self.pg_enable_capture(self.pg_interfaces)
2633 capture = self.pg0.get_capture(len(pkts))
2634 self.verify_capture_in(capture, self.pg0)
# three sessions are expected for pg0's remote address
2637 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2638 self.assertEqual(len(sessions), 3)
# session compared against the TCP ports
2642 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2643 self.assertEqual(s.in_port, self.tcp_port_in)
2644 self.assertEqual(s.out_port, self.tcp_port_out)
2645 self.assertEqual(s.ext_port, self.tcp_external_port)
# session compared against the UDP ports
2649 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2650 self.assertEqual(s.in_port, self.udp_port_in)
2651 self.assertEqual(s.out_port, self.udp_port_out)
2652 self.assertEqual(s.ext_port, self.udp_external_port)
# session compared against the ICMP ids (held in the port fields)
2656 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2657 self.assertEqual(s.in_port, self.icmp_id_in)
2658 self.assertEqual(s.out_port, self.icmp_external_id)
2660 def test_multiple_users(self):
2661 """ Deterministic NAT multiple users """
# Two pg0 hosts use the same inside port towards pg1; both must be translated
# to nat_ip, and the replies must come back to the right host.
2663 nat_ip = "10.0.0.10"
2665 external_port = 6303
2667 host0 = self.pg0.remote_hosts[0]
2668 host1 = self.pg0.remote_hosts[1]
2670 self.vapi.nat_det_add_del_map(host0.ip4n,
2672 socket.inet_aton(nat_ip),
2674 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2675 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> pg1
2679 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2680 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2681 TCP(sport=port_in, dport=external_port))
2682 self.pg0.add_stream(p)
2683 self.pg_enable_capture(self.pg_interfaces)
2685 capture = self.pg1.get_capture(1)
2690 self.assertEqual(ip.src, nat_ip)
2691 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2692 self.assertEqual(tcp.dport, external_port)
# translated source port of host0, used for the reply further down
2693 port_out0 = tcp.sport
2695 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> pg1
2699 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2700 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2701 TCP(sport=port_in, dport=external_port))
2702 self.pg0.add_stream(p)
2703 self.pg_enable_capture(self.pg_interfaces)
2705 capture = self.pg1.get_capture(1)
2710 self.assertEqual(ip.src, nat_ip)
2711 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2712 self.assertEqual(tcp.dport, external_port)
# translated source port of host1
2713 port_out1 = tcp.sport
2715 self.logger.error(ppp("Unexpected or invalid packet:", p))
# one mapping holding two sessions is expected at this point
2718 dms = self.vapi.nat_det_map_dump()
2719 self.assertEqual(1, len(dms))
2720 self.assertEqual(2, dms[0].ses_num)
# pg1 -> nat_ip:port_out0 must be delivered to host0
2723 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2724 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2725 TCP(sport=external_port, dport=port_out0))
2726 self.pg1.add_stream(p)
2727 self.pg_enable_capture(self.pg_interfaces)
2729 capture = self.pg0.get_capture(1)
2734 self.assertEqual(ip.src, self.pg1.remote_ip4)
2735 self.assertEqual(ip.dst, host0.ip4)
2736 self.assertEqual(tcp.dport, port_in)
2737 self.assertEqual(tcp.sport, external_port)
2739 self.logger.error(ppp("Unexpected or invalid packet:", p))
# pg1 -> nat_ip:port_out1 must be delivered to host1
2743 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2744 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2745 TCP(sport=external_port, dport=port_out1))
2746 self.pg1.add_stream(p)
2747 self.pg_enable_capture(self.pg_interfaces)
2749 capture = self.pg0.get_capture(1)
2754 self.assertEqual(ip.src, self.pg1.remote_ip4)
2755 self.assertEqual(ip.dst, host1.ip4)
2756 self.assertEqual(tcp.dport, port_in)
2757 self.assertEqual(tcp.sport, external_port)
# NOTE(review): this headline lacks the trailing ':' used by the other ppp()
# calls in this method
2759 self.logger.error(ppp("Unexpected or invalid packet", p))
2762 # session close api test
# closing one session from the outside side must leave one session ...
2763 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2765 self.pg1.remote_ip4n,
2767 dms = self.vapi.nat_det_map_dump()
2768 self.assertEqual(dms[0].ses_num, 1)
# ... and closing one from the inside side must leave none
2770 self.vapi.nat_det_close_session_in(host0.ip4n,
2772 self.pg1.remote_ip4n,
2774 dms = self.vapi.nat_det_map_dump()
2775 self.assertEqual(dms[0].ses_num, 0)
2777 def test_tcp_session_close_detection_in(self):
2778 """ Deterministic NAT TCP session close from inside network """
# pg0's remote address is mapped onto self.nat_addr, a TCP session is opened
# with initiate_tcp_session, then it is closed starting from the inside.
2779 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2781 socket.inet_aton(self.nat_addr),
2783 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2784 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2787 self.initiate_tcp_session(self.pg0, self.pg1)
2789 # close the session from inside
2791 # FIN packet in -> out
2792 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2793 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2794 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2796 self.pg0.add_stream(p)
2797 self.pg_enable_capture(self.pg_interfaces)
2799 self.pg1.get_capture(1)
# the next two packets are sent together on pg1 (stream pkts) and both must
# arrive on pg0
2803 # ACK packet out -> in
2804 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2805 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2806 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2810 # FIN packet out -> in
2811 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2812 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2813 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2817 self.pg1.add_stream(pkts)
2818 self.pg_enable_capture(self.pg_interfaces)
2820 self.pg0.get_capture(2)
2822 # ACK packet in -> out
2823 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2824 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2825 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2827 self.pg0.add_stream(p)
2828 self.pg_enable_capture(self.pg_interfaces)
2830 self.pg1.get_capture(1)
2832 # Check if deterministic NAT44 closed the session
2833 dms = self.vapi.nat_det_map_dump()
2834 self.assertEqual(0, dms[0].ses_num)
2836 self.logger.error("TCP session termination failed")
2839 def test_tcp_session_close_detection_out(self):
2840 """ Deterministic NAT TCP session close from outside network """
# Mirror image of test_tcp_session_close_detection_in: the same mapping and
# session set-up, but the close sequence starts on pg1.
2841 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2843 socket.inet_aton(self.nat_addr),
2845 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2846 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2849 self.initiate_tcp_session(self.pg0, self.pg1)
2851 # close the session from outside
2853 # FIN packet out -> in
2854 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2855 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2856 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2858 self.pg1.add_stream(p)
2859 self.pg_enable_capture(self.pg_interfaces)
2861 self.pg0.get_capture(1)
# the next two packets are sent together on pg0 (stream pkts) and both must
# arrive on pg1
2865 # ACK packet in -> out
2866 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2867 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2868 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the label below repeats the previous one; by symmetry with
# test_tcp_session_close_detection_in this second packet is presumably the
# FIN - confirm against its TCP flags
2872 # ACK packet in -> out
2873 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2874 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2875 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2879 self.pg0.add_stream(pkts)
2880 self.pg_enable_capture(self.pg_interfaces)
2882 self.pg1.get_capture(2)
2884 # ACK packet out -> in
2885 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2886 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2887 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2889 self.pg1.add_stream(p)
2890 self.pg_enable_capture(self.pg_interfaces)
2892 self.pg0.get_capture(1)
2894 # Check if deterministic NAT44 closed the session
2895 dms = self.vapi.nat_det_map_dump()
2896 self.assertEqual(0, dms[0].ses_num)
2898 self.logger.error("TCP session termination failed")
2901 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2902 def test_session_timeout(self):
2903 """ Deterministic NAT session timeouts """
# Runs only when running_extended_tests() is true.
2904 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2906 socket.inet_aton(self.nat_addr),
2908 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2909 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# open one TCP session, then set all four timeouts to 5
2912 self.initiate_tcp_session(self.pg0, self.pg1)
2913 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
# TCP, UDP and ICMP traffic pg0 -> pg1 creates the sessions that must expire
2914 pkts = self.create_stream_in(self.pg0, self.pg1)
2915 self.pg0.add_stream(pkts)
2916 self.pg_enable_capture(self.pg_interfaces)
2918 capture = self.pg1.get_capture(len(pkts))
# no session may remain on the mapping
2921 dms = self.vapi.nat_det_map_dump()
2922 self.assertEqual(0, dms[0].ses_num)
2924 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2925 def test_session_limit_per_user(self):
2926 """ Deterministic NAT maximum sessions per user limit """
# Runs only when running_extended_tests() is true.
2927 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2929 socket.inet_aton(self.nat_addr),
2931 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2932 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# IPFIX export towards pg2's remote address, then NAT IPFIX logging enabled
2934 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2935 src_address=self.pg2.local_ip4n,
2937 template_interval=10)
2938 self.vapi.nat_ipfix()
# one UDP packet per port in 1025..2024, i.e. 1000 distinct flows
2941 for port in range(1025, 2025):
2942 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2943 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2944 UDP(sport=port, dport=port))
2947 self.pg0.add_stream(pkts)
2948 self.pg_enable_capture(self.pg_interfaces)
2950 capture = self.pg1.get_capture(len(pkts))
# one more flow (3001 -> 3002) must not be forwarded to pg1
2952 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2953 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2954 UDP(sport=3001, dport=3002))
2955 self.pg0.add_stream(p)
2956 self.pg_enable_capture(self.pg_interfaces)
2958 capture = self.pg1.assert_nothing_captured()
2960 # verify ICMP error packet
2961 capture = self.pg0.get_capture(1)
2963 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 / code 1, embedding the rejected UDP packet's ports
2965 self.assertEqual(icmp.type, 3)
2966 self.assertEqual(icmp.code, 1)
2967 self.assertTrue(icmp.haslayer(IPerror))
2968 inner_ip = icmp[IPerror]
2969 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2970 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2972 dms = self.vapi.nat_det_map_dump()
# the session count must still be exactly 1000
2974 self.assertEqual(1000, dms[0].ses_num)
2976 # verify IPFIX logging
2977 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# two packets are expected on pg2
2979 capture = self.pg2.get_capture(2)
2980 ipfix = IPFIXDecoder()
2981 # first load template
2983 self.assertTrue(p.haslayer(IPFIX))
2984 if p.haslayer(Template):
2985 ipfix.add_template(p.getlayer(Template))
2986 # verify events in data set
2988 if p.haslayer(Data):
2989 data = ipfix.decode_data_set(p.getlayer(Set))
2990 self.verify_ipfix_max_entries_per_user(data)
2992 def clear_nat_det(self):
2994 Clear deterministic NAT configuration.
# IPFIX logging is switched off; nat_det_set_timeouts is called without
# arguments (NOTE(review): presumably restores the API defaults - confirm)
2996 self.vapi.nat_ipfix(enable=0)
2997 self.vapi.nat_det_set_timeouts()
# every dumped deterministic mapping is fed back to nat_det_add_del_map
2998 deterministic_mappings = self.vapi.nat_det_map_dump()
2999 for dsm in deterministic_mappings:
3000 self.vapi.nat_det_add_del_map(dsm.in_addr,
# every dumped NAT44 interface is fed back to nat44_interface_add_del_feature
3006 interfaces = self.vapi.nat44_interface_dump()
3007 for intf in interfaces:
3008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3013 super(TestDeterministicNAT, self).tearDown()
3014 if not self.vpp_dead:
3015 self.logger.info(self.vapi.cli("show nat44 detail"))
3016 self.clear_nat_det()
3019 class TestNAT64(MethodHolder):
3020 """ NAT64 Test Cases """
3023 def setUpClass(cls):
3024 super(TestNAT64, cls).setUpClass()
# fixed ports, ICMP ids and NAT addresses shared by the tests of this class;
# the *_n attributes hold the packed form produced by socket.inet_pton
3027 cls.tcp_port_in = 6303
3028 cls.tcp_port_out = 6303
3029 cls.udp_port_in = 6304
3030 cls.udp_port_out = 6304
3031 cls.icmp_id_in = 6305
3032 cls.icmp_id_out = 6305
3033 cls.nat_addr = '10.0.0.3'
3034 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3036 cls.vrf1_nat_addr = '10.0.10.3'
3037 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
# three pg interfaces: pg0 and pg2 form the IPv6 list, pg1 the IPv4 list
3040 cls.create_pg_interfaces(range(3))
3041 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3042 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3043 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# pg2 is placed into the IPv6 table cls.vrf1_id
3045 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3047 cls.pg0.generate_remote_hosts(2)
3049 for i in cls.ip6_interfaces:
3052 i.configure_ipv6_neighbors()
3054 for i in cls.ip4_interfaces:
# NOTE(review): tearDownClass is invoked from inside setUpClass - presumably
# the clean-up path taken when the setup above fails; confirm
3060 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # A one-address range (start == end) is added and must be the only entry
    # in the dump; after deleting the same range the dump must be empty.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    first_entry = dumped[0]
    self.assertEqual(first_entry.address, pool_addr)

    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
3078 def test_interface(self):
3079 """ Enable/disable NAT64 feature on the interface """
# pg0 is added with the default arguments, pg1 with is_inside=0
3080 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3081 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# the dump must list exactly these two interfaces with matching is_inside
3083 interfaces = self.vapi.nat64_interface_dump()
3084 self.assertEqual(len(interfaces), 2)
3087 for intf in interfaces:
3088 if intf.sw_if_index == self.pg0.sw_if_index:
3089 self.assertEqual(intf.is_inside, 1)
3091 elif intf.sw_if_index == self.pg1.sw_if_index:
3092 self.assertEqual(intf.is_inside, 0)
3094 self.assertTrue(pg0_found)
3095 self.assertTrue(pg1_found)
# the CLI feature listing must mention nat64-in2out on pg0 and nat64-out2in
# on pg1 (str.find returns -1 when the text is absent)
3097 features = self.vapi.cli("show interface features pg0")
3098 self.assertNotEqual(features.find('nat64-in2out'), -1)
3099 features = self.vapi.cli("show interface features pg1")
3100 self.assertNotEqual(features.find('nat64-out2in'), -1)
# after removing both interfaces the dump must be empty
3102 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3103 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3105 interfaces = self.vapi.nat64_interface_dump()
3106 self.assertEqual(len(interfaces), 0)
3108 def test_static_bib(self):
3109 """ Add/delete static BIB entry """
# inside side: an IPv6 address; outside side: an IPv4 address; both packed
3110 in_addr = socket.inet_pton(socket.AF_INET6,
3111 '2001:db8:85a3::8a2e:370:7334')
3112 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3115 proto = IP_PROTOS.tcp
3117 self.vapi.nat64_add_del_static_bib(in_addr,
# the TCP BIB dump must hold one static entry carrying the values added above
3122 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3127 self.assertEqual(bibe.i_addr, in_addr)
3128 self.assertEqual(bibe.o_addr, out_addr)
3129 self.assertEqual(bibe.i_port, in_port)
3130 self.assertEqual(bibe.o_port, out_port)
3131 self.assertEqual(static_bib_num, 1)
# second call on the same entry, after which no static entry may remain
3133 self.vapi.nat64_add_del_static_bib(in_addr,
3139 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3144 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Both halves do the same thing: fetch the timeouts once and compare the
    # five fields, in a fixed order, against the expected numbers.
    def check_timeouts(expected):
        current = self.vapi.nat64_get_timeouts()
        for field, value in expected:
            self.assertEqual(getattr(current, field), value)

    # verify default values
    check_timeouts([("udp", 300),
                    ("icmp", 60),
                    ("tcp_trans", 240),
                    ("tcp_est", 7440),
                    ("tcp_incoming_syn", 6)])

    # set and verify custom values
    custom = [("udp", 200),
              ("icmp", 30),
              ("tcp_trans", 250),
              ("tcp_est", 7450),
              ("tcp_incoming_syn", 10)]
    self.vapi.nat64_set_timeouts(**dict(custom))
    check_timeouts(custom)
3166 def test_dynamic(self):
3167 """ NAT64 dynamic translation test """
# inside ports / ICMP id are reset on the instance before traffic is built
3168 self.tcp_port_in = 6303
3169 self.udp_port_in = 6304
3170 self.icmp_id_in = 6305
# session count before the test, used for the delta check further down
3172 ses_num_start = self.nat64_get_ses_num()
3174 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3176 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3177 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# pg0 (IPv6) -> pg1 (IPv4): packets must leave with self.nat_addr
3180 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3181 self.pg0.add_stream(pkts)
3182 self.pg_enable_capture(self.pg_interfaces)
3184 capture = self.pg1.get_capture(len(pkts))
3185 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3186 dst_ip=self.pg1.remote_ip4)
# pg1 -> pg0: addressed to self.nat_addr
3189 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3190 self.pg1.add_stream(pkts)
3191 self.pg_enable_capture(self.pg_interfaces)
3193 capture = self.pg0.get_capture(len(pkts))
# expected IPv6 source: the text '64:ff9b::' joined with pg1's IPv4 address,
# read back through an IPv6 layer
3194 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3195 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# the same two directions are run a second time
3198 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3199 self.pg0.add_stream(pkts)
3200 self.pg_enable_capture(self.pg_interfaces)
3202 capture = self.pg1.get_capture(len(pkts))
3203 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3204 dst_ip=self.pg1.remote_ip4)
3207 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3208 self.pg1.add_stream(pkts)
3209 self.pg_enable_capture(self.pg_interfaces)
3211 capture = self.pg0.get_capture(len(pkts))
3212 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# both rounds together must have added exactly three sessions
3214 ses_num_end = self.nat64_get_ses_num()
3216 self.assertEqual(ses_num_end - ses_num_start, 3)
3218 # tenant with specific VRF
3219 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3220 self.vrf1_nat_addr_n,
3221 vrf_id=self.vrf1_id)
3222 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# pg2 -> pg1: must leave with the VRF-specific NAT address
3224 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3225 self.pg2.add_stream(pkts)
3226 self.pg_enable_capture(self.pg_interfaces)
3228 capture = self.pg1.get_capture(len(pkts))
3229 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3230 dst_ip=self.pg1.remote_ip4)
# pg1 -> pg2: addressed to the VRF-specific NAT address
3232 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3233 self.pg1.add_stream(pkts)
3234 self.pg_enable_capture(self.pg_interfaces)
3236 capture = self.pg2.get_capture(len(pkts))
3237 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# Exercises NAT64 translation with static BIB entries. The inside and outside
# port/id attributes are set to identical values, static BIB entries are
# registered for pg0's IPv6 address, and one pg0 -> pg1 / pg1 -> pg0 exchange
# is verified with the verify_capture_* helpers.
3239 def test_static(self):
3240 """ NAT64 static translation test """
3241 self.tcp_port_in = 60303
3242 self.udp_port_in = 60304
3243 self.icmp_id_in = 60305
3244 self.tcp_port_out = 60303
3245 self.udp_port_out = 60304
3246 self.icmp_id_out = 60305
# Baseline session count for the delta assertion at the end of the test.
3248 ses_num_start = self.nat64_get_ses_num()
3250 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3252 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3253 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB entries are registered for pg0's remote IPv6 address
# (presumably one per protocol, using the in/out values set above - confirm).
3255 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3260 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3265 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# pg0 -> pg1. same_port=True is passed to verify_capture_out here
# (presumably asserting that the translated ports equal the inside ports -
# confirm in the helper).
3272 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3273 self.pg0.add_stream(pkts)
3274 self.pg_enable_capture(self.pg_interfaces)
3276 capture = self.pg1.get_capture(len(pkts))
3277 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3278 dst_ip=self.pg1.remote_ip4, same_port=True)
# pg1 -> pg0: IPv4 stream addressed to the NAT address; captured on pg0.
3281 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3282 self.pg1.add_stream(pkts)
3283 self.pg_enable_capture(self.pg_interfaces)
3285 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source on pg0: pg1's IPv4 address appended to 64:ff9b::.
3286 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3287 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3289 ses_num_end = self.nat64_get_ses_num()
# Three new sessions are expected for the exchange above.
3291 self.assertEqual(ses_num_end - ses_num_start, 3)
# Checks that the NAT64 session count changes after the ICMP timeout has been
# lowered to 5 and traffic has been sent from pg0. The test only runs when
# running_extended_tests() is true; otherwise it is skipped with the reason
# "part of extended tests".
3293 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3294 def test_session_timeout(self):
3295 """ NAT64 session timeout """
3296 self.icmp_id_in = 1234
3297 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3299 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3300 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Only the ICMP timeout is passed; the other timeouts take whatever
# nat64_set_timeouts() uses when an argument is omitted.
3301 self.vapi.nat64_set_timeouts(icmp=5)
3303 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3304 self.pg0.add_stream(pkts)
3305 self.pg_enable_capture(self.pg_interfaces)
# The capture is fetched only to confirm that the packets arrived on pg1;
# its contents are not inspected by the statements that follow.
3307 capture = self.pg1.get_capture(len(pkts))
3309 ses_num_before_timeout = self.nat64_get_ses_num()
# NOTE(review): for the comparison below to be meaningful there must be a
# wait longer than the ICMP timeout between the two session counts - confirm.
3313 # ICMP session after timeout
3314 ses_num_after_timeout = self.nat64_get_ses_num()
3315 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# Exercises translation of ICMP error messages in both directions. Sessions
# are first created with a normal pg0 <-> pg1 exchange; the captured packets
# are then wrapped in ICMPv6 / ICMP error messages, re-injected, and the
# translated errors (outer header, error code, embedded packet) are checked.
3317 def test_icmp_error(self):
3318 """ NAT64 ICMP Error message translation """
3319 self.tcp_port_in = 6303
3320 self.udp_port_in = 6304
3321 self.icmp_id_in = 6305
# NOTE(review): this value is not referenced again in the statements shown
# for this test.
3323 ses_num_start = self.nat64_get_ses_num()
3325 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3327 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3328 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3330 # send some packets to create sessions
3331 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3332 self.pg0.add_stream(pkts)
3333 self.pg_enable_capture(self.pg_interfaces)
# capture_ip4 is kept: its packets are embedded in the IPv4 ICMP errors
# sent from pg1 further down.
3335 capture_ip4 = self.pg1.get_capture(len(pkts))
3336 self.verify_capture_out(capture_ip4,
3337 nat_ip=self.nat_addr,
3338 dst_ip=self.pg1.remote_ip4)
3340 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3341 self.pg1.add_stream(pkts)
3342 self.pg_enable_capture(self.pg_interfaces)
# capture_ip6 is kept: its packets are embedded in the ICMPv6 errors sent
# from pg0 below.
3344 capture_ip6 = self.pg0.get_capture(len(pkts))
3345 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3346 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3347 self.pg0.remote_ip6)
# pg0 -> pg1: wrap every IPv6 packet captured on pg0 in an ICMPv6
# Destination Unreachable (code=1) addressed to the 64:ff9b:: form of pg1.
3350 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3351 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3352 ICMPv6DestUnreach(code=1) /
3353 packet[IPv6] for packet in capture_ip6]
3354 self.pg0.add_stream(pkts)
3355 self.pg_enable_capture(self.pg_interfaces)
3357 capture = self.pg1.get_capture(len(pkts))
# Each translated error on pg1 must be IPv4 ICMP type 3 / code 13 from the
# NAT address, embedding a packet with src=pg1 and dst=NAT address.
3358 for packet in capture:
3360 self.assertEqual(packet[IP].src, self.nat_addr)
3361 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3362 self.assertEqual(packet[ICMP].type, 3)
3363 self.assertEqual(packet[ICMP].code, 13)
3364 inner = packet[IPerror]
3365 self.assertEqual(inner.src, self.pg1.remote_ip4)
3366 self.assertEqual(inner.dst, self.nat_addr)
3367 self.check_icmp_checksum(packet)
# The *_port_out / icmp_id_out attributes are not assigned in this test;
# presumably they are recorded by verify_capture_out above - confirm.
3368 if inner.haslayer(TCPerror):
3369 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3370 elif inner.haslayer(UDPerror):
3371 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3373 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3375 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: wrap every IPv4 packet captured on pg1 in an ICMP type 3 /
# code 13 error addressed to the NAT address.
3379 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3380 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3381 ICMP(type=3, code=13) /
3382 packet[IP] for packet in capture_ip4]
3383 self.pg1.add_stream(pkts)
3384 self.pg_enable_capture(self.pg_interfaces)
3386 capture = self.pg0.get_capture(len(pkts))
# Each translated error on pg0 must be an ICMPv6 Destination Unreachable
# with code 1, embedding a packet with src=pg0 and dst=64:ff9b:: form of pg1.
3387 for packet in capture:
3389 self.assertEqual(packet[IPv6].src, ip.src)
3390 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3391 icmp = packet[ICMPv6DestUnreach]
3392 self.assertEqual(icmp.code, 1)
3393 inner = icmp[IPerror6]
3394 self.assertEqual(inner.src, self.pg0.remote_ip6)
3395 self.assertEqual(inner.dst, ip.src)
3396 self.check_icmpv6_checksum(packet)
3397 if inner.haslayer(TCPerror):
3398 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3399 elif inner.haslayer(UDPerror):
3400 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3402 self.assertEqual(inner[ICMPv6EchoRequest].id,
3405 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exercises NAT64 hairpinning: client and server are both hosts behind pg0
# (remote_hosts[0] and remote_hosts[1]). The client addresses the server via
# the 64:ff9b:: form of the NAT address, so every packet is sent on pg0 and
# captured on pg0 again. Three phases are checked: client to server, server
# to client, and an ICMPv6 error generated from the last capture.
3408 def test_hairpinning(self):
3409 """ NAT64 hairpinning """
3411 client = self.pg0.remote_hosts[0]
3412 server = self.pg0.remote_hosts[1]
3413 server_tcp_in_port = 22
3414 server_tcp_out_port = 4022
3415 server_udp_in_port = 23
3416 server_udp_out_port = 4023
3417 client_tcp_in_port = 1234
3418 client_udp_in_port = 1235
# Placeholders: overwritten below with the source ports observed on the
# translated client packets.
3419 client_tcp_out_port = 0
3420 client_udp_out_port = 0
3421 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3422 nat_addr_ip6 = ip.src
3424 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3426 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3427 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries for the server (one call mentions the TCP out port, the
# other the UDP out port).
3429 self.vapi.nat64_add_del_static_bib(server.ip6n,
3432 server_tcp_out_port,
3434 self.vapi.nat64_add_del_static_bib(server.ip6n,
3437 server_udp_out_port,
# Phase 1, client -> server: TCP and UDP packets to the server's outside
# ports on the NAT address.
3442 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3443 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3444 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3446 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3447 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3448 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3450 self.pg0.add_stream(pkts)
3451 self.pg_enable_capture(self.pg_interfaces)
3453 capture = self.pg0.get_capture(len(pkts))
# Expect source rewritten to the NAT address with a source port different
# from the client's, and destination rewritten to the server's inside
# address and port.
3454 for packet in capture:
3456 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3457 self.assertEqual(packet[IPv6].dst, server.ip6)
3458 if packet.haslayer(TCP):
3459 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3460 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3461 self.check_tcp_checksum(packet)
3462 client_tcp_out_port = packet[TCP].sport
3464 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3465 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3466 self.check_udp_checksum(packet)
3467 client_udp_out_port = packet[UDP].sport
3469 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Phase 2, server -> client: replies to the translated client ports recorded
# in phase 1.
3474 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3475 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3476 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3478 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3479 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3480 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3482 self.pg0.add_stream(pkts)
3483 self.pg_enable_capture(self.pg_interfaces)
3485 capture = self.pg0.get_capture(len(pkts))
# Expect the server's outside port as source and the client's original
# inside port as destination.
3486 for packet in capture:
3488 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3489 self.assertEqual(packet[IPv6].dst, client.ip6)
3490 if packet.haslayer(TCP):
3491 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3492 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3493 self.check_tcp_checksum(packet)
3495 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3496 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3497 self.check_udp_checksum(packet)
3499 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Phase 3, ICMP error: the client wraps each packet from the phase 2 capture
# in an ICMPv6 Destination Unreachable (code=1) sent to the NAT address.
3504 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3505 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3506 ICMPv6DestUnreach(code=1) /
3507 packet[IPv6] for packet in capture]
3508 self.pg0.add_stream(pkts)
3509 self.pg_enable_capture(self.pg_interfaces)
3511 capture = self.pg0.get_capture(len(pkts))
# The error must reach the server with the embedded packet translated back
# to the form the server originally sent (server inside port, translated
# client port).
3512 for packet in capture:
3514 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3515 self.assertEqual(packet[IPv6].dst, server.ip6)
3516 icmp = packet[ICMPv6DestUnreach]
3517 self.assertEqual(icmp.code, 1)
3518 inner = icmp[IPerror6]
3519 self.assertEqual(inner.src, server.ip6)
3520 self.assertEqual(inner.dst, nat_addr_ip6)
3521 self.check_icmpv6_checksum(packet)
3522 if inner.haslayer(TCPerror):
3523 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3524 self.assertEqual(inner[TCPerror].dport,
3525 client_tcp_out_port)
3527 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3528 self.assertEqual(inner[UDPerror].dport,
3529 client_udp_out_port)
3531 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exercises NAT64 prefixes configured through nat64_add_del_prefix(): one
# prefix added without a vrf_id (2001:db8::/32) and one added with
# vrf_id=self.vrf1_id (2001:db8:122:300::/56). The prefix dump is checked
# after each addition, then traffic is exchanged for pg0 and for pg2 and
# verified with addresses composed from the respective prefix.
3534 def test_prefix(self):
3535 """ NAT64 Network-Specific Prefix """
3537 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3539 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3540 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3541 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3542 self.vrf1_nat_addr_n,
3543 vrf_id=self.vrf1_id)
3544 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Prefix added without a vrf_id; the dump below is expected to report it
# with vrf_id 0.
3547 global_pref64 = "2001:db8::"
3548 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3549 global_pref64_len = 32
3550 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3552 prefix = self.vapi.nat64_prefix_dump()
3553 self.assertEqual(len(prefix), 1)
3554 self.assertEqual(prefix[0].prefix, global_pref64_n)
3555 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3556 self.assertEqual(prefix[0].vrf_id, 0)
3558 # Add tenant specific prefix
3559 vrf1_pref64 = "2001:db8:122:300::"
3560 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3561 vrf1_pref64_len = 56
3562 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3564 vrf_id=self.vrf1_id)
# Both prefixes must now be present; only the count is checked here.
3565 prefix = self.vapi.nat64_prefix_dump()
3566 self.assertEqual(len(prefix), 2)
# Traffic for pg0, built with plen=global_pref64_len.
3569 pkts = self.create_stream_in_ip6(self.pg0,
3572 plen=global_pref64_len)
3573 self.pg0.add_stream(pkts)
3574 self.pg_enable_capture(self.pg_interfaces)
3576 capture = self.pg1.get_capture(len(pkts))
3577 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3578 dst_ip=self.pg1.remote_ip4)
3580 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3581 self.pg1.add_stream(pkts)
3582 self.pg_enable_capture(self.pg_interfaces)
3584 capture = self.pg0.get_capture(len(pkts))
# The address the replies are verified against is composed from pg1's IPv4
# address by compose_ip6().
3585 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3588 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3590 # Tenant specific prefix
3591 pkts = self.create_stream_in_ip6(self.pg2,
3594 plen=vrf1_pref64_len)
3595 self.pg2.add_stream(pkts)
3596 self.pg_enable_capture(self.pg_interfaces)
3598 capture = self.pg1.get_capture(len(pkts))
3599 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3600 dst_ip=self.pg1.remote_ip4)
3602 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3603 self.pg1.add_stream(pkts)
3604 self.pg_enable_capture(self.pg_interfaces)
3606 capture = self.pg2.get_capture(len(pkts))
3607 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3610 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Exercises translation of packets whose next header is neither TCP, UDP nor
# ICMP. A TCP packet is sent first, then an IPv6 packet with nh=47 is sent
# pg0 -> pg1 and the result is checked for NAT source address and a GRE
# layer; finally a packet is sent pg1 -> pg0 and checked for the composed
# IPv6 source address and nh == 47.
3612 def test_unknown_proto(self):
3613 """ NAT64 translate packet with unknown protocol """
3615 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3617 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3618 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 form of pg1's IPv4 address under the 64:ff9b::/96 prefix.
3619 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# A plain TCP packet is sent before the unknown-protocol packet
# (presumably so that a binding for pg0's address already exists - confirm).
3622 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3623 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3624 TCP(sport=self.tcp_port_in, dport=20))
3625 self.pg0.add_stream(p)
3626 self.pg_enable_capture(self.pg_interfaces)
3628 p = self.pg1.get_capture(1)
# pg0 -> pg1 with next header 47 (the IP protocol number assigned to GRE;
# the check below expects a GRE layer in the translated packet).
3630 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3631 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3633 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3634 TCP(sport=1234, dport=1234))
3635 self.pg0.add_stream(p)
3636 self.pg_enable_capture(self.pg_interfaces)
3638 p = self.pg1.get_capture(1)
3641 self.assertEqual(packet[IP].src, self.nat_addr)
3642 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3643 self.assertTrue(packet.haslayer(GRE))
3644 self.check_ip_checksum(packet)
3646 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: IPv4 packet addressed to the NAT address; the result on pg0 is
# expected to keep next header 47.
3650 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3651 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3653 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3654 TCP(sport=1234, dport=1234))
3655 self.pg1.add_stream(p)
3656 self.pg_enable_capture(self.pg_interfaces)
3658 p = self.pg0.get_capture(1)
3661 self.assertEqual(packet[IPv6].src, remote_ip6)
3662 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3663 self.assertEqual(packet[IPv6].nh, 47)
3665 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol test: client and server are
# both hosts behind pg0 and each has its own NAT IPv4 address (10.0.0.110 and
# 10.0.0.100). Packets with next header IP_PROTOS.gre are sent on pg0 in both
# directions and the captures on pg0 are checked for the peer's NAT address
# (in 64:ff9b::/96 form) as source and an unchanged next header.
3668 def test_hairpinning_unknown_proto(self):
3669 """ NAT64 translate packet with unknown protocol - hairpinning """
3671 client = self.pg0.remote_hosts[0]
3672 server = self.pg0.remote_hosts[1]
3673 server_tcp_in_port = 22
3674 server_tcp_out_port = 4022
3675 client_tcp_in_port = 1234
3676 client_tcp_out_port = 1235
3677 server_nat_ip = "10.0.0.100"
3678 client_nat_ip = "10.0.0.110"
# Packed (network-order) and IPv6-embedded forms of the two NAT addresses.
3679 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3680 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3681 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3682 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3684 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3686 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3687 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries: two calls for the server and one for the client.
3689 self.vapi.nat64_add_del_static_bib(server.ip6n,
3692 server_tcp_out_port,
3695 self.vapi.nat64_add_del_static_bib(server.ip6n,
3701 self.vapi.nat64_add_del_static_bib(client.ip6n,
3704 client_tcp_out_port,
# A TCP packet from client to server is sent first, before the
# unknown-protocol packets.
3708 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3709 IPv6(src=client.ip6, dst=server_nat_ip6) /
3710 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3711 self.pg0.add_stream(p)
3712 self.pg_enable_capture(self.pg_interfaces)
3714 p = self.pg0.get_capture(1)
# client -> server with next header IP_PROTOS.gre.
3716 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3717 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3719 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3720 TCP(sport=1234, dport=1234))
3721 self.pg0.add_stream(p)
3722 self.pg_enable_capture(self.pg_interfaces)
3724 p = self.pg0.get_capture(1)
# The server must see the client's NAT address as source.
3727 self.assertEqual(packet[IPv6].src, client_nat_ip6)
3728 self.assertEqual(packet[IPv6].dst, server.ip6)
3729 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3731 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client with next header IP_PROTOS.gre.
3735 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3736 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3738 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3739 TCP(sport=1234, dport=1234))
3740 self.pg0.add_stream(p)
3741 self.pg_enable_capture(self.pg_interfaces)
3743 p = self.pg0.get_capture(1)
# The client must see the server's NAT address as source.
3746 self.assertEqual(packet[IPv6].src, server_nat_ip6)
3747 self.assertEqual(packet[IPv6].dst, client.ip6)
3748 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3750 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Helper used by the tests in this class to obtain a session count before and
# after sending traffic. It dumps the NAT64 session table through
# self.vapi.nat64_st_dump(); per its docstring the result is the number of
# active NAT64 sessions.
3753 def nat64_get_ses_num(self):
3755 Return number of active NAT64 sessions.
3757 st = self.vapi.nat64_st_dump()
# Cleanup helper for the NAT64 configuration. In order it:
#   1. calls nat64_set_timeouts() with no arguments (presumably restoring the
#      default timeouts - confirm against the API wrapper),
#   2. walks nat64_interface_dump() and calls nat64_add_del_interface() for
#      each entry,
#   3. dumps the BIB for TCP, UDP and ICMP and calls
#      nat64_add_del_static_bib() on the dumped entries,
#   4. walks nat64_pool_addr_dump() and calls
#      nat64_add_del_pool_addr_range() for each address,
#   5. walks nat64_prefix_dump() and calls nat64_add_del_prefix() for each
#      prefix, passing the prefix's own vrf_id.
# NOTE(review): the add_del calls are presumably issued in delete mode;
# confirm the flag passed to each call.
3760 def clear_nat64(self):
3762 Clear NAT64 configuration.
3764 self.vapi.nat64_set_timeouts()
3766 interfaces = self.vapi.nat64_interface_dump()
3767 for intf in interfaces:
3768 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3772 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3775 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3783 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3786 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3794 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3797 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3805 adresses = self.vapi.nat64_pool_addr_dump()
3806 for addr in adresses:
3807 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3812 prefixes = self.vapi.nat64_prefix_dump()
3813 for prefix in prefixes:
3814 self.vapi.nat64_add_del_prefix(prefix.prefix,
3816 vrf_id=prefix.vrf_id,
# Run the parent class tearDown first; then, only while VPP is still alive
# (self.vpp_dead is false), log the output of the NAT64 "show" CLI commands
# (pool, interfaces, prefix, BIB, session table) at info level.
3820 super(TestNAT64, self).tearDown()
3821 if not self.vpp_dead:
3822 self.logger.info(self.vapi.cli("show nat64 pool"))
3823 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3824 self.logger.info(self.vapi.cli("show nat64 prefix"))
3825 self.logger.info(self.vapi.cli("show nat64 bib all"))
3826 self.logger.info(self.vapi.cli("show nat64 session table all"))
# When this file is executed as a script (rather than imported), run its
# tests through unittest using VppTestRunner as the test runner.
3829 if __name__ == '__main__':
3830 unittest.main(testRunner=VppTestRunner)