7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
21 class MethodHolder(VppTestCase):
22 """ NAT create capture and verify method holder """
26 super(MethodHolder, cls).setUpClass()
29 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is rebuilt from its serialised form with the IPv4 header
    checksum field cleared, so that the field is filled in again on the
    next serialisation, and the result is compared with the checksum
    carried by the packet under test.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without clearing the field the rebuild just copies the received
    # value and the comparison below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is rebuilt from its serialised form with the TCP checksum
    field cleared, so that the field is filled in again on the next
    serialisation, and the result is compared with the checksum carried
    by the packet under test.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clearing the field is what forces a recomputation; otherwise the
    # received value is copied and the assertion is a tautology.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is rebuilt from its serialised form with the UDP checksum
    field cleared, so that the field is filled in again on the next
    serialisation, and the result is compared with the checksum carried
    by the packet under test.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clearing the field is what forces a recomputation; otherwise the
    # received value is copied and the assertion is a tautology.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    Every embedded (``*error``) layer present in the packet is verified
    independently: the packet is rebuilt from its serialised form with
    that layer's checksum cleared, and the rebuilt value is compared
    with the one carried by the packet under test.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    for layer in (IPerror, TCPerror, UDPerror, ICMPerror):
        if not pkt.haslayer(layer):
            continue
        # An all-zero UDP checksum means the sender did not compute
        # one, so there is nothing to verify.
        if layer is UDPerror and pkt[layer].chksum == 0:
            continue
        # Always start from a fresh copy: the ICMPerror check used to
        # rely on a copy left behind by an earlier branch and failed
        # with an unbound local when no earlier branch had run.
        new = pkt.__class__(str(pkt))
        del new[layer].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer].chksum, pkt[layer].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    The ICMP checksum is cleared on a re-parsed copy of the packet and
    the value obtained after serialising that copy again must equal
    the one carried by the packet. Packets that embed an original
    datagram (ICMP errors) also get their embedded layers checked.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    icmp_layer = rebuilt['ICMP']
    del icmp_layer.chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply
    messages. For each of those layers present, the checksum is
    cleared on a working copy, the copy is serialised and parsed again,
    and the resulting value must equal the one carried by the packet.
    Destination-unreachable messages additionally have their embedded
    packet checked.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name used for indexing, check embedded packet)
    message_kinds = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    working = pkt.__class__(str(pkt))
    for layer, name, has_embedded in message_kinds:
        if not pkt.haslayer(layer):
            continue
        del working[name].cksum
        working = working.__class__(str(working))
        self.assertEqual(working[name].cksum, pkt[name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List of three packets (TCP, UDP, ICMP echo request),
              in that order
    """
    pkts = []

    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)

    # Callers pass the result to add_stream() and take its len().
    return pkts
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the positions
    RFC 6052 assigns to the given prefix length. Octet 8 of the IPv6
    address (bits 64-71) is reserved and is therefore skipped for the
    prefix lengths whose IPv4 part straddles it. Any other prefix
    length leaves the prefix untouched.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # prefix length -> IPv6 octet positions of the four IPv4 octets
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray gives mutable integer octets on both Python 2 and 3
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for position, octet in zip(octet_positions.get(plen, ()), ip4_n):
        pref_n[position] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (when omitted, 64:ff9b:: is used)
    :param plen: NAT64 prefix length (only used together with pref)
    :returns: List of three packets (TCP, UDP, ICMPv6 echo request),
              in that order
    """
    if pref is None:
        # IPv4-embedded textual form based on the 64:ff9b:: prefix
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    pkts = []

    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         ICMPv6EchoRequest(id=self.icmp_id_in))
    pkts.append(p)

    return pkts
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :returns: List of three packets (TCP, UDP, ICMP echo reply),
              in that order
    """
    # Only fall back to the NAT address when the caller gave none.
    if dst_ip is None:
        dst_ip = self.nat_addr

    pkts = []

    # TCP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         TCP(dport=self.tcp_port_out, sport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         UDP(dport=self.udp_port_out, sport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)

    return pkts
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    Besides checking the packets, the translated TCP/UDP source ports
    and ICMP identifier seen in the capture are stored in
    ``tcp_port_out``, ``udp_port_out`` and ``icmp_id_out``.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    if nat_ip is None:
        nat_ip = self.nat_addr
    # The port/identifier must be kept when same_port is set and must
    # differ from the inside value otherwise.
    if same_port:
        check_port = self.assertEqual
    else:
        check_port = self.assertNotEqual
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP].dst, dst_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
                self.check_icmp_checksum(packet)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                # Anything that is neither TCP nor UDP is expected to
                # be ICMP; indexing fails (and is logged) otherwise.
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                # Anything else is expected to be an ICMPv6 echo reply.
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses, source ports and the ICMP identifier must be exactly
    those the packets were sent with.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    Each packet must be an ICMP message of the expected type embedding
    the offending datagram, whose destination port (TCP/UDP) or
    identifier (ICMP) is compared with the outside-side values.

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    Each packet must be an ICMP message of the expected type embedding
    the offending datagram, whose source port (TCP/UDP) or identifier
    (ICMP) is compared with the inside-side values.

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
465 def verify_ipfix_nat44_ses(self, data):
467 Verify IPFIX NAT44 session create/delete event
469 :param data: Decoded IPFIX data records
471 nat44_ses_create_num = 0
472 nat44_ses_delete_num = 0
473 self.assertEqual(6, len(data))
476 self.assertIn(ord(record[230]), [4, 5])
477 if ord(record[230]) == 4:
478 nat44_ses_create_num += 1
480 nat44_ses_delete_num += 1
482 self.assertEqual(self.pg0.remote_ip4n, record[8])
483 # postNATSourceIPv4Address
484 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
487 self.assertEqual(struct.pack("!I", 0), record[234])
488 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
489 if IP_PROTOS.icmp == ord(record[4]):
490 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
491 self.assertEqual(struct.pack("!H", self.icmp_id_out),
493 elif IP_PROTOS.tcp == ord(record[4]):
494 self.assertEqual(struct.pack("!H", self.tcp_port_in),
496 self.assertEqual(struct.pack("!H", self.tcp_port_out),
498 elif IP_PROTOS.udp == ord(record[4]):
499 self.assertEqual(struct.pack("!H", self.udp_port_in),
501 self.assertEqual(struct.pack("!H", self.udp_port_out),
504 self.fail("Invalid protocol")
505 self.assertEqual(3, nat44_ses_create_num)
506 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Exactly one data record is expected. Records are indexed by IPFIX
    information element number.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (element 230); 3 is presumably "addresses exhausted",
    # going by this method's name
    self.assertEqual(ord(record[230]), 3)
    # natPoolID (element 283)
    self.assertEqual(struct.pack("!I", 0), record[283])
522 class TestNAT44(MethodHolder):
523 """ NAT44 Test Cases """
527 super(TestNAT44, cls).setUpClass()
530 cls.tcp_port_in = 6303
531 cls.tcp_port_out = 6303
532 cls.udp_port_in = 6304
533 cls.udp_port_out = 6304
534 cls.icmp_id_in = 6305
535 cls.icmp_id_out = 6305
536 cls.nat_addr = '10.0.0.3'
537 cls.ipfix_src_port = 4739
538 cls.ipfix_domain_id = 1
540 cls.create_pg_interfaces(range(9))
541 cls.interfaces = list(cls.pg_interfaces[0:4])
543 for i in cls.interfaces:
548 cls.pg0.generate_remote_hosts(3)
549 cls.pg0.configure_ipv4_neighbors()
551 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
553 cls.pg4._local_ip4 = "172.16.255.1"
554 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
555 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
556 cls.pg4.set_table_ip4(10)
557 cls.pg5._local_ip4 = "172.17.255.3"
558 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
559 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
560 cls.pg5.set_table_ip4(10)
561 cls.pg6._local_ip4 = "172.16.255.1"
562 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
563 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
564 cls.pg6.set_table_ip4(20)
565 for i in cls.overlapping_interfaces:
574 super(TestNAT44, cls).tearDownClass()
577 def clear_nat44(self):
579 Clear NAT44 configuration.
581 # I found no elegant way to do this
582 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
583 dst_address_length=32,
584 next_hop_address=self.pg7.remote_ip4n,
585 next_hop_sw_if_index=self.pg7.sw_if_index,
587 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
588 dst_address_length=32,
589 next_hop_address=self.pg8.remote_ip4n,
590 next_hop_sw_if_index=self.pg8.sw_if_index,
593 for intf in [self.pg7, self.pg8]:
594 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
596 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
601 if self.pg7.has_ip4_config:
602 self.pg7.unconfig_ip4()
604 interfaces = self.vapi.nat44_interface_addr_dump()
605 for intf in interfaces:
606 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
608 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
609 domain_id=self.ipfix_domain_id)
610 self.ipfix_src_port = 4739
611 self.ipfix_domain_id = 1
613 interfaces = self.vapi.nat44_interface_dump()
614 for intf in interfaces:
615 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
619 interfaces = self.vapi.nat44_interface_output_feature_dump()
620 for intf in interfaces:
621 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
625 static_mappings = self.vapi.nat44_static_mapping_dump()
626 for sm in static_mappings:
627 self.vapi.nat44_add_del_static_mapping(
629 sm.external_ip_address,
630 local_port=sm.local_port,
631 external_port=sm.external_port,
632 addr_only=sm.addr_only,
634 protocol=sm.protocol,
637 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
638 for lb_sm in lb_static_mappings:
639 self.vapi.nat44_add_del_lb_static_mapping(
646 adresses = self.vapi.nat44_address_dump()
647 for addr in adresses:
648 self.vapi.nat44_add_del_address_range(addr.ip_address,
652 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
653 local_port=0, external_port=0, vrf_id=0,
654 is_add=1, external_sw_if_index=0xFFFFFFFF,
657 Add/delete NAT44 static mapping
659 :param local_ip: Local IP address
660 :param external_ip: External IP address
661 :param local_port: Local port number (Optional)
662 :param external_port: External port number (Optional)
663 :param vrf_id: VRF ID (Default 0)
664 :param is_add: 1 if add, 0 if delete (Default add)
665 :param external_sw_if_index: External interface instead of IP address
666 :param proto: IP protocol (Mandatory if port specified)
669 if local_port and external_port:
671 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
672 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
673 self.vapi.nat44_add_del_static_mapping(
676 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
    """
    Add/delete NAT44 address

    A single-address range (first == last) is added to or removed from
    the NAT44 pool.

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID forwarded to the API call (Default 0xFFFFFFFF)
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id)
695 def test_dynamic(self):
696 """ NAT44 dynamic translation test """
698 self.nat44_add_address(self.nat_addr)
699 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
700 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
704 pkts = self.create_stream_in(self.pg0, self.pg1)
705 self.pg0.add_stream(pkts)
706 self.pg_enable_capture(self.pg_interfaces)
708 capture = self.pg1.get_capture(len(pkts))
709 self.verify_capture_out(capture)
712 pkts = self.create_stream_out(self.pg1)
713 self.pg1.add_stream(pkts)
714 self.pg_enable_capture(self.pg_interfaces)
716 capture = self.pg0.get_capture(len(pkts))
717 self.verify_capture_in(capture, self.pg0)
719 def test_dynamic_icmp_errors_in2out_ttl_1(self):
720 """ NAT44 handling of client packets with TTL=1 """
722 self.nat44_add_address(self.nat_addr)
723 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
724 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
727 # Client side - generate traffic
728 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
729 self.pg0.add_stream(pkts)
730 self.pg_enable_capture(self.pg_interfaces)
733 # Client side - verify ICMP type 11 packets
734 capture = self.pg0.get_capture(len(pkts))
735 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
737 def test_dynamic_icmp_errors_out2in_ttl_1(self):
738 """ NAT44 handling of server packets with TTL=1 """
740 self.nat44_add_address(self.nat_addr)
741 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
742 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
745 # Client side - create sessions
746 pkts = self.create_stream_in(self.pg0, self.pg1)
747 self.pg0.add_stream(pkts)
748 self.pg_enable_capture(self.pg_interfaces)
751 # Server side - generate traffic
752 capture = self.pg1.get_capture(len(pkts))
753 self.verify_capture_out(capture)
754 pkts = self.create_stream_out(self.pg1, ttl=1)
755 self.pg1.add_stream(pkts)
756 self.pg_enable_capture(self.pg_interfaces)
759 # Server side - verify ICMP type 11 packets
760 capture = self.pg1.get_capture(len(pkts))
761 self.verify_capture_out_with_icmp_errors(capture,
762 src_ip=self.pg1.local_ip4)
764 def test_dynamic_icmp_errors_in2out_ttl_2(self):
765 """ NAT44 handling of error responses to client packets with TTL=2 """
767 self.nat44_add_address(self.nat_addr)
768 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
769 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
772 # Client side - generate traffic
773 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
774 self.pg0.add_stream(pkts)
775 self.pg_enable_capture(self.pg_interfaces)
778 # Server side - simulate ICMP type 11 response
779 capture = self.pg1.get_capture(len(pkts))
780 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
781 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
782 ICMP(type=11) / packet[IP] for packet in capture]
783 self.pg1.add_stream(pkts)
784 self.pg_enable_capture(self.pg_interfaces)
787 # Client side - verify ICMP type 11 packets
788 capture = self.pg0.get_capture(len(pkts))
789 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
791 def test_dynamic_icmp_errors_out2in_ttl_2(self):
792 """ NAT44 handling of error responses to server packets with TTL=2 """
794 self.nat44_add_address(self.nat_addr)
795 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
796 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
799 # Client side - create sessions
800 pkts = self.create_stream_in(self.pg0, self.pg1)
801 self.pg0.add_stream(pkts)
802 self.pg_enable_capture(self.pg_interfaces)
805 # Server side - generate traffic
806 capture = self.pg1.get_capture(len(pkts))
807 self.verify_capture_out(capture)
808 pkts = self.create_stream_out(self.pg1, ttl=2)
809 self.pg1.add_stream(pkts)
810 self.pg_enable_capture(self.pg_interfaces)
813 # Client side - simulate ICMP type 11 response
814 capture = self.pg0.get_capture(len(pkts))
815 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
816 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
817 ICMP(type=11) / packet[IP] for packet in capture]
818 self.pg0.add_stream(pkts)
819 self.pg_enable_capture(self.pg_interfaces)
822 # Server side - verify ICMP type 11 packets
823 capture = self.pg1.get_capture(len(pkts))
824 self.verify_capture_out_with_icmp_errors(capture)
826 def test_ping_out_interface_from_outside(self):
827 """ Ping NAT44 out interface from outside network """
829 self.nat44_add_address(self.nat_addr)
830 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
831 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
834 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
835 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
836 ICMP(id=self.icmp_id_out, type='echo-request'))
838 self.pg1.add_stream(pkts)
839 self.pg_enable_capture(self.pg_interfaces)
841 capture = self.pg1.get_capture(len(pkts))
842 self.assertEqual(1, len(capture))
845 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
846 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
847 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
848 self.assertEqual(packet[ICMP].type, 0) # echo reply
850 self.logger.error(ppp("Unexpected or invalid packet "
851 "(outside network):", packet))
854 def test_ping_internal_host_from_outside(self):
855 """ Ping internal host from outside network """
857 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
858 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
859 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
863 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
864 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
865 ICMP(id=self.icmp_id_out, type='echo-request'))
866 self.pg1.add_stream(pkt)
867 self.pg_enable_capture(self.pg_interfaces)
869 capture = self.pg0.get_capture(1)
870 self.verify_capture_in(capture, self.pg0, packet_num=1)
871 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
874 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
875 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
876 ICMP(id=self.icmp_id_in, type='echo-reply'))
877 self.pg0.add_stream(pkt)
878 self.pg_enable_capture(self.pg_interfaces)
880 capture = self.pg1.get_capture(1)
881 self.verify_capture_out(capture, same_port=True, packet_num=1)
882 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
884 def test_static_in(self):
885 """ 1:1 NAT initialized from inside network """
888 self.tcp_port_out = 6303
889 self.udp_port_out = 6304
890 self.icmp_id_out = 6305
892 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
893 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
894 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
898 pkts = self.create_stream_in(self.pg0, self.pg1)
899 self.pg0.add_stream(pkts)
900 self.pg_enable_capture(self.pg_interfaces)
902 capture = self.pg1.get_capture(len(pkts))
903 self.verify_capture_out(capture, nat_ip, True)
906 pkts = self.create_stream_out(self.pg1, nat_ip)
907 self.pg1.add_stream(pkts)
908 self.pg_enable_capture(self.pg_interfaces)
910 capture = self.pg0.get_capture(len(pkts))
911 self.verify_capture_in(capture, self.pg0)
913 def test_static_out(self):
914 """ 1:1 NAT initialized from outside network """
917 self.tcp_port_out = 6303
918 self.udp_port_out = 6304
919 self.icmp_id_out = 6305
921 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
922 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
923 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
927 pkts = self.create_stream_out(self.pg1, nat_ip)
928 self.pg1.add_stream(pkts)
929 self.pg_enable_capture(self.pg_interfaces)
931 capture = self.pg0.get_capture(len(pkts))
932 self.verify_capture_in(capture, self.pg0)
935 pkts = self.create_stream_in(self.pg0, self.pg1)
936 self.pg0.add_stream(pkts)
937 self.pg_enable_capture(self.pg_interfaces)
939 capture = self.pg1.get_capture(len(pkts))
940 self.verify_capture_out(capture, nat_ip, True)
942 def test_static_with_port_in(self):
943 """ 1:1 NAPT initialized from inside network """
945 self.tcp_port_out = 3606
946 self.udp_port_out = 3607
947 self.icmp_id_out = 3608
949 self.nat44_add_address(self.nat_addr)
950 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
951 self.tcp_port_in, self.tcp_port_out,
953 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
954 self.udp_port_in, self.udp_port_out,
956 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
957 self.icmp_id_in, self.icmp_id_out,
958 proto=IP_PROTOS.icmp)
959 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
960 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
964 pkts = self.create_stream_in(self.pg0, self.pg1)
965 self.pg0.add_stream(pkts)
966 self.pg_enable_capture(self.pg_interfaces)
968 capture = self.pg1.get_capture(len(pkts))
969 self.verify_capture_out(capture)
972 pkts = self.create_stream_out(self.pg1)
973 self.pg1.add_stream(pkts)
974 self.pg_enable_capture(self.pg_interfaces)
976 capture = self.pg0.get_capture(len(pkts))
977 self.verify_capture_in(capture, self.pg0)
979 def test_static_with_port_out(self):
980 """ 1:1 NAPT initialized from outside network """
982 self.tcp_port_out = 30606
983 self.udp_port_out = 30607
984 self.icmp_id_out = 30608
986 self.nat44_add_address(self.nat_addr)
987 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
988 self.tcp_port_in, self.tcp_port_out,
990 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
991 self.udp_port_in, self.udp_port_out,
993 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
994 self.icmp_id_in, self.icmp_id_out,
995 proto=IP_PROTOS.icmp)
996 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
997 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1001 pkts = self.create_stream_out(self.pg1)
1002 self.pg1.add_stream(pkts)
1003 self.pg_enable_capture(self.pg_interfaces)
1005 capture = self.pg0.get_capture(len(pkts))
1006 self.verify_capture_in(capture, self.pg0)
1009 pkts = self.create_stream_in(self.pg0, self.pg1)
1010 self.pg0.add_stream(pkts)
1011 self.pg_enable_capture(self.pg_interfaces)
1013 capture = self.pg1.get_capture(len(pkts))
1014 self.verify_capture_out(capture)
1016 def test_static_vrf_aware(self):
1017 """ 1:1 NAT VRF awareness """
1019 nat_ip1 = "10.0.0.30"
1020 nat_ip2 = "10.0.0.40"
1021 self.tcp_port_out = 6303
1022 self.udp_port_out = 6304
1023 self.icmp_id_out = 6305
1025 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1027 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1029 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1031 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1032 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1034 # inside interface VRF match NAT44 static mapping VRF
1035 pkts = self.create_stream_in(self.pg4, self.pg3)
1036 self.pg4.add_stream(pkts)
1037 self.pg_enable_capture(self.pg_interfaces)
1039 capture = self.pg3.get_capture(len(pkts))
1040 self.verify_capture_out(capture, nat_ip1, True)
1042 # inside interface VRF don't match NAT44 static mapping VRF (packets
1044 pkts = self.create_stream_in(self.pg0, self.pg3)
1045 self.pg0.add_stream(pkts)
1046 self.pg_enable_capture(self.pg_interfaces)
1048 self.pg3.assert_nothing_captured()
1050 def test_static_lb(self):
1051 """ NAT44 local service load balancing """
1052 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1055 server1 = self.pg0.remote_hosts[0]
1056 server2 = self.pg0.remote_hosts[1]
1058 locals = [{'addr': server1.ip4n,
1061 {'addr': server2.ip4n,
1065 self.nat44_add_address(self.nat_addr)
1066 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1069 local_num=len(locals),
1071 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1072 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1075 # from client to service
1076 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1077 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1078 TCP(sport=12345, dport=external_port))
1079 self.pg1.add_stream(p)
1080 self.pg_enable_capture(self.pg_interfaces)
1082 capture = self.pg0.get_capture(1)
1088 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1089 if ip.dst == server1.ip4:
1093 self.assertEqual(tcp.dport, local_port)
1094 self.check_tcp_checksum(p)
1095 self.check_ip_checksum(p)
1097 self.logger.error(ppp("Unexpected or invalid packet:", p))
1100 # from service back to client
1101 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1102 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1103 TCP(sport=local_port, dport=12345))
1104 self.pg0.add_stream(p)
1105 self.pg_enable_capture(self.pg_interfaces)
1107 capture = self.pg1.get_capture(1)
1112 self.assertEqual(ip.src, self.nat_addr)
1113 self.assertEqual(tcp.sport, external_port)
1114 self.check_tcp_checksum(p)
1115 self.check_ip_checksum(p)
1117 self.logger.error(ppp("Unexpected or invalid packet:", p))
1123 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1125 for client in clients:
1126 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1127 IP(src=client, dst=self.nat_addr) /
1128 TCP(sport=12345, dport=external_port))
1130 self.pg1.add_stream(pkts)
1131 self.pg_enable_capture(self.pg_interfaces)
1133 capture = self.pg0.get_capture(len(pkts))
1135 if p[IP].dst == server1.ip4:
1139 self.assertTrue(server1_n > server2_n)
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces

    pg0 and pg1 get the NAT44 inside feature, pg3 is configured as the
    other (is_inside=0) side and pg2 gets no NAT44 feature at all.
    Traffic from pg0 to pg1 or pg2 must pass untranslated, while traffic
    between each inside interface and pg3 must be translated in both
    directions.
    """
    nat_side = self.pg3
    inside_ifs = (self.pg0, self.pg1)

    self.nat44_add_address(self.nat_addr)
    for inside in inside_ifs:
        self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(nat_side.sw_if_index,
                                              is_inside=0)

    # pg0 -> pg1 (between two NAT44 inside interfaces) and
    # pg0 -> pg2 (interface without NAT44 feature): no translation
    for egress in (self.pg1, self.pg2):
        stream = self.create_stream_in(self.pg0, egress)
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = egress.get_capture(len(stream))
        self.verify_capture_no_translation(received, self.pg0, egress)

    # translated round trip, 1st inside interface first, then the 2nd
    for inside in inside_ifs:
        # in2out
        stream = self.create_stream_in(inside, nat_side)
        inside.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = nat_side.get_capture(len(stream))
        self.verify_capture_out(received)

        # out2in
        stream = self.create_stream_out(nat_side)
        nat_side.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = inside.get_capture(len(stream))
        self.verify_capture_in(received, inside)
def test_inside_overlapping_interfaces(self):
    """ NAT44 multiple inside interfaces with overlapping address space

    pg4, pg5 and pg6 get the NAT44 inside feature and pg3 is the
    is_inside=0 side.  The pg6 host additionally gets a 1:1 static
    mapping.  The test covers untranslated inside-to-inside traffic,
    hairpinning towards the static mapping, dynamic translation for
    pg4/pg5, static translation for pg6, and finally cross-checks the
    user and session dumps returned by the API.
    """
    static_nat_ip = "10.0.0.10"
    outside = self.pg3

    def send_and_capture(tx_if, stream, rx_if, count):
        # inject stream on tx_if and return the count packets rx_if saw
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(count)

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)
    for inside in (self.pg4, self.pg5, self.pg6):
        self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    # between NAT44 inside interfaces with same VRF (no translation)
    stream = self.create_stream_in(self.pg4, self.pg5)
    received = send_and_capture(self.pg4, stream, self.pg5, len(stream))
    self.verify_capture_no_translation(received, self.pg4, self.pg5)

    # between NAT44 inside interfaces with different VRF (hairpinning)
    request = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
               IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
               TCP(sport=1234, dport=5678))
    hairpinned = send_and_capture(self.pg4, request, self.pg6, 1)[0]
    try:
        ip = hairpinned[IP]
        tcp = hairpinned[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", hairpinned))
        raise

    # dynamic translation: 1st interface (pg4), then 2nd interface (pg5)
    for inside in (self.pg4, self.pg5):
        # in2out
        stream = self.create_stream_in(inside, outside)
        received = send_and_capture(inside, stream, outside, len(stream))
        self.verify_capture_out(received)

        # out2in
        stream = self.create_stream_out(outside)
        received = send_and_capture(outside, stream, inside, len(stream))
        self.verify_capture_in(received, inside)

    # pg5 session dump (second argument 10 is presumably the VRF id of
    # the pg5 host -- confirm against the interface setup)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for entry in sessions:
        self.assertFalse(entry.is_static)
        self.assertEqual(entry.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(entry.outside_ip_address,
                         addresses[0].ip_address)
    # the three sessions are expected in the order TCP, UDP, ICMP
    tcp_ses, udp_ses, icmp_ses = sessions
    self.assertEqual(tcp_ses.protocol, IP_PROTOS.tcp)
    self.assertEqual(udp_ses.protocol, IP_PROTOS.udp)
    self.assertEqual(icmp_ses.protocol, IP_PROTOS.icmp)
    self.assertEqual(tcp_ses.inside_port, self.tcp_port_in)
    self.assertEqual(udp_ses.inside_port, self.udp_port_in)
    self.assertEqual(icmp_ses.inside_port, self.icmp_id_in)
    self.assertEqual(tcp_ses.outside_port, self.tcp_port_out)
    self.assertEqual(udp_ses.outside_port, self.udp_port_out)
    self.assertEqual(icmp_ses.outside_port, self.icmp_id_out)

    # in2out 3rd interface (pg6, translated through the static mapping)
    stream = self.create_stream_in(self.pg6, outside)
    received = send_and_capture(self.pg6, stream, outside, len(stream))
    self.verify_capture_out(received, static_nat_ip, True)

    # out2in 3rd interface
    stream = self.create_stream_out(outside, static_nat_ip)
    received = send_and_capture(outside, stream, self.pg6, len(stream))
    self.verify_capture_in(received, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertTrue(len(users) >= 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for entry in sessions:
            self.assertEqual(user.ip_address, entry.inside_ip_address)
            self.assertTrue(entry.total_bytes > entry.total_pkts > 0)
            self.assertTrue(entry.protocol in
                            [IP_PROTOS.tcp, IP_PROTOS.udp,
                             IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertTrue(len(sessions) >= 4)
    for entry in sessions:
        self.assertFalse(entry.is_static)
        self.assertEqual(entry.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(entry.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump: all sessions come from the static mapping
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertTrue(len(sessions) >= 3)
    for entry in sessions:
        self.assertTrue(entry.is_static)
        self.assertEqual(entry.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        # compare the packed outside address octet by octet with the
        # dotted-quad static NAT address
        self.assertEqual(map(ord, entry.outside_ip_address[0:4]),
                         map(int, static_nat_ip.split('.')))
        self.assertTrue(entry.inside_port in
                        [self.tcp_port_in, self.udp_port_in,
                         self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44 hairpinning - 1:1 NAPT

    Host and server are both remote hosts of pg0.  The server is
    published through a static TCP address-and-port mapping on the NAT
    address.  A packet sent by the host to that external address/port
    must come back out of pg0 towards the server, and the server's
    reply must be delivered to the host the same way.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # remember the translated source port; the reply targets it
        host_out_port = tcp.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except:
        # the packet belongs inside ppp(); passing it to logger.error
        # instead would break the handler and hide the real failure
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ NAT44 hairpinning - 1:1 NAT

    Three remote hosts of pg0 take part: a plain host and two servers
    that each own a 1:1 static mapping.  Four exchanges are checked,
    every one sent into and captured from pg0: host -> server1,
    server1 -> host, server2 -> server1 and server1 -> server2, each
    with one TCP, one UDP and one ICMP packet.
    """
    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    def frame(src_ip, dst_ip, l4):
        # all packets of this test share the same pg0 L2 header
        return (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IP(src=src_ip, dst=dst_ip) /
                l4)

    def hairpin(stream):
        # send stream into pg0 and return what pg0 got back
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(len(stream))

    def report(bad_packet):
        self.logger.error(ppp("Unexpected or invalid packet:", bad_packet))

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for servers
    self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
    self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    requests = [
        frame(host.ip4, server1_nat_ip,
              TCP(sport=self.tcp_port_in, dport=server_tcp_port)),
        frame(host.ip4, server1_nat_ip,
              UDP(sport=self.udp_port_in, dport=server_udp_port)),
        frame(host.ip4, server1_nat_ip,
              ICMP(id=self.icmp_id_in, type='echo-request')),
    ]
    for pkt in hairpin(requests):
        try:
            self.assertEqual(pkt[IP].src, self.nat_addr)
            self.assertEqual(pkt[IP].dst, server1.ip4)
            if pkt.haslayer(TCP):
                self.assertNotEqual(pkt[TCP].sport, self.tcp_port_in)
                self.assertEqual(pkt[TCP].dport, server_tcp_port)
                self.tcp_port_out = pkt[TCP].sport
                self.check_tcp_checksum(pkt)
            elif pkt.haslayer(UDP):
                self.assertNotEqual(pkt[UDP].sport, self.udp_port_in)
                self.assertEqual(pkt[UDP].dport, server_udp_port)
                self.udp_port_out = pkt[UDP].sport
            else:
                self.assertNotEqual(pkt[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = pkt[ICMP].id
        except:
            report(pkt)
            raise

    # server1 to host (replies aimed at the ports/id learned above)
    replies = [
        frame(server1.ip4, self.nat_addr,
              TCP(sport=server_tcp_port, dport=self.tcp_port_out)),
        frame(server1.ip4, self.nat_addr,
              UDP(sport=server_udp_port, dport=self.udp_port_out)),
        frame(server1.ip4, self.nat_addr,
              ICMP(id=self.icmp_id_out, type='echo-reply')),
    ]
    for pkt in hairpin(replies):
        try:
            self.assertEqual(pkt[IP].src, server1_nat_ip)
            self.assertEqual(pkt[IP].dst, host.ip4)
            if pkt.haslayer(TCP):
                self.assertEqual(pkt[TCP].dport, self.tcp_port_in)
                self.assertEqual(pkt[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(pkt)
            elif pkt.haslayer(UDP):
                self.assertEqual(pkt[UDP].dport, self.udp_port_in)
                self.assertEqual(pkt[UDP].sport, server_udp_port)
            else:
                self.assertEqual(pkt[ICMP].id, self.icmp_id_in)
        except:
            report(pkt)
            raise

    # server2 to server1
    requests = [
        frame(server2.ip4, server1_nat_ip,
              TCP(sport=self.tcp_port_in, dport=server_tcp_port)),
        frame(server2.ip4, server1_nat_ip,
              UDP(sport=self.udp_port_in, dport=server_udp_port)),
        frame(server2.ip4, server1_nat_ip,
              ICMP(id=self.icmp_id_in, type='echo-request')),
    ]
    for pkt in hairpin(requests):
        try:
            self.assertEqual(pkt[IP].src, server2_nat_ip)
            self.assertEqual(pkt[IP].dst, server1.ip4)
            if pkt.haslayer(TCP):
                # ports and ICMP id are expected to be left untouched
                self.assertEqual(pkt[TCP].sport, self.tcp_port_in)
                self.assertEqual(pkt[TCP].dport, server_tcp_port)
                self.tcp_port_out = pkt[TCP].sport
                self.check_tcp_checksum(pkt)
            elif pkt.haslayer(UDP):
                self.assertEqual(pkt[UDP].sport, self.udp_port_in)
                self.assertEqual(pkt[UDP].dport, server_udp_port)
                self.udp_port_out = pkt[UDP].sport
            else:
                self.assertEqual(pkt[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = pkt[ICMP].id
        except:
            report(pkt)
            raise

    # server1 to server2
    replies = [
        frame(server1.ip4, server2_nat_ip,
              TCP(sport=server_tcp_port, dport=self.tcp_port_out)),
        frame(server1.ip4, server2_nat_ip,
              UDP(sport=server_udp_port, dport=self.udp_port_out)),
        frame(server1.ip4, server2_nat_ip,
              ICMP(id=self.icmp_id_out, type='echo-reply')),
    ]
    for pkt in hairpin(replies):
        try:
            self.assertEqual(pkt[IP].src, server1_nat_ip)
            self.assertEqual(pkt[IP].dst, server2.ip4)
            if pkt.haslayer(TCP):
                self.assertEqual(pkt[TCP].dport, self.tcp_port_in)
                self.assertEqual(pkt[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(pkt)
            elif pkt.haslayer(UDP):
                self.assertEqual(pkt[UDP].dport, self.udp_port_in)
                self.assertEqual(pkt[UDP].sport, server_udp_port)
            else:
                self.assertEqual(pkt[ICMP].id, self.icmp_id_in)
        except:
            report(pkt)
            raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used

    Sends five more TCP flows from a single pg0 source address than
    the configured per-user translation limit and expects every one of
    the packets to show up on pg1.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # get maximum number of translations per user
    limit = self.vapi.nat_show_config().max_translations_per_user

    # send more than maximum number of translations per user packets;
    # each packet uses its own source port, starting at 1025
    flow_count = limit + 5
    stream = [
        (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=1025 + offset))
        for offset in range(0, flow_count)
    ]
    self.pg0.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(flow_count)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface

    Registers pg7 as the source of NAT44 pool addresses and checks that
    the pool dump follows pg7's IPv4 configuration: empty before the
    address is configured, exactly pg7's local address afterwards, and
    empty again once the address is removed.
    """
    dump_pool = self.vapi.nat44_address_dump

    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)

    # no address in NAT pool
    pool = dump_pool()
    self.assertEqual(0, len(pool))

    # configure interface address and check NAT address pool
    self.pg7.config_ip4()
    pool = dump_pool()
    self.assertEqual(1, len(pool))
    only_entry = pool[0]
    self.assertEqual(only_entry.ip_address[0:4], self.pg7.local_ip4n)

    # remove interface address and check NAT address pool
    self.pg7.unconfig_ip4()
    pool = dump_pool()
    self.assertEqual(0, len(pool))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface

    Creates a static mapping whose external side is given as pg7's
    interface index rather than an address, then checks how the mapping
    dump changes when pg7's IPv4 address is configured and removed.
    """
    dump_mappings = self.vapi.nat44_static_mapping_dump

    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.nat44_add_static_mapping(
        '1.2.3.4',
        external_sw_if_index=self.pg7.sw_if_index)

    # static mappings with external interface
    mappings = dump_mappings()
    self.assertEqual(1, len(mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     mappings[0].external_sw_if_index)

    # configure interface address and check static mappings
    self.pg7.config_ip4()
    mappings = dump_mappings()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    self.assertEqual(resolved.external_ip_address[0:4],
                     self.pg7.local_ip4n)
    # the mapping now reports pg7's address; its interface index field
    # is expected to read 0xFFFFFFFF
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # remove interface address and check static mappings
    self.pg7.unconfig_ip4()
    mappings = dump_mappings()
    self.assertEqual(0, len(mappings))
def test_ipfix_nat44_sess(self):
    """ IPFIX logging NAT44 session created/deleted

    Enables NAT IPFIX logging towards a collector reachable via pg3 on
    a non-default UDP port, pushes in2out traffic from pg0 to pg1,
    removes the NAT pool address again, flushes the exporter and checks
    the three IPFIX packets captured on pg3.
    """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # have scapy dissect UDP sent to the collector port as IPFIX; use
    # the same variable the exporter is configured with so the two
    # cannot drift apart
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    # NOTE(review): removing the pool address presumably tears down the
    # sessions just created so that delete events get logged -- confirm
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set (second pass: all templates are known)
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ IPFIX logging NAT addresses exhausted

    No NAT pool address is added in this test, so the single TCP packet
    sent from pg0 must not appear on pg1.  After flushing the exporter
    the three IPFIX packets captured on pg3 are checked.
    """
    exporter_if = self.pg3

    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=exporter_if.remote_ip4n,
                                 src_address=exporter_if.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    probe = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=3025))
    self.pg0.add_stream(probe)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # nothing may be forwarded to pg1
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    records = exporter_if.get_capture(3)
    decoder = IPFIXDecoder()
    # first load template
    for record in records:
        self.assertTrue(record.haslayer(IPFIX))
        self.assertEqual(record[IP].src, exporter_if.local_ip4)
        self.assertEqual(record[IP].dst, exporter_if.remote_ip4)
        self.assertEqual(record[UDP].sport, self.ipfix_src_port)
        self.assertEqual(record[UDP].dport, 4739)
        self.assertEqual(record[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if record.haslayer(Template):
            decoder.add_template(record.getlayer(Template))
    # verify events in data set
    for record in records:
        if record.haslayer(Data):
            data = decoder.decode_data_set(record.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ NAT44 add pool addresses to FIB

    ARP who-has requests for the NAT pool address and for a 1:1 static
    mapping address are sent on pg1 and must each be answered with an
    ARP is-at reply.  A request arriving on pg2 must not produce
    anything on pg1, and once the pool address and static mapping are
    removed the requests on pg1 must go unanswered.
    """
    static_addr = '10.0.0.10'

    def send_who_has(pg_if, target_ip):
        # broadcast an ARP request for target_ip from pg_if's remote host
        request = (Ether(src=pg_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                   ARP(op=ARP.who_has, pdst=target_ip,
                       psrc=pg_if.remote_ip4, hwsrc=pg_if.remote_mac))
        pg_if.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # NAT44 address, then 1:1 NAT address: both must be answered
    for target in (self.nat_addr, static_addr):
        send_who_has(self.pg1, target)
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        # assertEqual, not assertTrue: assertTrue would take the second
        # argument as the failure message and never compare the opcode
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-NAT44 interface
    send_who_has(self.pg2, self.nat_addr)
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                  is_add=0)

    for target in (self.nat_addr, static_addr):
        send_who_has(self.pg1, target)
        self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ NAT44 tenant VRF aware address pool mode

    pg0 and pg1 are moved into two different IPv4 tables and each table
    gets its own NAT pool address.  Traffic from pg0 must leave pg2
    translated to the first address, traffic from pg1 to the second.
    """
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"
    tenants = ((self.pg0, vrf_id1, nat_ip1),
               (self.pg1, vrf_id2, nat_ip2))

    # re-home both interfaces: drop the addresses, switch tables,
    # configure the addresses again (each step for pg0 then pg1)
    for pg_if, _, _ in tenants:
        pg_if.unconfig_ip4()
    for pg_if, table_id, _ in tenants:
        pg_if.set_table_ip4(table_id)
    for pg_if, _, _ in tenants:
        pg_if.config_ip4()

    for _, table_id, pool_ip in tenants:
        self.nat44_add_address(pool_ip, vrf_id=table_id)
    for pg_if, _, _ in tenants:
        self.vapi.nat44_interface_add_del_feature(pg_if.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                              is_inside=0)

    # first VRF, then second VRF
    for pg_if, _, pool_ip in tenants:
        stream = self.create_stream_in(pg_if, self.pg2)
        pg_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = self.pg2.get_capture(len(stream))
        self.verify_capture_out(received, pool_ip)
def test_vrf_feature_independent(self):
    """ NAT44 tenant VRF independent address pool mode

    Two pool addresses are added without a VRF argument.  Traffic from
    both pg0 and pg1 is expected to leave pg2 translated to the first
    pool address.
    """
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"
    sources = (self.pg0, self.pg1)

    for pool_ip in (nat_ip1, nat_ip2):
        self.nat44_add_address(pool_ip)
    for pg_if in sources:
        self.vapi.nat44_interface_add_del_feature(pg_if.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                              is_inside=0)

    # first VRF, then second VRF: both must use nat_ip1
    for pg_if in sources:
        stream = self.create_stream_in(pg_if, self.pg2)
        pg_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = self.pg2.get_capture(len(stream))
        self.verify_capture_out(received, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address

    pg7 and pg8 are used without calling config_ip4 on them here;
    reachability of their remote hosts is set up by hand with a static
    neighbor entry and a /32 route each.  Dynamic translation is then
    checked in2out (pg7 -> pg8) and out2in (pg8 -> pg7).
    """
    inside, outside = self.pg7, self.pg8

    for pg_if in (inside, outside):
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    for pg_if in (inside, outside):
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # in2out
    stream = self.create_stream_in(inside, outside)
    inside.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = outside.get_capture(len(stream))
    self.verify_capture_out(received)

    # out2in
    stream = self.create_stream_out(outside, self.nat_addr)
    outside.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = inside.get_capture(len(stream))
    self.verify_capture_in(received, inside)
def test_static_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address - 1:1 NAT

    Same hand-made neighbor/route setup for pg7 and pg8 as the dynamic
    variant, but the pg7 host is published through a 1:1 static mapping
    on the NAT address.  Checked out2in first, then in2out.
    """
    inside, outside = self.pg7, self.pg8

    for pg_if in (inside, outside):
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    for pg_if in (inside, outside):
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.nat44_add_static_mapping(inside.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # out2in
    stream = self.create_stream_out(outside)
    outside.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = inside.get_capture(len(stream))
    self.verify_capture_in(received, inside)

    # in2out
    stream = self.create_stream_in(inside, outside)
    inside.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = outside.get_capture(len(stream))
    self.verify_capture_out(received, self.nat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address - 1:1 NAPT

    Same hand-made neighbor/route setup for pg7 and pg8, with one
    static address-and-port mapping per protocol (TCP, UDP, ICMP) for
    the pg7 host.  Checked out2in first, then in2out.
    """
    inside, outside = self.pg7, self.pg8

    # fixed external ports / ICMP id used by the static mappings
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    for pg_if in (inside, outside):
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    for pg_if in (inside, outside):
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.nat44_add_address(self.nat_addr)
    port_mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for local_port, external_port, protocol in port_mappings:
        self.nat44_add_static_mapping(inside.remote_ip4, self.nat_addr,
                                      local_port, external_port,
                                      proto=protocol)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # out2in
    stream = self.create_stream_out(outside)
    outside.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = inside.get_capture(len(stream))
    self.verify_capture_in(received, inside)

    # in2out
    stream = self.create_stream_in(inside, outside)
    inside.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = outside.get_capture(len(stream))
    self.verify_capture_out(received)
def test_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol

    A GRE-encapsulated packet is sent through a 1:1 static mapping in
    both directions.  Only the outer IP addresses are checked for
    translation, together with the presence of the GRE layer and the
    IP checksum.
    """
    nat_ip = "10.0.0.10"

    def gre_frame(tx_if, outer_src, outer_dst, inner_src, inner_dst):
        # GRE-in-IP test packet entering through tx_if
        return (Ether(dst=tx_if.local_mac, src=tx_if.remote_mac) /
                IP(src=outer_src, dst=outer_dst) /
                GRE() /
                IP(src=inner_src, dst=inner_dst) /
                TCP(sport=1234, dport=1234))

    def send_and_expect(tx_if, frame, rx_if, outer_src, outer_dst):
        # send one frame and verify the single packet seen on rx_if
        tx_if.add_stream(frame)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        packet = rx_if.get_capture(1)[0]
        try:
            self.assertEqual(packet[IP].src, outer_src)
            self.assertEqual(packet[IP].dst, outer_dst)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    outbound = gre_frame(self.pg0,
                         self.pg0.remote_ip4, self.pg1.remote_ip4,
                         self.pg2.remote_ip4, self.pg3.remote_ip4)
    send_and_expect(self.pg0, outbound, self.pg1,
                    nat_ip, self.pg1.remote_ip4)

    # out2in
    inbound = gre_frame(self.pg1,
                        self.pg1.remote_ip4, nat_ip,
                        self.pg3.remote_ip4, self.pg2.remote_ip4)
    send_and_expect(self.pg1, inbound, self.pg0,
                    self.pg1.remote_ip4, self.pg0.remote_ip4)
def test_hairpinning_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol - hairpinning

    Host and server are both remote hosts of pg0 and each has its own
    1:1 static mapping.  A GRE-encapsulated packet is sent from the
    host to the server's NAT address and back; both directions enter
    and leave through pg0.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    def gre_frame(sender, outer_dst, inner_src, inner_dst):
        # GRE-in-IP test packet from sender, entering through pg0
        return (Ether(dst=self.pg0.local_mac, src=sender.mac) /
                IP(src=sender.ip4, dst=outer_dst) /
                GRE() /
                IP(src=inner_src, dst=inner_dst) /
                TCP(sport=1234, dport=1234))

    def hairpin_and_expect(frame, outer_src, outer_dst):
        # send one frame into pg0 and verify what pg0 got back
        self.pg0.add_stream(frame)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        packet = self.pg0.get_capture(1)[0]
        try:
            self.assertEqual(packet[IP].src, outer_src)
            self.assertEqual(packet[IP].dst, outer_dst)
            self.assertTrue(packet.haslayer(GRE))
            self.check_ip_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    self.nat44_add_static_mapping(host.ip4, host_nat_ip)
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # host to server
    request = gre_frame(host, server_nat_ip,
                        self.pg2.remote_ip4, self.pg3.remote_ip4)
    hairpin_and_expect(request, host_nat_ip, server.ip4)

    # server to host
    reply = gre_frame(server, host_nat_ip,
                      self.pg3.remote_ip4, self.pg2.remote_ip4)
    hairpin_and_expect(reply, server_nat_ip, host.ip4)
def test_unknown_proto(self):
    """ NAT44 translate packet with unknown protocol

    A TCP packet is sent inside-to-outside first, then a GRE packet is
    pushed through in each direction.  Each GRE packet must leave with
    the expected addresses, a GRE layer and a valid IP checksum.
    """
    inside, outside = self.pg0, self.pg1

    def forward(tx_if, pkt, rx_if):
        # Send a single packet on tx_if and return the one-packet capture
        # taken on rx_if.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # in2out
    forward(inside,
            Ether(dst=inside.local_mac, src=inside.remote_mac) /
            IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
            TCP(sport=self.tcp_port_in, dport=20),
            outside)

    packet = forward(inside,
                     Ether(dst=inside.local_mac, src=inside.remote_mac) /
                     IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                     GRE() /
                     IP(src=self.pg2.remote_ip4,
                        dst=self.pg3.remote_ip4) /
                     TCP(sport=1234, dport=1234),
                     outside)[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, outside.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    packet = forward(outside,
                     Ether(dst=outside.local_mac,
                           src=outside.remote_mac) /
                     IP(src=outside.remote_ip4, dst=self.nat_addr) /
                     GRE() /
                     IP(src=self.pg3.remote_ip4,
                        dst=self.pg2.remote_ip4) /
                     TCP(sport=1234, dport=1234),
                     inside)[0]
    try:
        self.assertEqual(packet[IP].src, outside.remote_ip4)
        self.assertEqual(packet[IP].dst, inside.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ NAT44 translate packet with unknown protocol - hairpinning

    The server behind pg0 has an address-only static mapping; the host
    behind pg0 is translated from the NAT pool address.  After an initial
    TCP packet from host to server, a GRE packet is hairpinned in each
    direction and checked for addresses, GRE layer and IP checksum.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the original assignment of host_in_port is not visible
    # in this chunk; the value is only used as the TCP source port of the
    # first packet - confirm against the reference copy.
    host_in_port = 1234
    server_out_port = 8765
    server_nat_ip = "10.0.0.11"

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)

    # host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # Only the arrival of one packet matters here; its content is unused.
    self.pg0.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_output_feature(self):
    """ NAT44 interface output feature (in2out postrouting)

    Enables the NAT44 output feature on pg0 and pg1, then verifies a
    stream from pg0 towards pg1 and the reply stream from pg1.
    """
    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what rx_if captured;
        # exactly len(stream) packets are expected.
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)

    # in2out
    translated = exchange(self.pg0,
                          self.create_stream_in(self.pg0, self.pg1),
                          self.pg1)
    self.verify_capture_out(translated)

    # out2in
    restored = exchange(self.pg1,
                        self.create_stream_out(self.pg1),
                        self.pg0)
    self.verify_capture_in(restored, self.pg0)
def test_output_feature_vrf_aware(self):
    """ NAT44 interface output feature VRF aware (in2out postrouting)

    pg4 and pg6 are inside interfaces using NAT pool addresses bound to
    VRF 10 and VRF 20 respectively; pg3 is the shared outside interface.
    Each tenant is checked in2out and then out2in.
    """
    nat_ip_vrf10 = "10.0.0.10"
    nat_ip_vrf20 = "10.0.0.20"

    # Host route towards pg3's peer, one per tenant table.
    for table in (10, 20):
        self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg3.remote_ip4n,
                                   next_hop_sw_if_index=self.pg3.sw_if_index,
                                   table_id=table)

    self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
    self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
    for tenant_if in (self.pg4, self.pg6):
        self.vapi.nat44_interface_add_del_output_feature(
            tenant_if.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
                                                     is_inside=0)

    # VRF 10 first, then VRF 20: in2out followed by out2in for each.
    for tenant_if, pool_ip in ((self.pg4, nat_ip_vrf10),
                               (self.pg6, nat_ip_vrf20)):
        # in2out
        pkts = self.create_stream_in(tenant_if, self.pg3)
        tenant_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip=pool_ip)

        # out2in
        pkts = self.create_stream_out(self.pg3, dst_ip=pool_ip)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = tenant_if.get_capture(len(pkts))
        self.verify_capture_in(capture, tenant_if)
def test_output_feature_hairpinning(self):
    """ NAT44 interface output feature hairpinning (in2out postrouting)

    Host and server are both behind pg0.  The server is reachable through
    a TCP static mapping on the NAT address; the host connects to that
    mapping and the server replies, with both packets hairpinned through
    pg0 and checked for addresses, ports and TCP checksum.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the original assignment of host_in_port is not visible
    # in this chunk; confirm the value against the reference copy.
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # The translated source port is the reply's destination port.
        host_out_port = tcp.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except:
        # The packet belongs inside ppp(); passing it to logger.error
        # instead made ppp() fail and masked the real assertion error.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def tearDown(self):
    """Run the base-class teardown, then log NAT44 state if VPP is alive."""
    super(TestNAT44, self).tearDown()
    vpp_alive = not self.vpp_dead
    if vpp_alive:
        self.logger.info(self.vapi.cli("show nat44 verbose"))
        # NOTE(review): TestDeterministicNAT.tearDown clears its NAT
        # configuration at this point; confirm whether an equivalent
        # cleanup call belongs here.
class TestDeterministicNAT(MethodHolder):
    """ Deterministic NAT Test Cases """

    # NOTE(review): several literal arguments in this class (prefix lengths
    # passed to nat_det_add_del_map, TCP flags, sleep durations, path_mtu,
    # port_in) are not visible in the reviewed chunk - confirm them against
    # the project's reference copy of this test.

    @classmethod
    def setUpConstants(cls):
        """Append the deterministic NAT section to the VPP command line."""
        super(TestDeterministicNAT, cls).setUpConstants()
        cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])

    @classmethod
    def setUpClass(cls):
        """Set test ports/addresses and bring up three pg interfaces."""
        super(TestDeterministicNAT, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_external_port = 6303
            cls.udp_port_in = 6304
            cls.udp_external_port = 6304
            cls.icmp_id_in = 6305
            cls.nat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(2)
            cls.pg0.configure_ipv4_neighbors()

        except Exception:
            # Undo the partial class setup before propagating the error.
            super(TestDeterministicNAT, cls).tearDownClass()
            raise

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List of three packets (TCP, UDP, ICMP echo request)
        """
        pkts = []
        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             UDP(sport=self.udp_port_in, dport=self.udp_external_port))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)

        return pkts

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        Uses the outside ports / ICMP id recorded by verify_capture_out,
        so that method must have run first.

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global NAT address)
        :param ttl: TTL of generated packets
        :returns: List of three packets (TCP, UDP, ICMP echo reply)
        """
        if dst_ip is None:
            dst_ip = self.nat_addr
        pkts = []
        # TCP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
        pkts.append(p)

        # UDP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             UDP(dport=self.udp_port_out, sport=self.udp_external_port))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             ICMP(id=self.icmp_external_id, type='echo-reply'))
        pkts.append(p)

        return pkts

    def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
        """
        Verify captured packets on outside network

        Also records the translated TCP/UDP source ports and ICMP id on
        self for later use by create_stream_out.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global NAT address)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.nat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if packet.haslayer(TCP):
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.icmp_external_id = packet[ICMP].id
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def initiate_tcp_session(self, in_if, out_if):
        """
        Initiates TCP session

        Performs SYN, SYN+ACK, ACK and records the translated source port
        in self.tcp_port_out.

        :param in_if: Inside interface
        :param out_if: Outside interface
        """
        try:
            # SYN packet in->out
            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="S"))
            in_if.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = out_if.get_capture(1)
            p = capture[0]
            self.tcp_port_out = p[TCP].sport

            # SYN + ACK packet out->in
            p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="SA"))
            out_if.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            in_if.get_capture(1)

            # ACK packet in->out
            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            in_if.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            out_if.get_capture(1)

        except:
            self.logger.error("TCP 3 way handshake failed")
            raise

    def verify_ipfix_max_entries_per_user(self, data):
        """
        Verify IPFIX maximum entries per user exceeded event

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 13)
        # natQuotaExceededEvent
        self.assertEqual('\x03\x00\x00\x00', record[466])
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])

    def test_deterministic_mode(self):
        """ NAT plugin run deterministic mode """
        in_addr = '172.16.255.0'
        out_addr = '172.17.255.50'
        in_addr_t = '172.16.255.20'
        in_addr_n = socket.inet_aton(in_addr)
        out_addr_n = socket.inet_aton(out_addr)
        in_addr_t_n = socket.inet_aton(in_addr_t)
        in_plen = 24
        out_plen = 32

        nat_config = self.vapi.nat_show_config()
        self.assertEqual(1, nat_config.deterministic)

        self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)

        # Forward lookup of an inside host, then reverse lookup of the
        # outside address/port it was given.
        rep1 = self.vapi.nat_det_forward(in_addr_t_n)
        self.assertEqual(rep1.out_addr[:4], out_addr_n)
        rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
        self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

        deterministic_mappings = self.vapi.nat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 1)
        dsm = deterministic_mappings[0]
        self.assertEqual(in_addr_n, dsm.in_addr[:4])
        self.assertEqual(in_plen, dsm.in_plen)
        self.assertEqual(out_addr_n, dsm.out_addr[:4])
        self.assertEqual(out_plen, dsm.out_plen)

        self.clear_nat_det()
        deterministic_mappings = self.vapi.nat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 0)

    def test_set_timeouts(self):
        """ Set deterministic NAT timeouts """
        timeouts_before = self.vapi.nat_det_get_timeouts()

        self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
                                       timeouts_before.tcp_established + 10,
                                       timeouts_before.tcp_transitory + 10,
                                       timeouts_before.icmp + 10)

        timeouts_after = self.vapi.nat_det_get_timeouts()

        self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
        self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
        self.assertNotEqual(timeouts_before.tcp_established,
                            timeouts_after.tcp_established)
        self.assertNotEqual(timeouts_before.tcp_transitory,
                            timeouts_after.tcp_transitory)

    def test_det_in(self):
        """ Deterministic NAT translation test (TCP, UDP, ICMP) """

        nat_ip = "10.0.0.10"

        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(nat_ip),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        # session dump test
        sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
        self.assertEqual(len(sessions), 3)

        # TCP session
        s = sessions[0]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.tcp_port_in)
        self.assertEqual(s.out_port, self.tcp_port_out)
        self.assertEqual(s.ext_port, self.tcp_external_port)

        # UDP session
        s = sessions[1]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.udp_port_in)
        self.assertEqual(s.out_port, self.udp_port_out)
        self.assertEqual(s.ext_port, self.udp_external_port)

        # ICMP session
        s = sessions[2]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.icmp_id_in)
        self.assertEqual(s.out_port, self.icmp_external_id)

    def test_multiple_users(self):
        """ Deterministic NAT multiple users """

        nat_ip = "10.0.0.10"
        port_in = 80
        external_port = 6303

        host0 = self.pg0.remote_hosts[0]
        host1 = self.pg0.remote_hosts[1]

        self.vapi.nat_det_add_del_map(host0.ip4n,
                                      24,
                                      socket.inet_aton(nat_ip),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        # host0 to out
        p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
             IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            port_out0 = tcp.sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # host1 to out
        p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
             IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            port_out1 = tcp.sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(1, len(dms))
        self.assertEqual(2, dms[0].ses_num)

        # out to host0
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out0))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host0.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # out to host1
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out1))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host1.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet", p))
            raise

        # session close api test
        self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
                                            port_out1,
                                            self.pg1.remote_ip4n,
                                            external_port)
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(dms[0].ses_num, 1)

        self.vapi.nat_det_close_session_in(host0.ip4n,
                                           port_in,
                                           self.pg1.remote_ip4n,
                                           external_port)
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(dms[0].ses_num, 0)

    def test_tcp_session_close_detection_in(self):
        """ Deterministic NAT TCP session close from inside network """
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(self.nat_addr),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)

        # close the session from inside
        try:
            # FIN packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="F"))
            self.pg0.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(1)

            pkts = []

            # ACK packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="A"))
            pkts.append(p)

            # FIN packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="F"))
            pkts.append(p)

            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(2)

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            self.pg0.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(1)

            # Check if deterministic NAT44 closed the session
            dms = self.vapi.nat_det_map_dump()
            self.assertEqual(0, dms[0].ses_num)
        except:
            self.logger.error("TCP session termination failed")
            raise

    def test_tcp_session_close_detection_out(self):
        """ Deterministic NAT TCP session close from outside network """
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(self.nat_addr),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)

        # close the session from outside
        try:
            # FIN packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="F"))
            self.pg1.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(1)

            pkts = []

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            pkts.append(p)

            # FIN packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="F"))
            pkts.append(p)

            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(2)

            # ACK packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="A"))
            self.pg1.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(1)

            # Check if deterministic NAT44 closed the session
            dms = self.vapi.nat_det_map_dump()
            self.assertEqual(0, dms[0].ses_num)
        except:
            self.logger.error("TCP session termination failed")
            raise

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_timeout(self):
        """ Deterministic NAT session timeouts """
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(self.nat_addr),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)
        self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # Only the packet count is checked; the content is not inspected.
        self.pg1.get_capture(len(pkts))
        # Wait past the 5 second timeouts set above so sessions expire.
        sleep(15)

        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_limit_per_user(self):
        """ Deterministic NAT maximum sessions per user limit """
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(self.nat_addr),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)
        self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
                                     src_address=self.pg2.local_ip4n,
                                     path_mtu=512,
                                     template_interval=10)
        self.vapi.nat_ipfix()

        # 1000 UDP flows from the same inside host.
        pkts = []
        for port in range(1025, 2025):
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 UDP(sport=port, dport=port))
            pkts.append(p)

        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(len(pkts))

        # One more flow: nothing may reach pg1 for it.
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             UDP(sport=3001, dport=3002))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.assert_nothing_captured()

        # verify ICMP error packet
        capture = self.pg0.get_capture(1)
        p = capture[0]
        self.assertTrue(p.haslayer(ICMP))
        icmp = p[ICMP]
        self.assertEqual(icmp.type, 3)
        self.assertEqual(icmp.code, 1)
        self.assertTrue(icmp.haslayer(IPerror))
        inner_ip = icmp[IPerror]
        self.assertEqual(inner_ip[UDPerror].sport, 3001)
        self.assertEqual(inner_ip[UDPerror].dport, 3002)

        dms = self.vapi.nat_det_map_dump()

        self.assertEqual(1000, dms[0].ses_num)

        # verify IPFIX logging
        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
        sleep(1)
        capture = self.pg2.get_capture(2)
        ipfix = IPFIXDecoder()
        # first load template
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_max_entries_per_user(data)

    def clear_nat_det(self):
        """
        Clear deterministic NAT configuration.

        Disables NAT IPFIX logging, calls nat_det_set_timeouts() with no
        arguments, and removes every dumped deterministic mapping and
        NAT44 interface feature.
        """
        self.vapi.nat_ipfix(enable=0)
        self.vapi.nat_det_set_timeouts()
        deterministic_mappings = self.vapi.nat_det_map_dump()
        for dsm in deterministic_mappings:
            self.vapi.nat_det_add_del_map(dsm.in_addr,
                                          dsm.in_plen,
                                          dsm.out_addr,
                                          dsm.out_plen,
                                          is_add=0)

        interfaces = self.vapi.nat44_interface_dump()
        for intf in interfaces:
            self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                      intf.is_inside,
                                                      is_add=0)

    def tearDown(self):
        """Run base teardown; if VPP is alive, log state and clear config."""
        super(TestDeterministicNAT, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show nat44 detail"))
            self.clear_nat_det()
3010 class TestNAT64(MethodHolder):
3011 """ NAT64 Test Cases """
@classmethod
def setUpClass(cls):
    """Set NAT64 test constants and bring up three pg interfaces.

    pg0 and pg2 are configured as IPv6 interfaces (pg2 is placed in the
    vrf1 IPv6 table), pg1 as the IPv4 interface.  On any failure the
    base-class tearDownClass is run before the exception propagates.
    """
    super(TestNAT64, cls).setUpClass()

    try:
        cls.tcp_port_in = cls.tcp_port_out = 6303
        cls.udp_port_in = cls.udp_port_out = 6304
        cls.icmp_id_in = cls.icmp_id_out = 6305

        cls.nat_addr = '10.0.0.3'
        cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
        # NOTE(review): the assignment of vrf1_id is not visible in the
        # reviewed chunk; confirm the value against the reference copy.
        cls.vrf1_id = 10
        cls.vrf1_nat_addr = '10.0.10.3'
        cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
                                               cls.vrf1_nat_addr)

        cls.create_pg_interfaces(range(3))
        cls.ip6_interfaces = [cls.pg_interfaces[0], cls.pg_interfaces[2]]
        cls.ip4_interfaces = [cls.pg_interfaces[1]]

        cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)

        cls.pg0.generate_remote_hosts(2)

        for v6_if in cls.ip6_interfaces:
            v6_if.admin_up()
            v6_if.config_ip6()
            v6_if.configure_ipv6_neighbors()

        for v4_if in cls.ip4_interfaces:
            v4_if.admin_up()
            v4_if.config_ip4()
            v4_if.resolve_arp()

    except Exception:
        super(TestNAT64, cls).tearDownClass()
        raise
def test_pool(self):
    """ Add/delete address to NAT64 pool

    Adds a one-address range, checks that the dump lists exactly that
    address, removes it and checks that the dump is empty.
    """
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    first = dumped[0]
    self.assertEqual(first.address, pool_addr)

    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface

    pg0 is enabled as inside and pg1 as outside; the interface dump and
    the CLI feature listing are checked, then both are disabled again.
    """
    inside_idx = self.pg0.sw_if_index
    outside_idx = self.pg1.sw_if_index

    self.vapi.nat64_add_del_interface(inside_idx)
    self.vapi.nat64_add_del_interface(outside_idx, is_inside=0)

    dump = self.vapi.nat64_interface_dump()
    self.assertEqual(len(dump), 2)
    seen_inside = False
    seen_outside = False
    for entry in dump:
        if entry.sw_if_index == inside_idx:
            self.assertEqual(entry.is_inside, 1)
            seen_inside = True
        elif entry.sw_if_index == outside_idx:
            self.assertEqual(entry.is_inside, 0)
            seen_outside = True
    self.assertTrue(seen_inside)
    self.assertTrue(seen_outside)

    # Each interface's feature listing must name the NAT64 node.
    pg0_features = self.vapi.cli("show interface features pg0")
    self.assertNotEqual(pg0_features.find('nat64-in2out'), -1)
    pg1_features = self.vapi.cli("show interface features pg1")
    self.assertNotEqual(pg1_features.find('nat64-out2in'), -1)

    self.vapi.nat64_add_del_interface(inside_idx, is_add=0)
    self.vapi.nat64_add_del_interface(outside_idx, is_add=0)

    dump = self.vapi.nat64_interface_dump()
    self.assertEqual(len(dump), 0)
def test_static_bib(self):
    """ Add/delete static BIB entry

    Adds one static TCP BIB entry, checks that it is the only static
    entry in the dump and that its fields match, then deletes it and
    checks that no static entry remains.
    """
    in_addr = socket.inet_pton(socket.AF_INET6,
                               '2001:db8:85a3::8a2e:370:7334')
    out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
    # NOTE(review): the port assignments and the argument order of the two
    # nat64_add_del_static_bib calls are not visible in the reviewed
    # chunk; confirm them against the reference copy.
    in_port = 1234
    out_port = 5678
    proto = IP_PROTOS.tcp

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto)
    static_entries = []
    for entry in self.vapi.nat64_bib_dump(IP_PROTOS.tcp):
        if not entry.is_static:
            continue
        static_entries.append(entry)
        self.assertEqual(entry.i_addr, in_addr)
        self.assertEqual(entry.o_addr, out_addr)
        self.assertEqual(entry.i_port, in_port)
        self.assertEqual(entry.o_port, out_port)
    self.assertEqual(len(static_entries), 1)

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto,
                                       is_add=0)
    remaining = 0
    for entry in self.vapi.nat64_bib_dump(IP_PROTOS.tcp):
        if entry.is_static:
            remaining += 1
    self.assertEqual(remaining, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts

    Checks the values reported before any change, sets custom values and
    checks that they are reported back.
    """
    names = ('udp', 'icmp', 'tcp_trans', 'tcp_est', 'tcp_incoming_syn')
    defaults = (300, 60, 240, 7440, 6)
    custom = (200, 30, 250, 7450, 10)

    # verify default values
    reported = self.vapi.nat64_get_timeouts()
    for name, expected in zip(names, defaults):
        self.assertEqual(getattr(reported, name), expected)

    # set and verify custom values
    self.vapi.nat64_set_timeouts(**dict(zip(names, custom)))
    reported = self.vapi.nat64_get_timeouts()
    for name, expected in zip(names, custom):
        self.assertEqual(getattr(reported, name), expected)
def test_dynamic(self):
    """ NAT64 dynamic translation test

    Runs in2out/out2in twice between pg0 (IPv6) and pg1 (IPv4), checks
    that three sessions were added, then repeats one round trip for the
    tenant on pg2 using the pool address bound to vrf1.
    """
    def exchange(tx_if, stream, rx_if):
        # Send the stream on tx_if and return what rx_if captured;
        # exactly len(stream) packets are expected.
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    sessions_before = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # in2out
    out_capture = exchange(self.pg0,
                           self.create_stream_in_ip6(self.pg0, self.pg1),
                           self.pg1)
    self.verify_capture_out(out_capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    # out2in
    in_capture = exchange(self.pg1,
                          self.create_stream_out(self.pg1,
                                                 dst_ip=self.nat_addr),
                          self.pg0)
    # Build the IPv4-embedded IPv6 source through scapy so that it is
    # compared in the same form as the captured packets' addresses.
    ip6_hdr = IPv6(src='64:ff9b::' + self.pg1.remote_ip4)
    self.verify_capture_in_ip6(in_capture, ip6_hdr.src,
                               self.pg0.remote_ip6)

    # in2out
    out_capture = exchange(self.pg0,
                           self.create_stream_in_ip6(self.pg0, self.pg1),
                           self.pg1)
    self.verify_capture_out(out_capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    # out2in
    in_capture = exchange(self.pg1,
                          self.create_stream_out(self.pg1,
                                                 dst_ip=self.nat_addr),
                          self.pg0)
    self.verify_capture_in_ip6(in_capture, ip6_hdr.src,
                               self.pg0.remote_ip6)

    sessions_after = self.nat64_get_ses_num()

    self.assertEqual(sessions_after - sessions_before, 3)

    # tenant with specific VRF
    self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                            self.vrf1_nat_addr_n,
                                            vrf_id=self.vrf1_id)
    self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

    out_capture = exchange(self.pg2,
                           self.create_stream_in_ip6(self.pg2, self.pg1),
                           self.pg1)
    self.verify_capture_out(out_capture, nat_ip=self.vrf1_nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    in_capture = exchange(self.pg1,
                          self.create_stream_out(
                              self.pg1, dst_ip=self.vrf1_nat_addr),
                          self.pg2)
    self.verify_capture_in_ip6(in_capture, ip6_hdr.src,
                               self.pg2.remote_ip6)
# Test case: NAT64 static translation.
# Visible flow: configure a pool address, two interfaces and three static BIB
# entries, send pg0 -> pg1 and pg1 -> pg0 once each, then check the session
# count grew by three.
# NOTE(review): the embedded line numbers skip inside this method (e.g.
# 3241 -> 3243, 3246 -> 3251 -> 3256 -> 3263), so some statements are not
# visible here; the comments below describe only the lines that are shown.
3230 def test_static(self):
3231 """ NAT64 static translation test """
# The *_in and *_out values are set to the same numbers; the outbound check
# further down passes same_port=True.
3232 self.tcp_port_in = 60303
3233 self.udp_port_in = 60304
3234 self.icmp_id_in = 60305
3235 self.tcp_port_out = 60303
3236 self.udp_port_out = 60304
3237 self.icmp_id_out = 60305
# Session count before any traffic; compared with the count taken at the end.
3239 ses_num_start = self.nat64_get_ses_num()
# NOTE(review): the rest of this call is not visible (numbering jumps
# 3241 -> 3243).
3241 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
# pg0 is registered with the default direction, pg1 explicitly with
# is_inside=0.
3243 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3244 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB calls, all for pg0's remote IPv6 address.
# NOTE(review): every argument after the first is cut off, so which
# port/protocol each entry maps cannot be read from here (presumably one per
# protocol, given the three port/id pairs above - confirm).
3246 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3251 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3256 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# pg0 -> pg1: verified against nat_addr with same_port=True.
3263 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3264 self.pg0.add_stream(pkts)
3265 self.pg_enable_capture(self.pg_interfaces)
3267 capture = self.pg1.get_capture(len(pkts))
3268 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3269 dst_ip=self.pg1.remote_ip4, same_port=True)
# pg1 -> pg0: packets addressed to nat_addr are captured on pg0.
3272 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3273 self.pg1.add_stream(pkts)
3274 self.pg_enable_capture(self.pg_interfaces)
3276 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's IPv4 address appended to '64:ff9b::'.
3277 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3278 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Exactly three sessions must have been added since the start of the test.
3280 ses_num_end = self.nat64_get_ses_num()
3282 self.assertEqual(ses_num_end - ses_num_start, 3)
# Test case: NAT64 session timeout. Skipped unless running_extended_tests()
# returns a true value.
# Visible flow: set the ICMP timeout to 5, send one pg0 -> pg1 stream, read the
# session count twice and require the two readings to differ.
# NOTE(review): the embedded line numbers skip between the two readings
# (3300 -> 3304), so whatever lets the timeout elapse is not visible here;
# other statements are missing as well (e.g. 3288 -> 3290, 3296 -> 3298).
3284 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3285 def test_session_timeout(self):
3286 """ NAT64 session timeout """
3287 self.icmp_id_in = 1234
# NOTE(review): the rest of this call is not visible.
3288 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3290 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3291 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Only the ICMP timeout is overridden; the unit is not stated here
# (presumably seconds - confirm against the API definition).
3292 self.vapi.nat64_set_timeouts(icmp=5)
# Traffic pg0 -> pg1. The capture is fetched but not inspected in the visible
# lines.
3294 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3295 self.pg0.add_stream(pkts)
3296 self.pg_enable_capture(self.pg_interfaces)
3298 capture = self.pg1.get_capture(len(pkts))
3300 ses_num_before_timeout = self.nat64_get_ses_num()
3304 # ICMP session after timeout
3305 ses_num_after_timeout = self.nat64_get_ses_num()
# Only inequality is required, not a specific difference between the counts.
3306 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# Test case: NAT64 translation of ICMP error messages, in both directions.
# Visible flow: create sessions with one round trip, then (1) send ICMPv6
# Destination Unreachable messages from pg0 that embed the previously captured
# IPv6 packets and check the ICMP errors seen on pg1, and (2) send ICMP
# type 3 / code 13 messages from pg1 that embed the previously captured IPv4
# packets and check the ICMPv6 errors seen on pg0.
# NOTE(review): the embedded line numbers skip inside this method (e.g.
# 3316 -> 3318, 3349 -> 3351, 3362 -> 3364 -> 3366), so some statements -
# apparently including the scaffolding around the assertion groups and the
# logger.error calls - are not visible here; the comments below describe only
# the lines that are shown.
3308 def test_icmp_error(self):
3309 """ NAT64 ICMP Error message translation """
3310 self.tcp_port_in = 6303
3311 self.udp_port_in = 6304
3312 self.icmp_id_in = 6305
# Read here but not used again in the visible lines of this method.
3314 ses_num_start = self.nat64_get_ses_num()
# NOTE(review): the rest of this call is not visible.
3316 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3318 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3319 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3321 # send some packets to create sessions
# The IPv4 capture from pg1 is kept: it is embedded in the ICMP errors sent
# from pg1 later on.
3322 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3323 self.pg0.add_stream(pkts)
3324 self.pg_enable_capture(self.pg_interfaces)
3326 capture_ip4 = self.pg1.get_capture(len(pkts))
3327 self.verify_capture_out(capture_ip4,
3328 nat_ip=self.nat_addr,
3329 dst_ip=self.pg1.remote_ip4)
# The IPv6 capture from pg0 is kept as well: it is embedded in the ICMPv6
# errors sent from pg0.
3331 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3332 self.pg1.add_stream(pkts)
3333 self.pg_enable_capture(self.pg_interfaces)
3335 capture_ip6 = self.pg0.get_capture(len(pkts))
# Expected IPv6 address for pg1's host: its IPv4 address appended to
# '64:ff9b::'.
3336 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3337 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3338 self.pg0.remote_ip6)
# Direction 1: for every packet captured on pg0, build an ICMPv6 Destination
# Unreachable (code 1) from pg0's host to the IPv6 form of pg1's host,
# carrying that packet's IPv6 layer as payload.
3341 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3342 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3343 ICMPv6DestUnreach(code=1) /
3344 packet[IPv6] for packet in capture_ip6]
3345 self.pg0.add_stream(pkts)
3346 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg1: an ICMP type 3 / code 13 message from nat_addr to pg1's
# host whose embedded IP header has src pg1.remote_ip4 and dst nat_addr.
3348 capture = self.pg1.get_capture(len(pkts))
3349 for packet in capture:
3351 self.assertEqual(packet[IP].src, self.nat_addr)
3352 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3353 self.assertEqual(packet[ICMP].type, 3)
3354 self.assertEqual(packet[ICMP].code, 13)
3355 inner = packet[IPerror]
3356 self.assertEqual(inner.src, self.pg1.remote_ip4)
3357 self.assertEqual(inner.dst, self.nat_addr)
3358 self.check_icmp_checksum(packet)
# The embedded transport header must carry the outside port/id.
# tcp_port_out, udp_port_out and icmp_id_out are not assigned in this method
# (presumably recorded by verify_capture_out - confirm).
3359 if inner.haslayer(TCPerror):
3360 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3361 elif inner.haslayer(UDPerror):
3362 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3364 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3366 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Direction 2: for every IPv4 packet captured on pg1 earlier, build an ICMP
# type 3 / code 13 message from pg1's host to nat_addr carrying that packet's
# IP layer as payload.
3370 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3371 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3372 ICMP(type=3, code=13) /
3373 packet[IP] for packet in capture_ip4]
3374 self.pg1.add_stream(pkts)
3375 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: an ICMPv6 Destination Unreachable (code 1) from the IPv6
# form of pg1's host to pg0's host, whose embedded IPv6 header has src
# pg0.remote_ip6 and dst the IPv6 form of pg1's host.
3377 capture = self.pg0.get_capture(len(pkts))
3378 for packet in capture:
3380 self.assertEqual(packet[IPv6].src, ip.src)
3381 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3382 icmp = packet[ICMPv6DestUnreach]
3383 self.assertEqual(icmp.code, 1)
3384 inner = icmp[IPerror6]
3385 self.assertEqual(inner.src, self.pg0.remote_ip6)
3386 self.assertEqual(inner.dst, ip.src)
3387 self.check_icmpv6_checksum(packet)
# The embedded transport header must carry the inside port/id set at the top
# of this method.
3388 if inner.haslayer(TCPerror):
3389 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3390 elif inner.haslayer(UDPerror):
3391 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
# NOTE(review): the second argument of this assertion is cut off (numbering
# jumps 3393 -> 3396).
3393 self.assertEqual(inner[ICMPv6EchoRequest].id,
3396 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test case: NAT64 hairpinning - client and server are both hosts behind pg0
# and talk to each other through the NAT address.
# Visible flow: (1) client -> server via the NAT address, (2) server -> client
# replies, (3) ICMPv6 errors from the client that embed the replies. Every
# capture is taken on pg0.
# NOTE(review): the embedded line numbers skip inside this method (e.g.
# 3415 -> 3417, 3420 -> 3423, 3435 -> 3437, 3439 -> 3441, 3453 -> 3455), so
# some statements - including how `pkts` is assembled from the `p` packets and
# the branch/exception keywords around the assertions - are not visible here;
# the comments below describe only the lines that are shown.
3399 def test_hairpinning(self):
3400 """ NAT64 hairpinning """
3402 client = self.pg0.remote_hosts[0]
3403 server = self.pg0.remote_hosts[1]
3404 server_tcp_in_port = 22
3405 server_tcp_out_port = 4022
3406 server_udp_in_port = 23
3407 server_udp_out_port = 4023
3408 client_tcp_in_port = 1234
3409 client_udp_in_port = 1235
# Placeholders: overwritten below with the source ports observed in the first
# capture.
3410 client_tcp_out_port = 0
3411 client_udp_out_port = 0
# IPv6 form of the NAT address: nat_addr appended to '64:ff9b::' and read back
# from an IPv6 layer.
3412 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3413 nat_addr_ip6 = ip.src
# NOTE(review): the rest of this call is not visible.
3415 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3417 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3418 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Two static BIB calls for the server, one naming the TCP and one the UDP
# outside port; the other arguments are cut off.
3420 self.vapi.nat64_add_del_static_bib(server.ip6n,
3423 server_tcp_out_port,
3425 self.vapi.nat64_add_del_static_bib(server.ip6n,
3428 server_udp_out_port,
# Step 1: client -> NAT address on the server's outside ports, one TCP and one
# UDP packet.
3433 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3434 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3435 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3437 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3438 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3439 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3441 self.pg0.add_stream(pkts)
3442 self.pg_enable_capture(self.pg_interfaces)
# Expected back on pg0: source rewritten to the NAT address, destination the
# server, destination port the server's inside port, and a source port that
# differs from the client's inside port.
3444 capture = self.pg0.get_capture(len(pkts))
3445 for packet in capture:
3447 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3448 self.assertEqual(packet[IPv6].dst, server.ip6)
3449 if packet.haslayer(TCP):
3450 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3451 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3452 self.check_tcp_checksum(packet)
# Remember the allocated outside port so the reply can be addressed to it.
3453 client_tcp_out_port = packet[TCP].sport
3455 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3456 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3457 self.check_udp_checksum(packet)
3458 client_udp_out_port = packet[UDP].sport
3460 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Step 2: server -> NAT address on the client's outside ports recorded above.
3465 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3466 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3467 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3469 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3470 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3471 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3473 self.pg0.add_stream(pkts)
3474 self.pg_enable_capture(self.pg_interfaces)
# Expected back on pg0: source the NAT address, destination the client, source
# port the server's outside port, destination port the client's inside port.
3476 capture = self.pg0.get_capture(len(pkts))
3477 for packet in capture:
3479 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3480 self.assertEqual(packet[IPv6].dst, client.ip6)
3481 if packet.haslayer(TCP):
3482 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3483 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3484 self.check_tcp_checksum(packet)
3486 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3487 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3488 self.check_udp_checksum(packet)
3490 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Step 3: for each reply captured in step 2, the client sends an ICMPv6
# Destination Unreachable (code 1) to the NAT address carrying that reply's
# IPv6 layer as payload.
3495 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3496 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3497 ICMPv6DestUnreach(code=1) /
3498 packet[IPv6] for packet in capture]
3499 self.pg0.add_stream(pkts)
3500 self.pg_enable_capture(self.pg_interfaces)
# Expected back on pg0: an error from the NAT address to the server whose
# embedded header reads server -> NAT address, with the server's inside port
# as source and the client's outside port as destination.
3502 capture = self.pg0.get_capture(len(pkts))
3503 for packet in capture:
3505 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3506 self.assertEqual(packet[IPv6].dst, server.ip6)
3507 icmp = packet[ICMPv6DestUnreach]
3508 self.assertEqual(icmp.code, 1)
3509 inner = icmp[IPerror6]
3510 self.assertEqual(inner.src, server.ip6)
3511 self.assertEqual(inner.dst, nat_addr_ip6)
3512 self.check_icmpv6_checksum(packet)
3513 if inner.haslayer(TCPerror):
3514 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3515 self.assertEqual(inner[TCPerror].dport,
3516 client_tcp_out_port)
3518 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3519 self.assertEqual(inner[UDPerror].dport,
3520 client_udp_out_port)
3522 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test case: NAT64 network-specific prefixes.
# Visible flow: configure pool addresses and interfaces for the default table
# and for vrf1_id, add a /32 prefix without a VRF argument and a /56 prefix
# for vrf1_id, check the prefix dump, then run one round trip per prefix
# (pg0 <-> pg1 and pg2 <-> pg1).
# NOTE(review): the embedded line numbers skip inside this method (e.g.
# 3528 -> 3530, 3553 -> 3555, 3560 -> 3563, 3576 -> 3579), so several call
# arguments and other statements are not visible here; the comments below
# describe only the lines that are shown.
3525 def test_prefix(self):
3526 """ NAT64 Network-Specific Prefix """
# NOTE(review): the rest of this call is not visible.
3528 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3530 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3531 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Single-address pool range (start == end) bound to vrf1_id; pg2 is registered
# with the default direction.
3532 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3533 self.vrf1_nat_addr_n,
3534 vrf_id=self.vrf1_id)
3535 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Prefix added without a vrf_id argument: 2001:db8::/32, passed to the API in
# packed binary form.
3538 global_pref64 = "2001:db8::"
3539 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3540 global_pref64_len = 32
3541 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
# The dump must now hold exactly this one prefix, reported with vrf_id 0.
3543 prefix = self.vapi.nat64_prefix_dump()
3544 self.assertEqual(len(prefix), 1)
3545 self.assertEqual(prefix[0].prefix, global_pref64_n)
3546 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3547 self.assertEqual(prefix[0].vrf_id, 0)
3549 # Add tenant specific prefix
3550 vrf1_pref64 = "2001:db8:122:300::"
3551 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3552 vrf1_pref64_len = 56
# NOTE(review): the middle argument of this call is cut off (presumably the
# prefix length defined just above - confirm).
3553 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3555 vrf_id=self.vrf1_id)
# Only the number of prefixes is checked after the second add.
3556 prefix = self.vapi.nat64_prefix_dump()
3557 self.assertEqual(len(prefix), 2)
# Round trip for the first prefix, pg0 -> pg1.
# NOTE(review): the arguments between self.pg0 and plen are cut off.
3560 pkts = self.create_stream_in_ip6(self.pg0,
3563 plen=global_pref64_len)
3564 self.pg0.add_stream(pkts)
3565 self.pg_enable_capture(self.pg_interfaces)
3567 capture = self.pg1.get_capture(len(pkts))
3568 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3569 dst_ip=self.pg1.remote_ip4)
# Return direction pg1 -> pg0. The expected IPv6 source is composed from pg1's
# IPv4 address; the prefix arguments of compose_ip6 are cut off here.
3571 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3572 self.pg1.add_stream(pkts)
3573 self.pg_enable_capture(self.pg_interfaces)
3575 capture = self.pg0.get_capture(len(pkts))
3576 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3579 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3581 # Tenant specific prefix
# Same round trip for pg2, using the /56 prefix length and the VRF-specific
# NAT address.
3582 pkts = self.create_stream_in_ip6(self.pg2,
3585 plen=vrf1_pref64_len)
3586 self.pg2.add_stream(pkts)
3587 self.pg_enable_capture(self.pg_interfaces)
3589 capture = self.pg1.get_capture(len(pkts))
3590 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3591 dst_ip=self.pg1.remote_ip4)
3593 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3594 self.pg1.add_stream(pkts)
3595 self.pg_enable_capture(self.pg_interfaces)
3597 capture = self.pg2.get_capture(len(pkts))
3598 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3601 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Test case: NAT64 translation of a packet whose next header is neither TCP,
# UDP nor ICMP.
# Visible flow: send one TCP packet pg0 -> pg1, then an IPv6 packet with
# next-header 47 pg0 -> pg1 and check the IPv4 result on pg1, then a packet
# pg1 -> NAT address and check the IPv6 result on pg0.
# NOTE(review): the embedded line numbers skip inside this method (e.g.
# 3606 -> 3608, 3622 -> 3624, 3629 -> 3632, 3642 -> 3644), so some statements
# are not visible here. In particular `packet` is asserted on below without
# being bound in any visible line, and the layer between the outer IP/IPv6
# header and the inner IP header of the test packets is cut off.
3603 def test_unknown_proto(self):
3604 """ NAT64 translate packet with unknown protocol """
# NOTE(review): the rest of this call is not visible.
3606 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3608 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3609 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 form of pg1's host under the '64:ff9b::' prefix with length 96.
3610 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# A plain TCP packet is sent first (presumably to create the address mapping
# the following packet relies on - confirm). Its capture is fetched but not
# inspected in the visible lines.
3613 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3614 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3615 TCP(sport=self.tcp_port_in, dport=20))
3616 self.pg0.add_stream(p)
3617 self.pg_enable_capture(self.pg_interfaces)
3619 p = self.pg1.get_capture(1)
# pg0 -> pg1 with next header 47.
# NOTE(review): test_hairpinning_unknown_proto passes IP_PROTOS.gre in the
# same position - presumably the same protocol number; consider using the
# named constant here as well.
3621 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3622 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3624 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3625 TCP(sport=1234, dport=1234))
3626 self.pg0.add_stream(p)
3627 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg1: an IPv4 packet from nat_addr to pg1's host that still has
# a GRE layer and a valid IP checksum.
3629 p = self.pg1.get_capture(1)
3632 self.assertEqual(packet[IP].src, self.nat_addr)
3633 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3634 self.assertTrue(packet.haslayer(GRE))
3635 self.check_ip_checksum(packet)
3637 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: pg1's host -> nat_addr.
3641 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3642 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3644 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3645 TCP(sport=1234, dport=1234))
3646 self.pg1.add_stream(p)
3647 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: an IPv6 packet from the IPv6 form of pg1's host to pg0's
# host with next header 47.
3649 p = self.pg0.get_capture(1)
3652 self.assertEqual(packet[IPv6].src, remote_ip6)
3653 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3654 self.assertEqual(packet[IPv6].nh, 47)
3656 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test case: NAT64 hairpinning of a packet whose next header is neither TCP,
# UDP nor ICMP - client and server are both hosts behind pg0, each with its
# own NAT IPv4 address.
# Visible flow: send one TCP packet client -> server, then a GRE-protocol
# packet client -> server, then a GRE-protocol packet server -> client. Every
# capture is taken on pg0.
# NOTE(review): the embedded line numbers skip inside this method (e.g.
# 3675 -> 3677, 3680 -> 3683 -> 3686 -> 3692, 3708 -> 3710, 3715 -> 3718), so
# some statements are not visible here. In particular `packet` is asserted on
# below without being bound in any visible line, most static BIB arguments
# are cut off, and the layer between the IPv6 header and the inner IP header
# of the test packets is cut off.
3659 def test_hairpinning_unknown_proto(self):
3660 """ NAT64 translate packet with unknown protocol - hairpinning """
3662 client = self.pg0.remote_hosts[0]
3663 server = self.pg0.remote_hosts[1]
3664 server_tcp_in_port = 22
3665 server_tcp_out_port = 4022
3666 client_tcp_in_port = 1234
3667 client_tcp_out_port = 1235
# Separate NAT IPv4 addresses for server and client, each in text form, packed
# binary form and IPv6 form under the '64:ff9b::' prefix with length 96.
3668 server_nat_ip = "10.0.0.100"
3669 client_nat_ip = "10.0.0.110"
3670 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3671 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3672 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3673 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
# Pool range starting at the server's NAT address.
# NOTE(review): the end of the range is cut off (presumably the client's NAT
# address - confirm).
3675 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3677 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3678 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB calls: two for the server and one for the client. Only the
# first address argument and, for two of them, the outside TCP port are
# visible.
3680 self.vapi.nat64_add_del_static_bib(server.ip6n,
3683 server_tcp_out_port,
3686 self.vapi.nat64_add_del_static_bib(server.ip6n,
3692 self.vapi.nat64_add_del_static_bib(client.ip6n,
3695 client_tcp_out_port,
# A plain TCP packet client -> server's NAT address is sent first. Its capture
# is fetched but not inspected in the visible lines.
3699 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3700 IPv6(src=client.ip6, dst=server_nat_ip6) /
3701 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3702 self.pg0.add_stream(p)
3703 self.pg_enable_capture(self.pg_interfaces)
3705 p = self.pg0.get_capture(1)
# Client -> server's NAT address with next header IP_PROTOS.gre.
3707 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3708 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3710 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3711 TCP(sport=1234, dport=1234))
3712 self.pg0.add_stream(p)
3713 self.pg_enable_capture(self.pg_interfaces)
# Expected back on pg0: source the client's NAT IPv6 form, destination the
# server's real IPv6 address, next header unchanged.
3715 p = self.pg0.get_capture(1)
3718 self.assertEqual(packet[IPv6].src, client_nat_ip6)
3719 self.assertEqual(packet[IPv6].dst, server.ip6)
3720 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3722 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client's NAT address with next header IP_PROTOS.gre.
3726 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3727 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3729 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3730 TCP(sport=1234, dport=1234))
3731 self.pg0.add_stream(p)
3732 self.pg_enable_capture(self.pg_interfaces)
# Expected back on pg0: source the server's NAT IPv6 form, destination the
# client's real IPv6 address, next header unchanged.
3734 p = self.pg0.get_capture(1)
3737 self.assertEqual(packet[IPv6].src, server_nat_ip6)
3738 self.assertEqual(packet[IPv6].dst, client.ip6)
3739 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3741 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Helper used by the tests to read how many NAT64 sessions currently exist.
# NOTE(review): only the session-table dump is visible; the statement that
# turns `st` into the returned count is missing (numbering jumps 3748 -> 3751),
# as are the docstring delimiters around the description line.
3744 def nat64_get_ses_num(self):
3746 Return number of active NAT64 sessions.
# Called without arguments (presumably dumps every protocol - confirm against
# the API wrapper).
3748 st = self.vapi.nat64_st_dump()
# Helper that removes NAT64 configuration by dumping each kind of object and
# issuing the matching add_del call for the entries found: timeouts,
# interfaces, BIB entries for TCP, UDP and ICMP, pool addresses, and prefixes.
# NOTE(review): the embedded line numbers skip inside this method (e.g.
# 3759 -> 3763, 3763 -> 3766, 3798 -> 3803, 3805 -> 3807), so the loops over
# the BIB dumps (`bibe` is never bound in a visible line), most call arguments
# - presumably including the flag that selects deletion - and the docstring
# delimiters are not visible here.
3751 def clear_nat64(self):
3753 Clear NAT64 configuration.
# Called with no arguments (presumably restores the default timeouts -
# confirm against the API wrapper).
3755 self.vapi.nat64_set_timeouts()
# Every interface reported by the dump gets an add_del_interface call.
3757 interfaces = self.vapi.nat64_interface_dump()
3758 for intf in interfaces:
3759 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# BIB entries, one dump per protocol: TCP, then UDP, then ICMP.
3763 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3766 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3774 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3777 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3785 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3788 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# Every pool address reported by the dump gets an add_del_pool_addr_range
# call.
3796 adresses = self.vapi.nat64_pool_addr_dump()
3797 for addr in adresses:
3798 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Every prefix reported by the dump gets an add_del_prefix call carrying its
# own vrf_id.
3803 prefixes = self.vapi.nat64_prefix_dump()
3804 for prefix in prefixes:
3805 self.vapi.nat64_add_del_prefix(prefix.prefix,
3807 vrf_id=prefix.vrf_id,
# Body of the TestNAT64 tear-down hook.
# NOTE(review): the `def` line that opens this method is not visible (the
# embedded numbering jumps 3807 -> 3811), and nothing after the last logging
# call is visible either (3817 -> 3820).
# Runs the parent class's tearDown first.
3811 super(TestNAT64, self).tearDown()
# Unless VPP is flagged as dead, log the output of the NAT64 "show" CLI
# commands: pool, interfaces, prefix, BIB and session table.
3812 if not self.vpp_dead:
3813 self.logger.info(self.vapi.cli("show nat64 pool"))
3814 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3815 self.logger.info(self.vapi.cli("show nat64 prefix"))
3816 self.logger.info(self.vapi.cli("show nat64 bib all"))
3817 self.logger.info(self.vapi.cli("show nat64 session table all"))
# Script entry point: when this file is executed directly, run the tests in
# this module through unittest with VppTestRunner as the test runner.
3820 if __name__ == '__main__':
3821 unittest.main(testRunner=VppTestRunner)