7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
19 from util import mactobinary
# Base class shared by the NAT test cases; per its docstring it holds the
# helpers that create packet streams and verify captures.
# NOTE(review): this listing has dropped lines (the embedded numbers jump
# 23 -> 27 -> 30), so the `def` headers enclosing the two super() calls are
# not visible; presumably setUpClass() and tearDown() -- confirm against the
# full file.
22 class MethodHolder(VppTestCase):
23 """ NAT create capture and verify method holder """
27 super(MethodHolder, cls).setUpClass()
30 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its wire form, the parsed checksum is
    discarded and the packet is rebuilt so that scapy recomputes it; the
    recomputed value must equal the checksum carried by the captured packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without discarding the parsed value the rebuild would just echo the
    # captured checksum back and the comparison below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its wire form, the parsed TCP checksum is
    discarded and the packet is rebuilt so that scapy recomputes it; the
    recomputed value must equal the checksum carried by the captured packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value, otherwise the rebuild reuses it verbatim and
    # the assertion compares the checksum with itself.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its wire form, the parsed UDP checksum is
    discarded and the packet is rebuilt so that scapy recomputes it; the
    recomputed value must equal the checksum carried by the captured packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value, otherwise the rebuild reuses it verbatim and
    # the assertion compares the checksum with itself.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    Every embedded layer present in the packet (IPerror, TCPerror, UDPerror,
    ICMPerror) is verified independently: the packet is re-parsed, that
    layer's checksum is discarded, the packet is rebuilt and the recomputed
    checksum is compared with the captured one.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    for layer in (IPerror, TCPerror, UDPerror, ICMPerror):
        if not pkt.haslayer(layer):
            continue
        name = layer.__name__
        # An embedded UDP header with a zero checksum is not verified.
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Start from a fresh parse for every layer so that no check relies
        # on a copy left behind by a previous branch.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    When the packet carries an embedded IPerror layer, the checksums of the
    embedded packet are verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply messages;
    for destination-unreachable the embedded packet is checked as well.

    :param pkt: Packet to check ICMPv6 checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    for layer in (ICMPv6DestUnreach, ICMPv6EchoRequest, ICMPv6EchoReply):
        if not pkt.haslayer(layer):
            continue
        name = layer.__name__
        del rebuilt[name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[name].cksum, pkt[name].cksum)
        if layer is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List of three packets (TCP, UDP, ICMP echo request), in that
              order
    """
    def frame(l4):
        # Common Ethernet/IPv4 envelope: from the inside host towards the
        # outside host, entering through in_if.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
                l4)

    # Callers take len() of the result and hand it to add_stream(), so the
    # packets are returned as a list.
    return [
        frame(TCP(sport=self.tcp_port_in, dport=20)),
        frame(UDP(sport=self.udp_port_in, dport=20)),
        frame(ICMP(id=self.icmp_id_in, type='echo-request')),
    ]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the positions
    RFC 6052 defines for the given prefix length. Octet 8 of the IPv6
    address is never used for IPv4 data. For a prefix length other than
    32, 40, 48, 56, 64 or 96 the prefix is returned without an embedded
    address.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # Prefix length -> byte offsets (within the 16-byte address) that take
    # IPv4 octets 0..3.
    offsets = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray gives integer items on both Python 2 and 3, so the octet
    # copy does not depend on str/bytes semantics.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for pos, octet in zip(offsets.get(plen, ()), ip4_n):
        pref_n[pos] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :returns: List of three packets (TCP, UDP, ICMPv6 echo request), in
              that order
    """
    # Destination is the outside host's IPv4 address embedded either in the
    # 64:ff9b:: prefix or in the caller supplied NAT64 prefix.
    if pref is None:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    def frame(l4):
        # Common Ethernet/IPv6 envelope entering through in_if.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
                l4)

    return [
        frame(TCP(sport=self.tcp_port_in, dport=20)),
        frame(UDP(sport=self.udp_port_in, dport=20)),
        frame(ICMPv6EchoRequest(id=self.icmp_id_in)),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :returns: List of three packets (TCP, UDP, ICMP echo reply), in that
              order
    """
    # Only fall back to the NAT address when the caller gave no destination.
    if dst_ip is None:
        dst_ip = self.nat_addr

    def frame(l4):
        # Common Ethernet/IPv4 envelope: from the outside host towards the
        # translated address, entering through out_if.
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4)

    return [
        frame(TCP(dport=self.tcp_port_out, sport=20)),
        frame(UDP(dport=self.udp_port_out, sport=20)),
        frame(ICMP(id=self.icmp_id_out, type='echo-reply')),
    ]
# Verify packets captured on the outside network. Asserts that the capture
# holds packet_num packets, checks the IPv4 checksum and source address of
# each packet (and the destination when dst_ip is given), and stores the
# observed TCP/UDP source port and ICMP id in self.tcp_port_out,
# self.udp_port_out and self.icmp_id_out (the values create_stream_out()
# later uses as destination port / id).
# NOTE(review): lines are missing from this listing (numbering gaps at 283,
# 287, 293, 295-296, 301, 303-304, 307-308, 310, 314). The paired
# assertEqual/assertNotEqual lines suggest an `if same_port: ... else: ...`
# split and the logger.error call suggests an enclosing try/except --
# confirm against the full file.
272 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
273 packet_num=3, dst_ip=None):
275 Verify captured packets on outside network
277 :param capture: Captured packets
278 :param nat_ip: Translated IP address (Default use global NAT address)
279 :param same_port: Sorce port number is not translated (Default False)
280 :param packet_num: Expected number of packets (Default 3)
281 :param dst_ip: Destination IP address (Default do not verify)
284 nat_ip = self.nat_addr
285 self.assertEqual(packet_num, len(capture))
286 for packet in capture:
288 self.check_ip_checksum(packet)
289 self.assertEqual(packet[IP].src, nat_ip)
290 if dst_ip is not None:
291 self.assertEqual(packet[IP].dst, dst_ip)
292 if packet.haslayer(TCP):
294 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
297 packet[TCP].sport, self.tcp_port_in)
298 self.tcp_port_out = packet[TCP].sport
299 self.check_tcp_checksum(packet)
300 elif packet.haslayer(UDP):
302 self.assertEqual(packet[UDP].sport, self.udp_port_in)
305 packet[UDP].sport, self.udp_port_in)
306 self.udp_port_out = packet[UDP].sport
309 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
311 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
312 self.icmp_id_out = packet[ICMP].id
313 self.check_icmp_checksum(packet)
315 self.logger.error(ppp("Unexpected or invalid packet "
316 "(outside network):", packet))
# Verify packets captured on the inside network. Asserts that the capture
# holds packet_num packets and that each one has a valid IPv4 checksum, is
# addressed to in_if.remote_ip4, and carries the inside TCP/UDP destination
# port or ICMP id (self.tcp_port_in / udp_port_in / icmp_id_in).
# NOTE(review): lines are missing from this listing (gaps at 329, 337, 340,
# 343); the branch that guards the ICMP check and the handler around the
# logger.error call are not visible -- confirm against the full file.
319 def verify_capture_in(self, capture, in_if, packet_num=3):
321 Verify captured packets on inside network
323 :param capture: Captured packets
324 :param in_if: Inside interface
325 :param packet_num: Expected number of packets (Default 3)
327 self.assertEqual(packet_num, len(capture))
328 for packet in capture:
330 self.check_ip_checksum(packet)
331 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
332 if packet.haslayer(TCP):
333 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
334 self.check_tcp_checksum(packet)
335 elif packet.haslayer(UDP):
336 self.assertEqual(packet[UDP].dport, self.udp_port_in)
338 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
339 self.check_icmp_checksum(packet)
341 self.logger.error(ppp("Unexpected or invalid packet "
342 "(inside network):", packet))
# Verify IPv6 packets captured on the inside network. Asserts that the
# capture holds packet_num packets and that each one has the expected IPv6
# source and destination, the inside TCP/UDP destination port (with the
# TCP/UDP checksum verified) or, for ICMPv6 echo replies, the expected id
# and a valid ICMPv6 checksum.
# NOTE(review): lines are missing from this listing (gaps at 356, 365, 367,
# 369, 372); in particular the second argument of the ICMPv6EchoReply id
# assertion is not visible -- confirm against the full file.
345 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
347 Verify captured IPv6 packets on inside network
349 :param capture: Captured packets
350 :param src_ip: Source IP
351 :param dst_ip: Destination IP address
352 :param packet_num: Expected number of packets (Default 3)
354 self.assertEqual(packet_num, len(capture))
355 for packet in capture:
357 self.assertEqual(packet[IPv6].src, src_ip)
358 self.assertEqual(packet[IPv6].dst, dst_ip)
359 if packet.haslayer(TCP):
360 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
361 self.check_tcp_checksum(packet)
362 elif packet.haslayer(UDP):
363 self.assertEqual(packet[UDP].dport, self.udp_port_in)
364 self.check_udp_checksum(packet)
366 self.assertEqual(packet[ICMPv6EchoReply].id,
368 self.check_icmpv6_checksum(packet)
370 self.logger.error(ppp("Unexpected or invalid packet "
371 "(inside network):", packet))
# Verify packets that must pass through untranslated: source and
# destination must still be the remote addresses of the ingress and egress
# interfaces, and the TCP/UDP source port or ICMP id must still be the
# inside value. No capture-length assertion is visible in this helper.
# NOTE(review): lines are missing from this listing (gaps at 383, 390, 392,
# 395); the branch guarding the ICMP check and the handler around the
# logger.error call are not visible -- confirm against the full file.
374 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
376 Verify captured packet that don't have to be translated
378 :param capture: Captured packets
379 :param ingress_if: Ingress interface
380 :param egress_if: Egress interface
382 for packet in capture:
384 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
385 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
386 if packet.haslayer(TCP):
387 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
388 elif packet.haslayer(UDP):
389 self.assertEqual(packet[UDP].sport, self.udp_port_in)
391 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
393 self.logger.error(ppp("Unexpected or invalid packet "
394 "(inside network):", packet))
# Verify ICMP error packets captured on the outside network. Asserts the
# capture length, the outer IPv4 source address, that the packet is ICMP of
# type icmp_type, and that it embeds an IPerror layer; the embedded
# TCP/UDP destination port or ICMP id is then checked.
# NOTE(review): lines are missing from this listing (gaps at 408-409, 413,
# 416, 422, 425-426, 428, 431). The assignment of `icmp` and the expected
# values of the embedded TCP/UDP dport assertions are not visible --
# confirm against the full file.
397 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
398 packet_num=3, icmp_type=11):
400 Verify captured packets with ICMP errors on outside network
402 :param capture: Captured packets
403 :param src_ip: Translated IP address or IP address of VPP
404 (Default use global NAT address)
405 :param packet_num: Expected number of packets (Default 3)
406 :param icmp_type: Type of error ICMP packet
407 we are expecting (Default 11)
410 src_ip = self.nat_addr
411 self.assertEqual(packet_num, len(capture))
412 for packet in capture:
414 self.assertEqual(packet[IP].src, src_ip)
415 self.assertTrue(packet.haslayer(ICMP))
417 self.assertEqual(icmp.type, icmp_type)
418 self.assertTrue(icmp.haslayer(IPerror))
419 inner_ip = icmp[IPerror]
420 if inner_ip.haslayer(TCPerror):
421 self.assertEqual(inner_ip[TCPerror].dport,
423 elif inner_ip.haslayer(UDPerror):
424 self.assertEqual(inner_ip[UDPerror].dport,
427 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
429 self.logger.error(ppp("Unexpected or invalid packet "
430 "(outside network):", packet))
# Verify ICMP error packets captured on the inside network. Asserts the
# capture length, that each packet is addressed to in_if.remote_ip4, is
# ICMP of type icmp_type and embeds an IPerror layer; the embedded TCP/UDP
# source port or ICMP id is then checked.
# NOTE(review): lines are missing from this listing (gaps at 434-435, 443,
# 446, 449, 455, 458-459, 461, 464). The rest of the signature (the
# icmp_type parameter documented below), the assignment of `icmp` and the
# expected values of the embedded sport assertions are not visible --
# confirm against the full file.
433 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
436 Verify captured packets with ICMP errors on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
441 :param icmp_type: Type of error ICMP packet
442 we are expecting (Default 11)
444 self.assertEqual(packet_num, len(capture))
445 for packet in capture:
447 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
448 self.assertTrue(packet.haslayer(ICMP))
450 self.assertEqual(icmp.type, icmp_type)
451 self.assertTrue(icmp.haslayer(IPerror))
452 inner_ip = icmp[IPerror]
453 if inner_ip.haslayer(TCPerror):
454 self.assertEqual(inner_ip[TCPerror].sport,
456 elif inner_ip.haslayer(UDPerror):
457 self.assertEqual(inner_ip[UDPerror].sport,
460 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
462 self.logger.error(ppp("Unexpected or invalid packet "
463 "(inside network):", packet))
# Verify decoded IPFIX records for NAT44 session events. Expects exactly
# six records: three with record[230] == 4 and three with record[230] == 5
# (counted as session create and delete respectively). Each record is also
# checked against pg0's remote address, the NAT address, a zero value in
# field 234 and the per-protocol inside/outside port or ICMP id.
# Records are indexed by IPFIX information element id; presumably
# 230 = natEvent, 8 = sourceIPv4Address, 4 = protocolIdentifier,
# 7 = sourceTransportPort, 234 = ingressVRFID (IANA IPFIX registry) --
# TODO confirm.
# NOTE(review): lines are missing from this listing (gaps at 475-476, 480,
# 482, 486-487, 493, 496, 498, 501, 503-504); the loop header that binds
# `record` and the second arguments of several assertEqual calls are not
# visible -- confirm against the full file.
466 def verify_ipfix_nat44_ses(self, data):
468 Verify IPFIX NAT44 session create/delete event
470 :param data: Decoded IPFIX data records
472 nat44_ses_create_num = 0
473 nat44_ses_delete_num = 0
474 self.assertEqual(6, len(data))
477 self.assertIn(ord(record[230]), [4, 5])
478 if ord(record[230]) == 4:
479 nat44_ses_create_num += 1
481 nat44_ses_delete_num += 1
483 self.assertEqual(self.pg0.remote_ip4n, record[8])
484 # postNATSourceIPv4Address
485 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
488 self.assertEqual(struct.pack("!I", 0), record[234])
489 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
490 if IP_PROTOS.icmp == ord(record[4]):
491 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
492 self.assertEqual(struct.pack("!H", self.icmp_id_out),
494 elif IP_PROTOS.tcp == ord(record[4]):
495 self.assertEqual(struct.pack("!H", self.tcp_port_in),
497 self.assertEqual(struct.pack("!H", self.tcp_port_out),
499 elif IP_PROTOS.udp == ord(record[4]):
500 self.assertEqual(struct.pack("!H", self.udp_port_in),
502 self.assertEqual(struct.pack("!H", self.udp_port_out),
505 self.fail("Invalid protocol")
506 self.assertEqual(3, nat44_ses_create_num)
507 self.assertEqual(3, nat44_ses_delete_num)
# Verify the decoded IPFIX data for the "addresses exhausted" case:
# exactly one record, with record[230] == 3 and a zero 32-bit value in
# record[283]. Presumably 230 = natEvent and 283 = natPoolId in the IANA
# IPFIX registry -- TODO confirm.
# NOTE(review): lines are missing from this listing (gaps at 516-517, 519);
# the statement that binds `record` is not visible.
509 def verify_ipfix_addr_exhausted(self, data):
511 Verify IPFIX NAT addresses event
513 :param data: Decoded IPFIX data records
515 self.assertEqual(1, len(data))
518 self.assertEqual(ord(record[230]), 3)
520 self.assertEqual(struct.pack("!I", 0), record[283])
# NAT44 test-case class header together with its class-level setup and
# teardown statements. The setup fixes the default inside/outside ports and
# ICMP ids, the NAT pool address and the IPFIX source port / domain id,
# creates ten pg interfaces, adds IP tables 10 and 20, gives pg4/pg5/pg6
# hand-picked addresses (pg4 and pg6 share the same ones but sit in tables
# 10 and 20 respectively) and prepares pg9 with two remote hosts.
# NOTE(review): lines are missing from this listing (gaps at 525-527,
# 529-530, 540, 543, 545-548, 551, 555, 569-575, 577, 580-582, 587-588,
# 590-591), so the `def` headers and both loop bodies are not visible.
# NOTE(review): lines 557, 561 and 565 compute _local_ip4n from
# `i.local_ip4`, where `i` is whatever the earlier `for i in
# cls.interfaces` loop left bound, not pg4/pg5/pg6 themselves. This looks
# like a copy/paste slip (expected cls.pgN.local_ip4) -- confirm.
523 class TestNAT44(MethodHolder):
524 """ NAT44 Test Cases """
528 super(TestNAT44, cls).setUpClass()
531 cls.tcp_port_in = 6303
532 cls.tcp_port_out = 6303
533 cls.udp_port_in = 6304
534 cls.udp_port_out = 6304
535 cls.icmp_id_in = 6305
536 cls.icmp_id_out = 6305
537 cls.nat_addr = '10.0.0.3'
538 cls.ipfix_src_port = 4739
539 cls.ipfix_domain_id = 1
541 cls.create_pg_interfaces(range(10))
542 cls.interfaces = list(cls.pg_interfaces[0:4])
544 for i in cls.interfaces:
549 cls.pg0.generate_remote_hosts(3)
550 cls.pg0.configure_ipv4_neighbors()
552 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
553 cls.vapi.ip_table_add_del(10, is_add=1)
554 cls.vapi.ip_table_add_del(20, is_add=1)
556 cls.pg4._local_ip4 = "172.16.255.1"
557 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
559 cls.pg4.set_table_ip4(10)
560 cls.pg5._local_ip4 = "172.17.255.3"
561 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
563 cls.pg5.set_table_ip4(10)
564 cls.pg6._local_ip4 = "172.16.255.1"
565 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
566 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
567 cls.pg6.set_table_ip4(20)
568 for i in cls.overlapping_interfaces:
576 cls.pg9.generate_remote_hosts(2)
578 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
579 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
583 cls.pg9.resolve_arp()
584 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
585 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
586 cls.pg9.resolve_arp()
589 super(TestNAT44, cls).tearDownClass()
# Remove the NAT44 configuration so the next test starts clean. The visible
# steps touch the /32 routes and neighbors of pg7/pg8, pg7's IPv4 config,
# the NAT44 interface addresses, IPFIX logging (and reset
# self.ipfix_src_port / self.ipfix_domain_id to 4739 / 1), the NAT44
# interface and output features, the static and load-balancing static
# mappings, and the NAT address pool.
# NOTE(review): lines are missing from this listing (gaps at 601, 606-607,
# 610, 612-615, 618, 632-633, 635-637, 641-643, 647, 652, 654-655,
# 659-666, 670-672). The trailing arguments of most vapi calls (presumably
# the is_add=0 / delete flags) and the branch structure around
# `intf.is_inside > 1` are not visible -- confirm against the full file.
592 def clear_nat44(self):
594 Clear NAT44 configuration.
596 # I found no elegant way to do this
597 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
598 dst_address_length=32,
599 next_hop_address=self.pg7.remote_ip4n,
600 next_hop_sw_if_index=self.pg7.sw_if_index,
602 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
603 dst_address_length=32,
604 next_hop_address=self.pg8.remote_ip4n,
605 next_hop_sw_if_index=self.pg8.sw_if_index,
608 for intf in [self.pg7, self.pg8]:
609 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
611 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
616 if self.pg7.has_ip4_config:
617 self.pg7.unconfig_ip4()
619 interfaces = self.vapi.nat44_interface_addr_dump()
620 for intf in interfaces:
621 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
623 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
624 domain_id=self.ipfix_domain_id)
625 self.ipfix_src_port = 4739
626 self.ipfix_domain_id = 1
628 interfaces = self.vapi.nat44_interface_dump()
629 for intf in interfaces:
630 if intf.is_inside > 1:
631 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
634 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
638 interfaces = self.vapi.nat44_interface_output_feature_dump()
639 for intf in interfaces:
640 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
644 static_mappings = self.vapi.nat44_static_mapping_dump()
645 for sm in static_mappings:
646 self.vapi.nat44_add_del_static_mapping(
648 sm.external_ip_address,
649 local_port=sm.local_port,
650 external_port=sm.external_port,
651 addr_only=sm.addr_only,
653 protocol=sm.protocol,
656 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
657 for lb_sm in lb_static_mappings:
658 self.vapi.nat44_add_del_lb_static_mapping(
667 adresses = self.vapi.nat44_address_dump()
668 for addr in adresses:
669 self.vapi.nat44_add_del_address_range(addr.ip_address,
# Convenience wrapper around vapi.nat44_add_del_static_mapping(): converts
# the textual local and external IPv4 addresses to packed form with
# socket.inet_pton() before issuing the API call.
# NOTE(review): lines are missing from this listing (gaps at 676-677, 679,
# 688-689, 691, 695-696, 698-704). The end of the signature (the `proto`
# parameter documented below and passed by callers), the body of the
# `if local_port and external_port:` branch and most call arguments are
# not visible -- confirm against the full file.
673 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
674 local_port=0, external_port=0, vrf_id=0,
675 is_add=1, external_sw_if_index=0xFFFFFFFF,
678 Add/delete NAT44 static mapping
680 :param local_ip: Local IP address
681 :param external_ip: External IP address
682 :param local_port: Local port number (Optional)
683 :param external_port: External port number (Optional)
684 :param vrf_id: VRF ID (Default 0)
685 :param is_add: 1 if add, 0 if delete (Default add)
686 :param external_sw_if_index: External interface instead of IP address
687 :param proto: IP protocol (Mandatory if port specified)
690 if local_port and external_port:
692 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
693 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
694 self.vapi.nat44_add_del_static_mapping(
697 external_sw_if_index,
# Add or delete a single NAT44 pool address: the textual address is packed
# with socket.inet_pton() and passed as both ends of a one-address range to
# vapi.nat44_add_del_address_range().
# NOTE(review): the `vrf_id` parameter is not described in the docstring,
# and the tail of the API call (after line 713) is missing from this
# listing, so how vrf_id is forwarded is not visible.
705 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
707 Add/delete NAT44 address
709 :param ip: IP address
710 :param is_add: 1 if add, 0 if delete (Default add)
712 nat_addr = socket.inet_pton(socket.AF_INET, ip)
713 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
# Test: dynamic NAT44 translation. Adds self.nat_addr to the pool, enables
# the NAT44 feature on pg0 and pg1, sends the inside stream pg0 -> pg1 and
# verifies the capture on pg1, then sends the outside stream from pg1 and
# verifies the capture on pg0.
# NOTE(review): lines are missing from this listing (gaps at 718, 722-724,
# 728, 731-732, 736); the tail of the second feature call and any step
# between enabling capture and reading it are not visible.
716 def test_dynamic(self):
717 """ NAT44 dynamic translation test """
719 self.nat44_add_address(self.nat_addr)
720 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
721 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
725 pkts = self.create_stream_in(self.pg0, self.pg1)
726 self.pg0.add_stream(pkts)
727 self.pg_enable_capture(self.pg_interfaces)
729 capture = self.pg1.get_capture(len(pkts))
730 self.verify_capture_out(capture)
733 pkts = self.create_stream_out(self.pg1)
734 self.pg1.add_stream(pkts)
735 self.pg_enable_capture(self.pg_interfaces)
737 capture = self.pg0.get_capture(len(pkts))
738 self.verify_capture_in(capture, self.pg0)
# Test: inside clients send packets with TTL=1; the ICMP errors (default
# expected type 11) must come back on pg0, the interface the packets were
# sent from.
# NOTE(review): lines are missing from this listing (gaps at 742, 746-747,
# 752-753, 757); the tail of the second feature call and any step between
# enabling capture and reading it are not visible.
740 def test_dynamic_icmp_errors_in2out_ttl_1(self):
741 """ NAT44 handling of client packets with TTL=1 """
743 self.nat44_add_address(self.nat_addr)
744 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
745 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
748 # Client side - generate traffic
749 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
750 self.pg0.add_stream(pkts)
751 self.pg_enable_capture(self.pg_interfaces)
754 # Client side - verify ICMP type 11 packets
755 capture = self.pg0.get_capture(len(pkts))
756 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Test: sessions are first created from the inside (pg0 -> pg1), then the
# outside host sends packets with TTL=1; the ICMP errors must come back on
# pg1 with pg1's local address as their source.
# NOTE(review): lines are missing from this listing (gaps at 760, 764-765,
# 770-771, 778-779, 784); the tail of the second feature call and any step
# between enabling capture and reading it are not visible.
758 def test_dynamic_icmp_errors_out2in_ttl_1(self):
759 """ NAT44 handling of server packets with TTL=1 """
761 self.nat44_add_address(self.nat_addr)
762 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
763 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
766 # Client side - create sessions
767 pkts = self.create_stream_in(self.pg0, self.pg1)
768 self.pg0.add_stream(pkts)
769 self.pg_enable_capture(self.pg_interfaces)
772 # Server side - generate traffic
773 capture = self.pg1.get_capture(len(pkts))
774 self.verify_capture_out(capture)
775 pkts = self.create_stream_out(self.pg1, ttl=1)
776 self.pg1.add_stream(pkts)
777 self.pg_enable_capture(self.pg_interfaces)
780 # Server side - verify ICMP type 11 packets
781 capture = self.pg1.get_capture(len(pkts))
782 self.verify_capture_out_with_icmp_errors(capture,
783 src_ip=self.pg1.local_ip4)
# Test: inside clients send packets with TTL=2. Each packet captured on
# pg1 is wrapped (from its IP layer down) in an ICMP type 11 error sent
# from pg1's remote host to the NAT address; the errors must reach the
# inside client on pg0.
# NOTE(review): lines are missing from this listing (gaps at 787, 791-792,
# 797-798, 806-807, 811); the tail of the second feature call and any step
# between enabling capture and reading it are not visible.
785 def test_dynamic_icmp_errors_in2out_ttl_2(self):
786 """ NAT44 handling of error responses to client packets with TTL=2 """
788 self.nat44_add_address(self.nat_addr)
789 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
790 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
793 # Client side - generate traffic
794 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
795 self.pg0.add_stream(pkts)
796 self.pg_enable_capture(self.pg_interfaces)
799 # Server side - simulate ICMP type 11 response
800 capture = self.pg1.get_capture(len(pkts))
801 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
802 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
803 ICMP(type=11) / packet[IP] for packet in capture]
804 self.pg1.add_stream(pkts)
805 self.pg_enable_capture(self.pg_interfaces)
808 # Client side - verify ICMP type 11 packets
809 capture = self.pg0.get_capture(len(pkts))
810 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Test: sessions are created from the inside, the outside host then sends
# packets with TTL=2. Each packet captured on pg0 is wrapped (from its IP
# layer down) in an ICMP type 11 error sent by the inside host towards
# pg1's remote host; the errors must appear on pg1 with the default
# expected source (self.nat_addr).
# NOTE(review): lines are missing from this listing (gaps at 814, 818-819,
# 824-825, 832-833, 841-842, 846); the tail of the second feature call and
# any step between enabling capture and reading it are not visible.
812 def test_dynamic_icmp_errors_out2in_ttl_2(self):
813 """ NAT44 handling of error responses to server packets with TTL=2 """
815 self.nat44_add_address(self.nat_addr)
816 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
817 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
820 # Client side - create sessions
821 pkts = self.create_stream_in(self.pg0, self.pg1)
822 self.pg0.add_stream(pkts)
823 self.pg_enable_capture(self.pg_interfaces)
826 # Server side - generate traffic
827 capture = self.pg1.get_capture(len(pkts))
828 self.verify_capture_out(capture)
829 pkts = self.create_stream_out(self.pg1, ttl=2)
830 self.pg1.add_stream(pkts)
831 self.pg_enable_capture(self.pg_interfaces)
834 # Client side - simulate ICMP type 11 response
835 capture = self.pg0.get_capture(len(pkts))
836 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
837 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
838 ICMP(type=11) / packet[IP] for packet in capture]
839 self.pg0.add_stream(pkts)
840 self.pg_enable_capture(self.pg_interfaces)
843 # Server side - verify ICMP type 11 packets
844 capture = self.pg1.get_capture(len(pkts))
845 self.verify_capture_out_with_icmp_errors(capture)
# Test: the outside host pings pg1's own local address. Exactly one packet
# must come back on pg1: an ICMP type 0 (echo reply) from pg1.local_ip4 to
# pg1.remote_ip4.
# NOTE(review): the request is built with id=self.icmp_id_out while the
# reply is compared with self.icmp_id_in; this only holds while both ids
# are equal (both are 6305 in the class defaults) -- confirm the intent.
# NOTE(review): lines are missing from this listing (gaps at 849, 853-854,
# 858, 861, 864-865, 870, 873); the statements that bind `pkts` and
# `packet` are not visible.
847 def test_ping_out_interface_from_outside(self):
848 """ Ping NAT44 out interface from outside network """
850 self.nat44_add_address(self.nat_addr)
851 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
852 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
855 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
856 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
857 ICMP(id=self.icmp_id_out, type='echo-request'))
859 self.pg1.add_stream(pkts)
860 self.pg_enable_capture(self.pg_interfaces)
862 capture = self.pg1.get_capture(len(pkts))
863 self.assertEqual(1, len(capture))
866 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
867 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
868 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
869 self.assertEqual(packet[ICMP].type, 0) # echo reply
871 self.logger.error(ppp("Unexpected or invalid packet "
872 "(outside network):", packet))
# Test: with a static mapping from pg0's remote host to self.nat_addr, an
# echo request sent from the outside to the NAT address must arrive on pg0
# as a single ICMP packet, and the echo reply sent back by the inside host
# must leave on pg1 as a single ICMP packet with the id untranslated
# (same_port=True).
# NOTE(review): lines are missing from this listing (gaps at 877, 881-883,
# 889, 893-894, 900, 904); the tail of the second feature call and any
# step between enabling capture and reading it are not visible.
875 def test_ping_internal_host_from_outside(self):
876 """ Ping internal host from outside network """
878 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
879 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
880 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
884 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
885 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
886 ICMP(id=self.icmp_id_out, type='echo-request'))
887 self.pg1.add_stream(pkt)
888 self.pg_enable_capture(self.pg_interfaces)
890 capture = self.pg0.get_capture(1)
891 self.verify_capture_in(capture, self.pg0, packet_num=1)
892 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
895 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
896 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
897 ICMP(id=self.icmp_id_in, type='echo-reply'))
898 self.pg0.add_stream(pkt)
899 self.pg_enable_capture(self.pg_interfaces)
901 capture = self.pg1.get_capture(1)
902 self.verify_capture_out(capture, same_port=True, packet_num=1)
903 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Test: 1:1 static NAT with the first traffic coming from the inside. The
# outside port/id attributes are reset to the inside values, a static
# mapping pg0 remote host -> nat_ip is added, the inside stream is verified
# on pg1 with ports untranslated (same_port=True), and the return stream
# towards nat_ip is verified on pg0.
# NOTE(review): lines are missing from this listing (gaps at 907-908, 912,
# 916-918, 922, 925-926, 930, 933); the assignment of `nat_ip` and the
# tail of the second feature call are not visible.
905 def test_static_in(self):
906 """ 1:1 NAT initialized from inside network """
909 self.tcp_port_out = 6303
910 self.udp_port_out = 6304
911 self.icmp_id_out = 6305
913 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
914 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
915 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
919 pkts = self.create_stream_in(self.pg0, self.pg1)
920 self.pg0.add_stream(pkts)
921 self.pg_enable_capture(self.pg_interfaces)
923 capture = self.pg1.get_capture(len(pkts))
924 self.verify_capture_out(capture, nat_ip, True)
927 pkts = self.create_stream_out(self.pg1, nat_ip)
928 self.pg1.add_stream(pkts)
929 self.pg_enable_capture(self.pg_interfaces)
931 capture = self.pg0.get_capture(len(pkts))
932 self.verify_capture_in(capture, self.pg0)
# Test: 1:1 static NAT with the first traffic coming from the outside. Same
# configuration as the inside-initiated variant, but the outside stream
# towards nat_ip is sent and verified on pg0 first, then the inside stream
# is verified on pg1 with ports untranslated (same_port=True).
# NOTE(review): lines are missing from this listing (gaps at 936-937, 941,
# 945-947, 951, 954-955, 959, 962); the assignment of `nat_ip` and the
# tail of the second feature call are not visible.
934 def test_static_out(self):
935 """ 1:1 NAT initialized from outside network """
938 self.tcp_port_out = 6303
939 self.udp_port_out = 6304
940 self.icmp_id_out = 6305
942 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
943 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
944 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
948 pkts = self.create_stream_out(self.pg1, nat_ip)
949 self.pg1.add_stream(pkts)
950 self.pg_enable_capture(self.pg_interfaces)
952 capture = self.pg0.get_capture(len(pkts))
953 self.verify_capture_in(capture, self.pg0)
956 pkts = self.create_stream_in(self.pg0, self.pg1)
957 self.pg0.add_stream(pkts)
958 self.pg_enable_capture(self.pg_interfaces)
960 capture = self.pg1.get_capture(len(pkts))
961 self.verify_capture_out(capture, nat_ip, True)
# Test: 1:1 NAPT (static mapping with ports) with the first traffic coming
# from the inside. The outside port/id attributes are set to 3606/3607/3608,
# one static mapping per protocol maps the inside port/id to them, the
# inside stream is verified on pg1 and the return stream on pg0.
# NOTE(review): lines are missing from this listing (gaps at 965, 969, 973,
# 976, 982-984, 988, 991-992, 996, 999); the trailing arguments of the
# TCP and UDP static-mapping calls and of the second feature call are not
# visible (presumably proto=..., as on the ICMP call) -- confirm.
963 def test_static_with_port_in(self):
964 """ 1:1 NAPT initialized from inside network """
966 self.tcp_port_out = 3606
967 self.udp_port_out = 3607
968 self.icmp_id_out = 3608
970 self.nat44_add_address(self.nat_addr)
971 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
972 self.tcp_port_in, self.tcp_port_out,
974 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
975 self.udp_port_in, self.udp_port_out,
977 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
978 self.icmp_id_in, self.icmp_id_out,
979 proto=IP_PROTOS.icmp)
980 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
981 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
985 pkts = self.create_stream_in(self.pg0, self.pg1)
986 self.pg0.add_stream(pkts)
987 self.pg_enable_capture(self.pg_interfaces)
989 capture = self.pg1.get_capture(len(pkts))
990 self.verify_capture_out(capture)
993 pkts = self.create_stream_out(self.pg1)
994 self.pg1.add_stream(pkts)
995 self.pg_enable_capture(self.pg_interfaces)
997 capture = self.pg0.get_capture(len(pkts))
998 self.verify_capture_in(capture, self.pg0)
# Test: 1:1 NAPT (static mapping with ports) with the first traffic coming
# from the outside. The outside port/id attributes are set to
# 30606/30607/30608, one static mapping per protocol (TCP, UDP, ICMP) maps
# the inside port/id to them, the outside stream is verified on pg0 and
# the inside stream on pg1.
# NOTE(review): lines are missing from this listing (gaps at 1002, 1006,
# 1019-1021, 1025, 1028-1029, 1033, 1036); the tail of the second feature
# call and any step between enabling capture and reading it are not
# visible.
1000 def test_static_with_port_out(self):
1001 """ 1:1 NAPT initialized from outside network """
1003 self.tcp_port_out = 30606
1004 self.udp_port_out = 30607
1005 self.icmp_id_out = 30608
1007 self.nat44_add_address(self.nat_addr)
1008 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1009 self.tcp_port_in, self.tcp_port_out,
1010 proto=IP_PROTOS.tcp)
1011 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1012 self.udp_port_in, self.udp_port_out,
1013 proto=IP_PROTOS.udp)
1014 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1015 self.icmp_id_in, self.icmp_id_out,
1016 proto=IP_PROTOS.icmp)
1017 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1018 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1022 pkts = self.create_stream_out(self.pg1)
1023 self.pg1.add_stream(pkts)
1024 self.pg_enable_capture(self.pg_interfaces)
1026 capture = self.pg0.get_capture(len(pkts))
1027 self.verify_capture_in(capture, self.pg0)
1030 pkts = self.create_stream_in(self.pg0, self.pg1)
1031 self.pg0.add_stream(pkts)
1032 self.pg_enable_capture(self.pg_interfaces)
1034 capture = self.pg1.get_capture(len(pkts))
1035 self.verify_capture_out(capture)
# Test: VRF awareness of 1:1 static NAT. Two static mappings are added
# (pg4 remote host -> 10.0.0.30, pg0 remote host -> 10.0.0.40). Traffic
# from pg4 must leave pg3 translated to nat_ip1 with ports untranslated,
# while traffic from pg0 must produce nothing on pg3.
# NOTE(review): lines are missing from this listing (gaps at 1039, 1045,
# 1047, 1049, 1051, 1054, 1059, 1062, 1064, 1068, 1070); the trailing
# arguments of both static-mapping calls (presumably the VRF ids) and of
# the pg3 feature call are not visible -- confirm against the full file.
1037 def test_static_vrf_aware(self):
1038 """ 1:1 NAT VRF awareness """
1040 nat_ip1 = "10.0.0.30"
1041 nat_ip2 = "10.0.0.40"
1042 self.tcp_port_out = 6303
1043 self.udp_port_out = 6304
1044 self.icmp_id_out = 6305
1046 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1048 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1050 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1052 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1053 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1055 # inside interface VRF match NAT44 static mapping VRF
1056 pkts = self.create_stream_in(self.pg4, self.pg3)
1057 self.pg4.add_stream(pkts)
1058 self.pg_enable_capture(self.pg_interfaces)
1060 capture = self.pg3.get_capture(len(pkts))
1061 self.verify_capture_out(capture, nat_ip1, True)
1063 # inside interface VRF don't match NAT44 static mapping VRF (packets
1065 pkts = self.create_stream_in(self.pg0, self.pg3)
1066 self.pg0.add_stream(pkts)
1067 self.pg_enable_capture(self.pg_interfaces)
1069 self.pg3.assert_nothing_captured()
# Test: NAT44 load balancing of one external address/port over two local
# servers (pg0 remote hosts 0 and 1). A single client packet must reach
# one of the two servers on the local port with valid checksums, the
# server's reply must leave pg1 from self.nat_addr / external_port, and a
# batch of packets from the ip4_range() client addresses must land more
# often on server1 than on server2.
# NOTE(review): lines are missing from this listing (gaps at 1074-1075,
# 1078, 1080-1081, 1083-1085, 1088-1089, 1091, 1094-1095, 1102,
# 1104-1108, 1111-1113, 1117, 1119-1120, 1127, 1129-1132, 1137,
# 1139-1143, 1145, 1150, 1153, 1155, 1157-1159, 1161). The assignments of
# external_port, local_port, server, ip, tcp, pkts, server1_n and
# server2_n, and the per-server entries of `locals`, are not visible --
# confirm against the full file.
1071 def test_static_lb(self):
1072 """ NAT44 local service load balancing """
1073 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1076 server1 = self.pg0.remote_hosts[0]
1077 server2 = self.pg0.remote_hosts[1]
1079 locals = [{'addr': server1.ip4n,
1082 {'addr': server2.ip4n,
1086 self.nat44_add_address(self.nat_addr)
1087 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1090 local_num=len(locals),
1092 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1093 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1096 # from client to service
1097 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1098 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1099 TCP(sport=12345, dport=external_port))
1100 self.pg1.add_stream(p)
1101 self.pg_enable_capture(self.pg_interfaces)
1103 capture = self.pg0.get_capture(1)
1109 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1110 if ip.dst == server1.ip4:
1114 self.assertEqual(tcp.dport, local_port)
1115 self.check_tcp_checksum(p)
1116 self.check_ip_checksum(p)
1118 self.logger.error(ppp("Unexpected or invalid packet:", p))
1121 # from service back to client
1122 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1123 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1124 TCP(sport=local_port, dport=12345))
1125 self.pg0.add_stream(p)
1126 self.pg_enable_capture(self.pg_interfaces)
1128 capture = self.pg1.get_capture(1)
1133 self.assertEqual(ip.src, self.nat_addr)
1134 self.assertEqual(tcp.sport, external_port)
1135 self.check_tcp_checksum(p)
1136 self.check_ip_checksum(p)
1138 self.logger.error(ppp("Unexpected or invalid packet:", p))
1144 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1146 for client in clients:
1147 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1148 IP(src=client, dst=self.nat_addr) /
1149 TCP(sport=12345, dport=external_port))
1151 self.pg1.add_stream(pkts)
1152 self.pg_enable_capture(self.pg_interfaces)
1154 capture = self.pg0.get_capture(len(pkts))
1156 if p[IP].dst == server1.ip4:
1160 self.assertTrue(server1_n > server2_n)
1162 def test_multiple_inside_interfaces(self):
1163 """ NAT44 multiple non-overlapping address space inside interfaces """
# The NAT44 feature is enabled on pg0, pg1 and pg3 (pg3 with an extra
# argument on a continuation line - presumably marking it as the outside
# interface, TODO confirm). pg2 carries no NAT44 feature.
# Every step below follows the same pattern: build a stream, queue it on the
# sending interface, enable capture, then verify what arrives on the
# receiving interface.
1165 self.nat44_add_address(self.nat_addr)
1166 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1167 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1168 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1171 # between two NAT44 inside interfaces (no translation)
1172 pkts = self.create_stream_in(self.pg0, self.pg1)
1173 self.pg0.add_stream(pkts)
1174 self.pg_enable_capture(self.pg_interfaces)
1176 capture = self.pg1.get_capture(len(pkts))
1177 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1179 # from NAT44 inside to interface without NAT44 feature (no translation)
1180 pkts = self.create_stream_in(self.pg0, self.pg2)
1181 self.pg0.add_stream(pkts)
1182 self.pg_enable_capture(self.pg_interfaces)
1184 capture = self.pg2.get_capture(len(pkts))
1185 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1187 # in2out 1st interface
1188 pkts = self.create_stream_in(self.pg0, self.pg3)
1189 self.pg0.add_stream(pkts)
1190 self.pg_enable_capture(self.pg_interfaces)
1192 capture = self.pg3.get_capture(len(pkts))
1193 self.verify_capture_out(capture)
1195 # out2in 1st interface
1196 pkts = self.create_stream_out(self.pg3)
1197 self.pg3.add_stream(pkts)
1198 self.pg_enable_capture(self.pg_interfaces)
1200 capture = self.pg0.get_capture(len(pkts))
1201 self.verify_capture_in(capture, self.pg0)
1203 # in2out 2nd interface
1204 pkts = self.create_stream_in(self.pg1, self.pg3)
1205 self.pg1.add_stream(pkts)
1206 self.pg_enable_capture(self.pg_interfaces)
1208 capture = self.pg3.get_capture(len(pkts))
1209 self.verify_capture_out(capture)
1211 # out2in 2nd interface
1212 pkts = self.create_stream_out(self.pg3)
1213 self.pg3.add_stream(pkts)
1214 self.pg_enable_capture(self.pg_interfaces)
1216 capture = self.pg1.get_capture(len(pkts))
1217 self.verify_capture_in(capture, self.pg1)
1219 def test_inside_overlapping_interfaces(self):
1220 """ NAT44 multiple inside interfaces with overlapping address space """
# The NAT44 feature is enabled on pg3, pg4, pg5 and pg6 (pg3 with an extra
# argument on a continuation line - presumably outside, TODO confirm).
# The pg6 host additionally gets a static mapping to static_nat_ip.
# The test sends traffic in both directions for each of pg4, pg5 and pg6 and
# then checks the user/session dumps returned by the API.
1222 static_nat_ip = "10.0.0.10"
1223 self.nat44_add_address(self.nat_addr)
1224 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1226 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1227 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1228 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1229 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1232 # between NAT44 inside interfaces with same VRF (no translation)
1233 pkts = self.create_stream_in(self.pg4, self.pg5)
1234 self.pg4.add_stream(pkts)
1235 self.pg_enable_capture(self.pg_interfaces)
1237 capture = self.pg5.get_capture(len(pkts))
1238 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1240 # between NAT44 inside interfaces with different VRF (hairpinning)
1241 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1242 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1243 TCP(sport=1234, dport=5678))
1244 self.pg4.add_stream(p)
1245 self.pg_enable_capture(self.pg_interfaces)
1247 capture = self.pg6.get_capture(1)
# The hairpinned packet must carry the NAT address as source, the pg6 host
# as destination, a changed source port and the unchanged destination port.
1252 self.assertEqual(ip.src, self.nat_addr)
1253 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1254 self.assertNotEqual(tcp.sport, 1234)
1255 self.assertEqual(tcp.dport, 5678)
1257 self.logger.error(ppp("Unexpected or invalid packet:", p))
1260 # in2out 1st interface
1261 pkts = self.create_stream_in(self.pg4, self.pg3)
1262 self.pg4.add_stream(pkts)
1263 self.pg_enable_capture(self.pg_interfaces)
1265 capture = self.pg3.get_capture(len(pkts))
1266 self.verify_capture_out(capture)
1268 # out2in 1st interface
1269 pkts = self.create_stream_out(self.pg3)
1270 self.pg3.add_stream(pkts)
1271 self.pg_enable_capture(self.pg_interfaces)
1273 capture = self.pg4.get_capture(len(pkts))
1274 self.verify_capture_in(capture, self.pg4)
1276 # in2out 2nd interface
1277 pkts = self.create_stream_in(self.pg5, self.pg3)
1278 self.pg5.add_stream(pkts)
1279 self.pg_enable_capture(self.pg_interfaces)
1281 capture = self.pg3.get_capture(len(pkts))
1282 self.verify_capture_out(capture)
1284 # out2in 2nd interface
1285 pkts = self.create_stream_out(self.pg3)
1286 self.pg3.add_stream(pkts)
1287 self.pg_enable_capture(self.pg_interfaces)
1289 capture = self.pg5.get_capture(len(pkts))
1290 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 host: exactly three non-static sessions are
# expected, in the order TCP, UDP, ICMP, with inside/outside ports equal to
# the self.*_port_in / self.*_port_out (and ICMP id) attributes.
1293 addresses = self.vapi.nat44_address_dump()
1294 self.assertEqual(len(addresses), 1)
1295 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1296 self.assertEqual(len(sessions), 3)
1297 for session in sessions:
1298 self.assertFalse(session.is_static)
1299 self.assertEqual(session.inside_ip_address[0:4],
1300 self.pg5.remote_ip4n)
1301 self.assertEqual(session.outside_ip_address,
1302 addresses[0].ip_address)
1303 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1304 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1305 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1306 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1307 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1308 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1309 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1310 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1311 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1313 # in2out 3rd interface
1314 pkts = self.create_stream_in(self.pg6, self.pg3)
1315 self.pg6.add_stream(pkts)
1316 self.pg_enable_capture(self.pg_interfaces)
1318 capture = self.pg3.get_capture(len(pkts))
1319 self.verify_capture_out(capture, static_nat_ip, True)
1321 # out2in 3rd interface
1322 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1323 self.pg3.add_stream(pkts)
1324 self.pg_enable_capture(self.pg_interfaces)
1326 capture = self.pg6.get_capture(len(pkts))
1327 self.verify_capture_in(capture, self.pg6)
1329 # general user and session dump verifications
1330 users = self.vapi.nat44_user_dump()
1331 self.assertTrue(len(users) >= 3)
1332 addresses = self.vapi.nat44_address_dump()
1333 self.assertEqual(len(addresses), 1)
# For each dumped session: it must belong to the user it was requested for,
# and its byte counter must exceed its (non-zero) packet counter.
1335 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1337 for session in sessions:
1338 self.assertEqual(user.ip_address, session.inside_ip_address)
1339 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1340 self.assertTrue(session.protocol in
1341 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of the pg4 host: at least four, none static, all using the single
# pool address as outside address.
1345 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1346 self.assertTrue(len(sessions) >= 4)
1347 for session in sessions:
1348 self.assertFalse(session.is_static)
1349 self.assertEqual(session.inside_ip_address[0:4],
1350 self.pg4.remote_ip4n)
1351 self.assertEqual(session.outside_ip_address,
1352 addresses[0].ip_address)
# Sessions of the pg6 host: at least three, all static, with static_nat_ip
# as outside address.
1355 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1356 self.assertTrue(len(sessions) >= 3)
1357 for session in sessions:
1358 self.assertTrue(session.is_static)
1359 self.assertEqual(session.inside_ip_address[0:4],
1360 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results only works where map() returns
# lists (Python 2); under Python 3 these would be two distinct iterators.
1361 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1362 map(int, static_nat_ip.split('.')))
1363 self.assertTrue(session.inside_port in
1364 [self.tcp_port_in, self.udp_port_in,
1367 def test_hairpinning(self):
1368 """ NAT44 hairpinning - 1:1 NAPT """
1370 host = self.pg0.remote_hosts[0]
1371 server = self.pg0.remote_hosts[1]
1374 server_in_port = 5678
1375 server_out_port = 8765
1377 self.nat44_add_address(self.nat_addr)
1378 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1379 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1381 # add static mapping for server
1382 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1383 server_in_port, server_out_port,
1384 proto=IP_PROTOS.tcp)
1386 # send packet from host to server
1387 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1388 IP(src=host.ip4, dst=self.nat_addr) /
1389 TCP(sport=host_in_port, dport=server_out_port))
1390 self.pg0.add_stream(p)
1391 self.pg_enable_capture(self.pg_interfaces)
1393 capture = self.pg0.get_capture(1)
1398 self.assertEqual(ip.src, self.nat_addr)
1399 self.assertEqual(ip.dst, server.ip4)
1400 self.assertNotEqual(tcp.sport, host_in_port)
1401 self.assertEqual(tcp.dport, server_in_port)
1402 self.check_tcp_checksum(p)
1403 host_out_port = tcp.sport
1405 self.logger.error(ppp("Unexpected or invalid packet:", p))
1408 # send reply from server to host
1409 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1410 IP(src=server.ip4, dst=self.nat_addr) /
1411 TCP(sport=server_in_port, dport=host_out_port))
1412 self.pg0.add_stream(p)
1413 self.pg_enable_capture(self.pg_interfaces)
1415 capture = self.pg0.get_capture(1)
1420 self.assertEqual(ip.src, self.nat_addr)
1421 self.assertEqual(ip.dst, host.ip4)
1422 self.assertEqual(tcp.sport, server_out_port)
1423 self.assertEqual(tcp.dport, host_in_port)
1424 self.check_tcp_checksum(p)
1426 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1429 def test_hairpinning2(self):
1430 """ NAT44 hairpinning - 1:1 NAT"""
# Three hosts behind pg0: a plain host and two servers that each have an
# address-only static mapping. Four exchanges are checked, each with a TCP,
# a UDP and an ICMP packet: host -> server1 and back, then server2 ->
# server1 and back. All traffic enters and leaves through pg0.
1432 server1_nat_ip = "10.0.0.10"
1433 server2_nat_ip = "10.0.0.11"
1434 host = self.pg0.remote_hosts[0]
1435 server1 = self.pg0.remote_hosts[1]
1436 server2 = self.pg0.remote_hosts[2]
1437 server_tcp_port = 22
1438 server_udp_port = 20
1440 self.nat44_add_address(self.nat_addr)
1441 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1442 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1445 # add static mapping for servers
1446 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1447 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host -> server1: requests addressed to server1's mapped address.
1451 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1452 IP(src=host.ip4, dst=server1_nat_ip) /
1453 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1455 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1456 IP(src=host.ip4, dst=server1_nat_ip) /
1457 UDP(sport=self.udp_port_in, dport=server_udp_port))
1459 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1460 IP(src=host.ip4, dst=server1_nat_ip) /
1461 ICMP(id=self.icmp_id_in, type='echo-request'))
1463 self.pg0.add_stream(pkts)
1464 self.pg_enable_capture(self.pg_interfaces)
1466 capture = self.pg0.get_capture(len(pkts))
# The host has no static mapping, so its packets must appear with the pool
# address as source and a changed port/ICMP id; the translated values are
# stored on self for building the replies.
1467 for packet in capture:
1469 self.assertEqual(packet[IP].src, self.nat_addr)
1470 self.assertEqual(packet[IP].dst, server1.ip4)
1471 if packet.haslayer(TCP):
1472 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1473 self.assertEqual(packet[TCP].dport, server_tcp_port)
1474 self.tcp_port_out = packet[TCP].sport
1475 self.check_tcp_checksum(packet)
1476 elif packet.haslayer(UDP):
1477 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1478 self.assertEqual(packet[UDP].dport, server_udp_port)
1479 self.udp_port_out = packet[UDP].sport
1481 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1482 self.icmp_id_out = packet[ICMP].id
1484 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 -> host: replies addressed to the pool address and the translated
# ports/id recorded above.
1489 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1490 IP(src=server1.ip4, dst=self.nat_addr) /
1491 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1493 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1494 IP(src=server1.ip4, dst=self.nat_addr) /
1495 UDP(sport=server_udp_port, dport=self.udp_port_out))
1497 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1498 IP(src=server1.ip4, dst=self.nat_addr) /
1499 ICMP(id=self.icmp_id_out, type='echo-reply'))
1501 self.pg0.add_stream(pkts)
1502 self.pg_enable_capture(self.pg_interfaces)
1504 capture = self.pg0.get_capture(len(pkts))
# The host must see server1's mapped address as source and its own original
# ports/id as destination.
1505 for packet in capture:
1507 self.assertEqual(packet[IP].src, server1_nat_ip)
1508 self.assertEqual(packet[IP].dst, host.ip4)
1509 if packet.haslayer(TCP):
1510 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1511 self.assertEqual(packet[TCP].sport, server_tcp_port)
1512 self.check_tcp_checksum(packet)
1513 elif packet.haslayer(UDP):
1514 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1515 self.assertEqual(packet[UDP].sport, server_udp_port)
1517 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1519 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1522 # server2 to server1
1524 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1525 IP(src=server2.ip4, dst=server1_nat_ip) /
1526 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1528 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1529 IP(src=server2.ip4, dst=server1_nat_ip) /
1530 UDP(sport=self.udp_port_in, dport=server_udp_port))
1532 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1533 IP(src=server2.ip4, dst=server1_nat_ip) /
1534 ICMP(id=self.icmp_id_in, type='echo-request'))
1536 self.pg0.add_stream(pkts)
1537 self.pg_enable_capture(self.pg_interfaces)
1539 capture = self.pg0.get_capture(len(pkts))
# server2 has an address-only mapping: only the source address changes to
# server2's mapped address, ports and ICMP id stay as sent.
1540 for packet in capture:
1542 self.assertEqual(packet[IP].src, server2_nat_ip)
1543 self.assertEqual(packet[IP].dst, server1.ip4)
1544 if packet.haslayer(TCP):
1545 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1546 self.assertEqual(packet[TCP].dport, server_tcp_port)
1547 self.tcp_port_out = packet[TCP].sport
1548 self.check_tcp_checksum(packet)
1549 elif packet.haslayer(UDP):
1550 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1551 self.assertEqual(packet[UDP].dport, server_udp_port)
1552 self.udp_port_out = packet[UDP].sport
1554 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1555 self.icmp_id_out = packet[ICMP].id
1557 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1560 # server1 to server2
1562 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1563 IP(src=server1.ip4, dst=server2_nat_ip) /
1564 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1566 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1567 IP(src=server1.ip4, dst=server2_nat_ip) /
1568 UDP(sport=server_udp_port, dport=self.udp_port_out))
1570 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1571 IP(src=server1.ip4, dst=server2_nat_ip) /
1572 ICMP(id=self.icmp_id_out, type='echo-reply'))
1574 self.pg0.add_stream(pkts)
1575 self.pg_enable_capture(self.pg_interfaces)
1577 capture = self.pg0.get_capture(len(pkts))
# server2 must see server1's mapped address as source and its own real
# address and original ports/id as destination.
1578 for packet in capture:
1580 self.assertEqual(packet[IP].src, server1_nat_ip)
1581 self.assertEqual(packet[IP].dst, server2.ip4)
1582 if packet.haslayer(TCP):
1583 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1584 self.assertEqual(packet[TCP].sport, server_tcp_port)
1585 self.check_tcp_checksum(packet)
1586 elif packet.haslayer(UDP):
1587 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1588 self.assertEqual(packet[UDP].sport, server_udp_port)
1590 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1592 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1595 def test_max_translations_per_user(self):
1596 """ MAX translations per user - recycle the least recently used """
# Sends more TCP flows from a single inside host than the configured
# per-user translation limit and expects every one of them to be forwarded
# to pg1.
1598 self.nat44_add_address(self.nat_addr)
1599 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1600 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1603 # get maximum number of translations per user
1604 nat44_config = self.vapi.nat_show_config()
1606 # send more than maximum number of translations per user packets
1607 pkts_num = nat44_config.max_translations_per_user + 5
# Each packet uses a distinct source port, starting at 1025.
1609 for port in range(0, pkts_num):
1610 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1611 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1612 TCP(sport=1025 + port))
1614 self.pg0.add_stream(pkts)
1615 self.pg_enable_capture(self.pg_interfaces)
1618 # verify number of translated packet
1619 self.pg1.get_capture(pkts_num)
1621 def test_interface_addr(self):
1622 """ Acquire NAT44 addresses from interface """
# pg7 is registered as the source of NAT pool addresses. The pool is dumped
# before pg7 has an IPv4 address, after one is configured, and after it is
# removed again; it must hold 0, 1 and 0 entries respectively.
1623 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1625 # no address in NAT pool
1626 adresses = self.vapi.nat44_address_dump()
1627 self.assertEqual(0, len(adresses))
1629 # configure interface address and check NAT address pool
1630 self.pg7.config_ip4()
1631 adresses = self.vapi.nat44_address_dump()
1632 self.assertEqual(1, len(adresses))
# The single pool entry must be pg7's own local address.
1633 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1635 # remove interface address and check NAT address pool
1636 self.pg7.unconfig_ip4()
1637 adresses = self.vapi.nat44_address_dump()
1638 self.assertEqual(0, len(adresses))
1640 def test_interface_addr_static_mapping(self):
1641 """ Static mapping with addresses from interface """
# A static mapping is created against pg7 as external interface rather than
# a fixed external address. The mapping dump is checked before pg7 has an
# address, after one is configured and after it is removed.
1642 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1643 self.nat44_add_static_mapping(
1645 external_sw_if_index=self.pg7.sw_if_index)
1647 # static mappings with external interface
1648 static_mappings = self.vapi.nat44_static_mapping_dump()
1649 self.assertEqual(1, len(static_mappings))
1650 self.assertEqual(self.pg7.sw_if_index,
1651 static_mappings[0].external_sw_if_index)
1653 # configure interface address and check static mappings
1654 self.pg7.config_ip4()
1655 static_mappings = self.vapi.nat44_static_mapping_dump()
1656 self.assertEqual(1, len(static_mappings))
# With an address configured, the mapping reports pg7's local address and
# 0xFFFFFFFF as interface index (presumably "no interface" - TODO confirm).
1657 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1658 self.pg7.local_ip4n)
1659 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1661 # remove interface address and check static mappings
1662 self.pg7.unconfig_ip4()
1663 static_mappings = self.vapi.nat44_static_mapping_dump()
1664 self.assertEqual(0, len(static_mappings))
1666 def test_ipfix_nat44_sess(self):
1667 """ IPFIX logging NAT44 session created/delted """
1668 self.ipfix_domain_id = 10
1669 self.ipfix_src_port = 20202
1670 colector_port = 30303
1671 bind_layers(UDP, IPFIX, dport=30303)
1672 self.nat44_add_address(self.nat_addr)
1673 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1674 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1676 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1677 src_address=self.pg3.local_ip4n,
1679 template_interval=10,
1680 collector_port=colector_port)
1681 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1682 src_port=self.ipfix_src_port)
1684 pkts = self.create_stream_in(self.pg0, self.pg1)
1685 self.pg0.add_stream(pkts)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 capture = self.pg1.get_capture(len(pkts))
1689 self.verify_capture_out(capture)
1690 self.nat44_add_address(self.nat_addr, is_add=0)
1691 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1692 capture = self.pg3.get_capture(3)
1693 ipfix = IPFIXDecoder()
1694 # first load template
1696 self.assertTrue(p.haslayer(IPFIX))
1697 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1698 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1699 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1700 self.assertEqual(p[UDP].dport, colector_port)
1701 self.assertEqual(p[IPFIX].observationDomainID,
1702 self.ipfix_domain_id)
1703 if p.haslayer(Template):
1704 ipfix.add_template(p.getlayer(Template))
1705 # verify events in data set
1707 if p.haslayer(Data):
1708 data = ipfix.decode_data_set(p.getlayer(Set))
1709 self.verify_ipfix_nat44_ses(data)
1711 def test_ipfix_addr_exhausted(self):
1712 """ IPFIX logging NAT addresses exhausted """
# NAT44 is enabled on pg0/pg1 but no pool address is added in this method,
# so the inside packet cannot be translated. The test expects nothing on pg1
# and three IPFIX packets on pg3.
# self.ipfix_domain_id and self.ipfix_src_port are read here but not
# assigned in this method.
1713 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1714 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1716 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1717 src_address=self.pg3.local_ip4n,
1719 template_interval=10)
1720 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1721 src_port=self.ipfix_src_port)
1723 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1724 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1726 self.pg0.add_stream(p)
1727 self.pg_enable_capture(self.pg_interfaces)
# Nothing may be forwarded to pg1.
1729 capture = self.pg1.get_capture(0)
1730 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1731 capture = self.pg3.get_capture(3)
1732 ipfix = IPFIXDecoder()
1733 # first load template
1735 self.assertTrue(p.haslayer(IPFIX))
1736 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1737 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1738 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# No collector_port is passed to set_ipfix_exporter above; 4739 is the
# destination port expected in that case.
1739 self.assertEqual(p[UDP].dport, 4739)
1740 self.assertEqual(p[IPFIX].observationDomainID,
1741 self.ipfix_domain_id)
1742 if p.haslayer(Template):
1743 ipfix.add_template(p.getlayer(Template))
1744 # verify events in data set
1746 if p.haslayer(Data):
1747 data = ipfix.decode_data_set(p.getlayer(Set))
1748 self.verify_ipfix_addr_exhausted(data)
1750 def test_pool_addr_fib(self):
1751 """ NAT44 add pool addresses to FIB """
1752 static_addr = '10.0.0.10'
1753 self.nat44_add_address(self.nat_addr)
1754 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1755 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1757 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1760 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1761 ARP(op=ARP.who_has, pdst=self.nat_addr,
1762 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1763 self.pg1.add_stream(p)
1764 self.pg_enable_capture(self.pg_interfaces)
1766 capture = self.pg1.get_capture(1)
1767 self.assertTrue(capture[0].haslayer(ARP))
1768 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1771 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1772 ARP(op=ARP.who_has, pdst=static_addr,
1773 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1774 self.pg1.add_stream(p)
1775 self.pg_enable_capture(self.pg_interfaces)
1777 capture = self.pg1.get_capture(1)
1778 self.assertTrue(capture[0].haslayer(ARP))
1779 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1781 # send ARP to non-NAT44 interface
1782 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1783 ARP(op=ARP.who_has, pdst=self.nat_addr,
1784 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1785 self.pg2.add_stream(p)
1786 self.pg_enable_capture(self.pg_interfaces)
1788 capture = self.pg1.get_capture(0)
1790 # remove addresses and verify
1791 self.nat44_add_address(self.nat_addr, is_add=0)
1792 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1795 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1796 ARP(op=ARP.who_has, pdst=self.nat_addr,
1797 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1798 self.pg1.add_stream(p)
1799 self.pg_enable_capture(self.pg_interfaces)
1801 capture = self.pg1.get_capture(0)
1803 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1804 ARP(op=ARP.who_has, pdst=static_addr,
1805 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1806 self.pg1.add_stream(p)
1807 self.pg_enable_capture(self.pg_interfaces)
1809 capture = self.pg1.get_capture(0)
1811 def test_vrf_mode(self):
1812 """ NAT44 tenant VRF aware address pool mode """
# pg0 and pg1 are moved into two separate IP tables and each table gets its
# own pool address. Traffic from pg0 must leave pg2 with nat_ip1 and traffic
# from pg1 with nat_ip2. The interfaces are moved back to table 0 at the end.
1816 nat_ip1 = "10.0.0.10"
1817 nat_ip2 = "10.0.0.11"
# Addresses are removed before changing the table and configured again
# afterwards.
1819 self.pg0.unconfig_ip4()
1820 self.pg1.unconfig_ip4()
1821 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1822 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1823 self.pg0.set_table_ip4(vrf_id1)
1824 self.pg1.set_table_ip4(vrf_id2)
1825 self.pg0.config_ip4()
1826 self.pg1.config_ip4()
# One pool address per VRF.
1828 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1829 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1830 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1831 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1832 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# Traffic from pg0 must be translated to nat_ip1.
1836 pkts = self.create_stream_in(self.pg0, self.pg2)
1837 self.pg0.add_stream(pkts)
1838 self.pg_enable_capture(self.pg_interfaces)
1840 capture = self.pg2.get_capture(len(pkts))
1841 self.verify_capture_out(capture, nat_ip1)
# Traffic from pg1 must be translated to nat_ip2.
1844 pkts = self.create_stream_in(self.pg1, self.pg2)
1845 self.pg1.add_stream(pkts)
1846 self.pg_enable_capture(self.pg_interfaces)
1848 capture = self.pg2.get_capture(len(pkts))
1849 self.verify_capture_out(capture, nat_ip2)
# Cleanup: back to table 0, then delete the two tables.
1851 self.pg0.unconfig_ip4()
1852 self.pg1.unconfig_ip4()
1853 self.pg0.set_table_ip4(0)
1854 self.pg1.set_table_ip4(0)
1855 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1856 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1858 def test_vrf_feature_independent(self):
1859 """ NAT44 tenant VRF independent address pool mode """
# Two pool addresses are added without a VRF argument. Traffic from both
# inside interfaces (pg0 and pg1) is expected to leave pg2 translated to the
# same address, nat_ip1.
1861 nat_ip1 = "10.0.0.10"
1862 nat_ip2 = "10.0.0.11"
1864 self.nat44_add_address(nat_ip1)
1865 self.nat44_add_address(nat_ip2)
1866 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1867 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1868 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# From pg0.
1872 pkts = self.create_stream_in(self.pg0, self.pg2)
1873 self.pg0.add_stream(pkts)
1874 self.pg_enable_capture(self.pg_interfaces)
1876 capture = self.pg2.get_capture(len(pkts))
1877 self.verify_capture_out(capture, nat_ip1)
# From pg1: nat_ip1 is expected here as well, not nat_ip2.
1880 pkts = self.create_stream_in(self.pg1, self.pg2)
1881 self.pg1.add_stream(pkts)
1882 self.pg_enable_capture(self.pg_interfaces)
1884 capture = self.pg2.get_capture(len(pkts))
1885 self.verify_capture_out(capture, nat_ip1)
1887 def test_dynamic_ipless_interfaces(self):
1888 """ NAT44 interfaces without configured IP address """
# pg7 and pg8 get no config_ip4() call in this method. Reachability of
# their remote hosts is set up by hand with a neighbor entry and a /32 route
# per interface, then dynamic NAT44 is checked in both directions.
1890 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1891 mactobinary(self.pg7.remote_mac),
1892 self.pg7.remote_ip4n,
1894 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1895 mactobinary(self.pg8.remote_mac),
1896 self.pg8.remote_ip4n,
# Host routes pointing at each remote host through its own interface.
1899 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1900 dst_address_length=32,
1901 next_hop_address=self.pg7.remote_ip4n,
1902 next_hop_sw_if_index=self.pg7.sw_if_index)
1903 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1904 dst_address_length=32,
1905 next_hop_address=self.pg8.remote_ip4n,
1906 next_hop_sw_if_index=self.pg8.sw_if_index)
1908 self.nat44_add_address(self.nat_addr)
1909 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1910 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 direction.
1914 pkts = self.create_stream_in(self.pg7, self.pg8)
1915 self.pg7.add_stream(pkts)
1916 self.pg_enable_capture(self.pg_interfaces)
1918 capture = self.pg8.get_capture(len(pkts))
1919 self.verify_capture_out(capture)
# pg8 -> pg7 direction, addressed to the NAT address.
1922 pkts = self.create_stream_out(self.pg8, self.nat_addr)
1923 self.pg8.add_stream(pkts)
1924 self.pg_enable_capture(self.pg_interfaces)
1926 capture = self.pg7.get_capture(len(pkts))
1927 self.verify_capture_in(capture, self.pg7)
1929 def test_static_ipless_interfaces(self):
1930 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same manual neighbor/route setup for pg7 and pg8 as the dynamic variant,
# but with an address-only static mapping of the pg7 host to self.nat_addr
# instead of a pool address. The pg8 -> pg7 direction is exercised first.
1932 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1933 mactobinary(self.pg7.remote_mac),
1934 self.pg7.remote_ip4n,
1936 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1937 mactobinary(self.pg8.remote_mac),
1938 self.pg8.remote_ip4n,
# Host routes pointing at each remote host through its own interface.
1941 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1942 dst_address_length=32,
1943 next_hop_address=self.pg7.remote_ip4n,
1944 next_hop_sw_if_index=self.pg7.sw_if_index)
1945 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1946 dst_address_length=32,
1947 next_hop_address=self.pg8.remote_ip4n,
1948 next_hop_sw_if_index=self.pg8.sw_if_index)
1950 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1951 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1952 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction.
1956 pkts = self.create_stream_out(self.pg8)
1957 self.pg8.add_stream(pkts)
1958 self.pg_enable_capture(self.pg_interfaces)
1960 capture = self.pg7.get_capture(len(pkts))
1961 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction; verified against self.nat_addr with the extra True
# flag also used by the other static-mapping checks in this file.
1964 pkts = self.create_stream_in(self.pg7, self.pg8)
1965 self.pg7.add_stream(pkts)
1966 self.pg_enable_capture(self.pg_interfaces)
1968 capture = self.pg8.get_capture(len(pkts))
1969 self.verify_capture_out(capture, self.nat_addr, True)
1971 def test_static_with_port_ipless_interfaces(self):
1972 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same manual neighbor/route setup for pg7 and pg8 as the other ip-less
# tests, combined with per-protocol static port mappings (TCP, UDP, ICMP) of
# the pg7 host to self.nat_addr.
# Fixed external ports / ICMP id used by the static mappings below.
1974 self.tcp_port_out = 30606
1975 self.udp_port_out = 30607
1976 self.icmp_id_out = 30608
1978 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1979 mactobinary(self.pg7.remote_mac),
1980 self.pg7.remote_ip4n,
1982 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1983 mactobinary(self.pg8.remote_mac),
1984 self.pg8.remote_ip4n,
# Host routes pointing at each remote host through its own interface.
1987 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1988 dst_address_length=32,
1989 next_hop_address=self.pg7.remote_ip4n,
1990 next_hop_sw_if_index=self.pg7.sw_if_index)
1991 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1992 dst_address_length=32,
1993 next_hop_address=self.pg8.remote_ip4n,
1994 next_hop_sw_if_index=self.pg8.sw_if_index)
# One static mapping per protocol, each pairing the inside port/id with the
# fixed outside value set above.
1996 self.nat44_add_address(self.nat_addr)
1997 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1998 self.tcp_port_in, self.tcp_port_out,
1999 proto=IP_PROTOS.tcp)
2000 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2001 self.udp_port_in, self.udp_port_out,
2002 proto=IP_PROTOS.udp)
2003 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2004 self.icmp_id_in, self.icmp_id_out,
2005 proto=IP_PROTOS.icmp)
2006 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2007 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction.
2011 pkts = self.create_stream_out(self.pg8)
2012 self.pg8.add_stream(pkts)
2013 self.pg_enable_capture(self.pg_interfaces)
2015 capture = self.pg7.get_capture(len(pkts))
2016 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction.
2019 pkts = self.create_stream_in(self.pg7, self.pg8)
2020 self.pg7.add_stream(pkts)
2021 self.pg_enable_capture(self.pg_interfaces)
2023 capture = self.pg8.get_capture(len(pkts))
2024 self.verify_capture_out(capture)
2026 def test_static_unknown_proto(self):
2027 """ 1:1 NAT translate packet with unknown protocol """
# The pg0 host has an address-only static mapping to nat_ip. A packet whose
# outer IP header is followed by a non-TCP/UDP/ICMP payload (the checks
# below expect a GRE layer) is sent in each direction; only the outer IP
# addresses and the IP checksum are verified.
2028 nat_ip = "10.0.0.10"
2029 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2030 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2031 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1 direction.
2035 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2036 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2038 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2039 TCP(sport=1234, dport=1234))
2040 self.pg0.add_stream(p)
2041 self.pg_enable_capture(self.pg_interfaces)
2043 p = self.pg1.get_capture(1)
# The outer source must be rewritten to the mapped address.
2046 self.assertEqual(packet[IP].src, nat_ip)
2047 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2048 self.assertTrue(packet.haslayer(GRE))
2049 self.check_ip_checksum(packet)
2051 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0 direction, addressed to the mapped address.
2055 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2056 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2058 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2059 TCP(sport=1234, dport=1234))
2060 self.pg1.add_stream(p)
2061 self.pg_enable_capture(self.pg_interfaces)
2063 p = self.pg0.get_capture(1)
# The outer destination must be rewritten back to the pg0 host.
2066 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2067 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2068 self.assertTrue(packet.haslayer(GRE))
2069 self.check_ip_checksum(packet)
2071 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2074 def test_hairpinning_static_unknown_proto(self):
2075 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both behind pg0 and each has an address-only static
# mapping. A packet with a non-TCP/UDP/ICMP outer payload (the checks expect
# a GRE layer) is sent host -> server and then server -> host, each time via
# the peer's mapped address, and captured on pg0 again.
2077 host = self.pg0.remote_hosts[0]
2078 server = self.pg0.remote_hosts[1]
2080 host_nat_ip = "10.0.0.10"
2081 server_nat_ip = "10.0.0.11"
2083 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2084 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2085 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2086 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server.
2090 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2091 IP(src=host.ip4, dst=server_nat_ip) /
2093 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2094 TCP(sport=1234, dport=1234))
2095 self.pg0.add_stream(p)
2096 self.pg_enable_capture(self.pg_interfaces)
2098 p = self.pg0.get_capture(1)
# Both outer addresses are rewritten: source to the host's mapped address,
# destination to the server's real address.
2101 self.assertEqual(packet[IP].src, host_nat_ip)
2102 self.assertEqual(packet[IP].dst, server.ip4)
2103 self.assertTrue(packet.haslayer(GRE))
2104 self.check_ip_checksum(packet)
2106 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host.
2110 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2111 IP(src=server.ip4, dst=host_nat_ip) /
2113 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2114 TCP(sport=1234, dport=1234))
2115 self.pg0.add_stream(p)
2116 self.pg_enable_capture(self.pg_interfaces)
2118 p = self.pg0.get_capture(1)
# Source becomes the server's mapped address, destination the host's real
# address.
2121 self.assertEqual(packet[IP].src, server_nat_ip)
2122 self.assertEqual(packet[IP].dst, host.ip4)
2123 self.assertTrue(packet.haslayer(GRE))
2124 self.check_ip_checksum(packet)
2126 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: dynamic NAT44 (address taken from self.nat_addr rather than a static
# mapping) with a packet carrying an inner protocol under the outer IP header.
# A plain TCP packet is sent pg0 -> pg1 first; presumably this establishes
# the inside host's translation before the second packet is sent - confirm.
# NOTE(review): `packet` is never assigned and one feature call is left
# unclosed in this listing; confirm the omitted statements in the full file.
2129 def test_unknown_proto(self):
2130 """ NAT44 translate packet with unknown protocol """
2131 self.nat44_add_address(self.nat_addr)
2132 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2133 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Initial TCP packet pg0 -> pg1; only its arrival on pg1 is checked.
2137 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2138 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2139 TCP(sport=self.tcp_port_in, dport=20))
2140 self.pg0.add_stream(p)
2141 self.pg_enable_capture(self.pg_interfaces)
2143 p = self.pg1.get_capture(1)
# pg0 -> pg1 with the inner IP/TCP payload: the outer source is expected to
# become self.nat_addr.
2145 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2146 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2148 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2149 TCP(sport=1234, dport=1234))
2150 self.pg0.add_stream(p)
2151 self.pg_enable_capture(self.pg_interfaces)
2153 p = self.pg1.get_capture(1)
2156 self.assertEqual(packet[IP].src, self.nat_addr)
2157 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2158 self.assertTrue(packet.haslayer(GRE))
2159 self.check_ip_checksum(packet)
2161 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> self.nat_addr: the outer destination is expected to be restored to
# pg0's remote host.
2165 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2166 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2168 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2169 TCP(sport=1234, dport=1234))
2170 self.pg1.add_stream(p)
2171 self.pg_enable_capture(self.pg_interfaces)
2173 p = self.pg0.get_capture(1)
2176 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2177 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2178 self.assertTrue(packet.haslayer(GRE))
2179 self.check_ip_checksum(packet)
2181 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: hairpinning with a dynamically translated host and a statically
# mapped server, both remote hosts on pg0; every capture is taken on pg0.
# NOTE(review): `host_in_port` and `packet` are used but never assigned in
# this listing, `server_in_port` is not used in the visible lines, and one
# feature call is left unclosed; confirm against the full file.
2184 def test_hairpinning_unknown_proto(self):
2185 """ NAT44 translate packet with unknown protocol - hairpinning """
2186 host = self.pg0.remote_hosts[0]
2187 server = self.pg0.remote_hosts[1]
2190 server_in_port = 5678
2191 server_out_port = 8765
2192 server_nat_ip = "10.0.0.11"
2194 self.nat44_add_address(self.nat_addr)
2195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2199 # add static mapping for server
2200 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Initial TCP packet host -> server's NAT address; only its arrival is checked.
2203 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2204 IP(src=host.ip4, dst=server_nat_ip) /
2205 TCP(sport=host_in_port, dport=server_out_port))
2206 self.pg0.add_stream(p)
2207 self.pg_enable_capture(self.pg_interfaces)
2209 capture = self.pg0.get_capture(1)
# host -> server's NAT address with the inner payload: expected source is the
# pool address self.nat_addr, expected destination the server's real address.
2211 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2212 IP(src=host.ip4, dst=server_nat_ip) /
2214 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2215 TCP(sport=1234, dport=1234))
2216 self.pg0.add_stream(p)
2217 self.pg_enable_capture(self.pg_interfaces)
2219 p = self.pg0.get_capture(1)
2222 self.assertEqual(packet[IP].src, self.nat_addr)
2223 self.assertEqual(packet[IP].dst, server.ip4)
2224 self.assertTrue(packet.haslayer(GRE))
2225 self.check_ip_checksum(packet)
2227 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> self.nat_addr: expected to reach the host with the server's NAT
# address as source.
2231 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2232 IP(src=server.ip4, dst=self.nat_addr) /
2234 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2235 TCP(sport=1234, dport=1234))
2236 self.pg0.add_stream(p)
2237 self.pg_enable_capture(self.pg_interfaces)
2239 p = self.pg0.get_capture(1)
2242 self.assertEqual(packet[IP].src, server_nat_ip)
2243 self.assertEqual(packet[IP].dst, host.ip4)
2244 self.assertTrue(packet.haslayer(GRE))
2245 self.check_ip_checksum(packet)
2247 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: NAT44 configured through the interface *output* feature API on pg0,
# pg1 and pg3. Three streams are checked: pg0 -> pg3 is verified with
# verify_capture_out, pg3 -> pg0 with verify_capture_in, and pg2 -> pg0 with
# verify_capture_no_translation.
# NOTE(review): the third feature call is left unclosed in this listing
# (presumably it carries a direction flag - confirm in the full file).
2250 def test_output_feature(self):
2251 """ NAT44 interface output feature (in2out postrouting) """
2252 self.nat44_add_address(self.nat_addr)
2253 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2254 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2255 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
2259 pkts = self.create_stream_in(self.pg0, self.pg3)
2260 self.pg0.add_stream(pkts)
2261 self.pg_enable_capture(self.pg_interfaces)
2263 capture = self.pg3.get_capture(len(pkts))
2264 self.verify_capture_out(capture)
# pg3 -> pg0 (replies addressed to the default NAT address)
2267 pkts = self.create_stream_out(self.pg3)
2268 self.pg3.add_stream(pkts)
2269 self.pg_enable_capture(self.pg_interfaces)
2271 capture = self.pg0.get_capture(len(pkts))
2272 self.verify_capture_in(capture, self.pg0)
2274 # from non-NAT interface to NAT inside interface
2275 pkts = self.create_stream_in(self.pg2, self.pg0)
2276 self.pg2.add_stream(pkts)
2277 self.pg_enable_capture(self.pg_interfaces)
2279 capture = self.pg0.get_capture(len(pkts))
2280 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
# Test: output-feature NAT44 with one pool address per VRF. Traffic entering
# on pg4 is expected to leave pg3 translated to nat_ip_vrf10, and traffic
# entering on pg6 translated to nat_ip_vrf20; replies sent to each NAT
# address are expected back on the matching inside interface.
# NOTE(review): both ip_add_del_route calls and the third feature call are
# left unclosed in this listing. Presumably the two otherwise identical routes
# differ in a table/VRF argument that is not visible here - confirm.
2282 def test_output_feature_vrf_aware(self):
2283 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2284 nat_ip_vrf10 = "10.0.0.10"
2285 nat_ip_vrf20 = "10.0.0.20"
# Host routes (/32) to pg3's remote address via pg3.
2287 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2288 dst_address_length=32,
2289 next_hop_address=self.pg3.remote_ip4n,
2290 next_hop_sw_if_index=self.pg3.sw_if_index,
2292 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2293 dst_address_length=32,
2294 next_hop_address=self.pg3.remote_ip4n,
2295 next_hop_sw_if_index=self.pg3.sw_if_index,
2298 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2299 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2300 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2301 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2302 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected translation to nat_ip_vrf10
2306 pkts = self.create_stream_in(self.pg4, self.pg3)
2307 self.pg4.add_stream(pkts)
2308 self.pg_enable_capture(self.pg_interfaces)
2310 capture = self.pg3.get_capture(len(pkts))
2311 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> nat_ip_vrf10, expected back on pg4
2314 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2315 self.pg3.add_stream(pkts)
2316 self.pg_enable_capture(self.pg_interfaces)
2318 capture = self.pg4.get_capture(len(pkts))
2319 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected translation to nat_ip_vrf20
2322 pkts = self.create_stream_in(self.pg6, self.pg3)
2323 self.pg6.add_stream(pkts)
2324 self.pg_enable_capture(self.pg_interfaces)
2326 capture = self.pg3.get_capture(len(pkts))
2327 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> nat_ip_vrf20, expected back on pg6
2330 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2331 self.pg3.add_stream(pkts)
2332 self.pg_enable_capture(self.pg_interfaces)
2334 capture = self.pg6.get_capture(len(pkts))
2335 self.verify_capture_in(capture, self.pg6)
# Test: hairpinning through the output feature. Host and server are both
# remote hosts on pg0; the server is reachable through a static TCP mapping
# (self.nat_addr:server_out_port -> server.ip4:server_in_port).
# NOTE(review): `host_in_port`, `ip` and `tcp` are used but never assigned in
# this listing, and one feature call is left unclosed; confirm the omitted
# statements against the full file.
2337 def test_output_feature_hairpinning(self):
2338 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2339 host = self.pg0.remote_hosts[0]
2340 server = self.pg0.remote_hosts[1]
2343 server_in_port = 5678
2344 server_out_port = 8765
2346 self.nat44_add_address(self.nat_addr)
2347 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2348 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2351 # add static mapping for server
2352 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2353 server_in_port, server_out_port,
2354 proto=IP_PROTOS.tcp)
2356 # send packet from host to server
2357 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2358 IP(src=host.ip4, dst=self.nat_addr) /
2359 TCP(sport=host_in_port, dport=server_out_port))
2360 self.pg0.add_stream(p)
2361 self.pg_enable_capture(self.pg_interfaces)
2363 capture = self.pg0.get_capture(1)
# Expected on the way to the server: source is the NAT address with a new
# source port, destination is the server's real address and inside port.
2368 self.assertEqual(ip.src, self.nat_addr)
2369 self.assertEqual(ip.dst, server.ip4)
2370 self.assertNotEqual(tcp.sport, host_in_port)
2371 self.assertEqual(tcp.dport, server_in_port)
2372 self.check_tcp_checksum(p)
# Remember the translated source port; the reply below is addressed to it.
2373 host_out_port = tcp.sport
2375 self.logger.error(ppp("Unexpected or invalid packet:", p))
2378 # send reply from server to host
2379 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2380 IP(src=server.ip4, dst=self.nat_addr) /
2381 TCP(sport=server_in_port, dport=host_out_port))
2382 self.pg0.add_stream(p)
2383 self.pg_enable_capture(self.pg_interfaces)
2385 capture = self.pg0.get_capture(1)
# Expected on the way back: NAT address and the server's outside port as
# source, the host's real address and original port as destination.
2390 self.assertEqual(ip.src, self.nat_addr)
2391 self.assertEqual(ip.dst, host.ip4)
2392 self.assertEqual(tcp.sport, server_out_port)
2393 self.assertEqual(tcp.dport, host_in_port)
2394 self.check_tcp_checksum(p)
# NOTE(review): the closing parenthesis below looks misplaced. ppp() receives
# only the message and `p` is passed to logger.error as a second argument,
# whereas the earlier handler in this test calls ppp("...", p). Likely
# intended: self.logger.error(ppp("Unexpected or invalid packet:", p)).
2396 self.logger.error(ppp("Unexpected or invalid packet:"), p)
# Test: NAT44 where a single interface (pg9) is used for both directions;
# both endpoints are remote hosts on pg9 and both captures are taken on pg9.
# NOTE(review): the feature API is called twice for pg9 and the second call
# is left unclosed (presumably it carries a direction flag). `ip` and `tcp`
# are used but never assigned in this listing. Confirm against the full file.
2399 def test_one_armed_nat44(self):
2400 """ One armed NAT44 """
2401 remote_host = self.pg9.remote_hosts[0]
2402 local_host = self.pg9.remote_hosts[1]
2405 self.nat44_add_address(self.nat_addr)
2406 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2407 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host, TCP 12345 -> 80
2411 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2412 IP(src=local_host.ip4, dst=remote_host.ip4) /
2413 TCP(sport=12345, dport=80))
2414 self.pg9.add_stream(p)
2415 self.pg_enable_capture(self.pg_interfaces)
2417 capture = self.pg9.get_capture(1)
# Expected: source rewritten to the NAT address with a different source port;
# destination address and port unchanged.
2422 self.assertEqual(ip.src, self.nat_addr)
2423 self.assertEqual(ip.dst, remote_host.ip4)
2424 self.assertNotEqual(tcp.sport, 12345)
# Saved so the reply below can be addressed to the translated port.
2425 external_port = tcp.sport
2426 self.assertEqual(tcp.dport, 80)
2427 self.check_tcp_checksum(p)
2428 self.check_ip_checksum(p)
2430 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> NAT address, TCP 80 -> external_port
2434 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2435 IP(src=remote_host.ip4, dst=self.nat_addr) /
2436 TCP(sport=80, dport=external_port))
2437 self.pg9.add_stream(p)
2438 self.pg_enable_capture(self.pg_interfaces)
2440 capture = self.pg9.get_capture(1)
# Expected: destination restored to local_host and its original port 12345.
2445 self.assertEqual(ip.src, remote_host.ip4)
2446 self.assertEqual(ip.dst, local_host.ip4)
2447 self.assertEqual(tcp.sport, 80)
2448 self.assertEqual(tcp.dport, 12345)
2449 self.check_tcp_checksum(p)
2450 self.check_ip_checksum(p)
2452 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: the nat44_del_session API. Sessions are created by sending the
# standard inside stream pg0 -> pg1, then two of them are deleted and the
# session count for pg0's remote host is expected to drop by exactly two.
# NOTE(review): one feature call and the second nat44_del_session call are
# left unclosed in this listing; confirm the omitted arguments in the full file.
2455 def test_del_session(self):
2456 """ Delete NAT44 session """
2457 self.nat44_add_address(self.nat_addr)
2458 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2459 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2462 pkts = self.create_stream_in(self.pg0, self.pg1)
2463 self.pg0.add_stream(pkts)
2464 self.pg_enable_capture(self.pg_interfaces)
2466 capture = self.pg1.get_capture(len(pkts))
# Session count before deletion.
2468 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2469 nsessions = len(sessions)
# First session is deleted by its inside address/port, the second by its
# outside address/port.
2471 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2472 sessions[0].inside_port,
2473 sessions[0].protocol)
2474 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2475 sessions[1].outside_port,
2476 sessions[1].protocol,
2479 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2480 self.assertEqual(nsessions - len(sessions), 2)
# Per-test cleanup for TestNAT44: delegates to the parent tearDown, then, if
# VPP is still alive, logs the output of "show nat44 verbose".
# NOTE(review): the method's `def` line is not visible in this listing.
2483 super(TestNAT44, self).tearDown()
2484 if not self.vpp_dead:
2485 self.logger.info(self.vapi.cli("show nat44 verbose"))
# Test class for the NAT plugin's deterministic mode; it derives from
# MethodHolder.
2489 class TestDeterministicNAT(MethodHolder):
2490 """ Deterministic NAT Test Cases """
# Extends the VPP command line built by the parent class with the startup
# stanza `nat { deterministic }`.
# NOTE(review): takes `cls`, so presumably a classmethod; no decorator is
# visible in this listing - confirm.
2493 def setUpConstants(cls):
2494 super(TestDeterministicNAT, cls).setUpConstants()
2495 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-level fixture: sets the port/ID constants and NAT address used by the
# stream helpers, creates three packet-generator interfaces (pg0..pg2), and
# gives pg0 two remote hosts with configured IPv4 neighbors.
# NOTE(review): the body of the `for i in cls.interfaces` loop is not visible
# in this listing. The final call to the parent's tearDownClass is presumably
# a failure path (cleanup when setup raises) - confirm in the full file.
2498 def setUpClass(cls):
2499 super(TestDeterministicNAT, cls).setUpClass()
# Inside and external ports are deliberately equal per protocol.
2502 cls.tcp_port_in = 6303
2503 cls.tcp_external_port = 6303
2504 cls.udp_port_in = 6304
2505 cls.udp_external_port = 6304
2506 cls.icmp_id_in = 6305
2507 cls.nat_addr = '10.0.0.3'
2509 cls.create_pg_interfaces(range(3))
2510 cls.interfaces = list(cls.pg_interfaces)
2512 for i in cls.interfaces:
2517 cls.pg0.generate_remote_hosts(2)
2518 cls.pg0.configure_ipv4_neighbors()
2521 super(TestDeterministicNAT, cls).tearDownClass()
# Builds three packets from in_if's remote host to out_if's remote host, all
# with the given TTL: TCP (tcp_port_in -> tcp_external_port), UDP
# (udp_port_in -> udp_external_port) and an ICMP echo request with
# id icmp_id_in.
# NOTE(review): how the packets are collected and returned is not visible in
# this listing; callers use the result as a list (len(pkts), add_stream).
2524 def create_stream_in(self, in_if, out_if, ttl=64):
2526 Create packet stream for inside network
2528 :param in_if: Inside interface
2529 :param out_if: Outside interface
2530 :param ttl: TTL of generated packets
2534 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2535 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2536 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2540 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2541 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2542 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2546 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2547 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2548 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds three reply packets from out_if's remote host towards dst_ip: TCP,
# UDP and an ICMP echo reply. The destination ports/ID are read from
# self.tcp_port_out, self.udp_port_out and self.icmp_external_id, which this
# class assigns in verify_capture_out (and initiate_tcp_session for TCP), so
# an outbound capture has to be verified before this helper is called.
# NOTE(review): the condition guarding the `dst_ip = self.nat_addr` default
# and the collection/return of the packets are not visible in this listing.
2553 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2555 Create packet stream for outside network
2557 :param out_if: Outside interface
2558 :param dst_ip: Destination IP address (Default use global NAT address)
2559 :param ttl: TTL of generated packets
2562 dst_ip = self.nat_addr
2565 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2566 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2567 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2571 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2572 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2573 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2577 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2578 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2579 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks packets captured on the outside interface: the capture must hold
# exactly packet_num packets and each must have nat_ip as its source address.
# As a side effect the translated TCP/UDP source port and ICMP id are stored
# on self (tcp_port_out, udp_port_out, icmp_external_id) for later use when
# building reply packets.
# NOTE(review): the text below documents a `same_port` parameter that is not
# in the signature and misspells "Source" as "Sorce"; both should be corrected
# in the docstring.
2584 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2586 Verify captured packets on outside network
2588 :param capture: Captured packets
2589 :param nat_ip: Translated IP address (Default use global NAT address)
2590 :param same_port: Sorce port number is not translated (Default False)
2591 :param packet_num: Expected number of packets (Default 3)
2594 nat_ip = self.nat_addr
2595 self.assertEqual(packet_num, len(capture))
2596 for packet in capture:
2598 self.assertEqual(packet[IP].src, nat_ip)
2599 if packet.haslayer(TCP):
2600 self.tcp_port_out = packet[TCP].sport
2601 elif packet.haslayer(UDP):
2602 self.udp_port_out = packet[UDP].sport
2604 self.icmp_external_id = packet[ICMP].id
2606 self.logger.error(ppp("Unexpected or invalid packet "
2607 "(outside network):", packet))
# Drives a TCP three-way handshake through the NAT: SYN from the inside host,
# SYN+ACK from the outside host to the NAT address, ACK from the inside host.
# The translated source port seen after the SYN is stored in
# self.tcp_port_out and used as destination port of the SYN+ACK.
# NOTE(review): the TCP flag arguments are cut off, the inside add_stream
# calls are not visible, and `p` is presumably re-bound to the captured packet
# before `p[TCP].sport` is read - confirm all three in the full file.
2610 def initiate_tcp_session(self, in_if, out_if):
2612 Initiates TCP session
2614 :param in_if: Inside interface
2615 :param out_if: Outside interface
2618 # SYN packet in->out
2619 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2620 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2621 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2624 self.pg_enable_capture(self.pg_interfaces)
2626 capture = out_if.get_capture(1)
2628 self.tcp_port_out = p[TCP].sport
2630 # SYN + ACK packet out->in
2631 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2632 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2633 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2635 out_if.add_stream(p)
2636 self.pg_enable_capture(self.pg_interfaces)
2638 in_if.get_capture(1)
2640 # ACK packet in->out
2641 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2642 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2643 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2646 self.pg_enable_capture(self.pg_interfaces)
2648 out_if.get_capture(1)
2651 self.logger.error("TCP 3 way handshake failed")
# Checks decoded IPFIX data for the "maximum entries per user exceeded" event:
# exactly one record is expected, with field 230 equal to 13, field 466 equal
# to the 4-byte value '\x03\x00\x00\x00' and field 8 equal to pg0's remote
# IPv4 address in packed form.
# NOTE(review): `record` is never assigned in this listing; presumably it is
# the single element of `data` - confirm. The numeric keys look like IPFIX
# information-element IDs; only 466 is labelled by a visible comment.
2654 def verify_ipfix_max_entries_per_user(self, data):
2656 Verify IPFIX maximum entries per user exceeded event
2658 :param data: Decoded IPFIX data records
2660 self.assertEqual(1, len(data))
2663 self.assertEqual(ord(record[230]), 13)
2664 # natQuotaExceededEvent
2665 self.assertEqual('\x03\x00\x00\x00', record[466])
2667 self.assertEqual(self.pg0.remote_ip4n, record[8])
# Test: API-level check of deterministic mapping, no traffic involved.
# Steps: confirm the plugin reports deterministic mode, add an in -> out
# mapping, check the forward lookup of an inside address and the reverse
# lookup of the resulting outside address/port, check the mapping dump, then
# clear the configuration and expect an empty dump.
# NOTE(review): `in_plen` and `out_plen` are used but never assigned in this
# listing; confirm their values in the full file.
2669 def test_deterministic_mode(self):
2670 """ NAT plugin run deterministic mode """
2671 in_addr = '172.16.255.0'
2672 out_addr = '172.17.255.50'
2673 in_addr_t = '172.16.255.20'
# Packed 4-byte forms, as passed to the binary API below.
2674 in_addr_n = socket.inet_aton(in_addr)
2675 out_addr_n = socket.inet_aton(out_addr)
2676 in_addr_t_n = socket.inet_aton(in_addr_t)
2680 nat_config = self.vapi.nat_show_config()
2681 self.assertEqual(1, nat_config.deterministic)
2683 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward then reverse lookup must round-trip to the same inside address.
# Only the first four bytes of the returned address fields are compared.
2685 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2686 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2687 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2688 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2690 deterministic_mappings = self.vapi.nat_det_map_dump()
2691 self.assertEqual(len(deterministic_mappings), 1)
2692 dsm = deterministic_mappings[0]
2693 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2694 self.assertEqual(in_plen, dsm.in_plen)
2695 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2696 self.assertEqual(out_plen, dsm.out_plen)
2698 self.clear_nat_det()
2699 deterministic_mappings = self.vapi.nat_det_map_dump()
2700 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Read the current values, raise each one by the same amount, write them
    # back and confirm that every value reported afterwards differs.
    increment = 10
    old = self.vapi.nat_det_get_timeouts()

    # Arguments are passed positionally in this order.
    set_order = ('udp', 'tcp_established', 'tcp_transitory', 'icmp')
    raised = [getattr(old, field) + increment for field in set_order]
    self.vapi.nat_det_set_timeouts(*raised)

    new = self.vapi.nat_det_get_timeouts()

    # Checked one field at a time; the first unchanged value fails the test.
    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(old, field), getattr(new, field))
# Test: end-to-end deterministic translation for TCP, UDP and ICMP.
# A mapping from pg0's remote host to 10.0.0.10 is added, the standard inside
# stream is sent pg0 -> pg1 and verified, replies are sent back and verified,
# and finally the three resulting sessions are checked field by field.
# NOTE(review): the nat_det_add_del_map call and one feature call are left
# unclosed, and `s` is never assigned in this listing (presumably one element
# of `sessions` per protocol) - confirm in the full file.
2720 def test_det_in(self):
2721 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2723 nat_ip = "10.0.0.10"
2725 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2727 socket.inet_aton(nat_ip),
2729 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2730 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1; verify_capture_out also records the translated ports/ID.
2734 pkts = self.create_stream_in(self.pg0, self.pg1)
2735 self.pg0.add_stream(pkts)
2736 self.pg_enable_capture(self.pg_interfaces)
2738 capture = self.pg1.get_capture(len(pkts))
2739 self.verify_capture_out(capture, nat_ip)
# pg1 -> nat_ip
2742 pkts = self.create_stream_out(self.pg1, nat_ip)
2743 self.pg1.add_stream(pkts)
2744 self.pg_enable_capture(self.pg_interfaces)
2746 capture = self.pg0.get_capture(len(pkts))
2747 self.verify_capture_in(capture, self.pg0)
# One session per protocol is expected for the inside host.
2750 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2751 self.assertEqual(len(sessions), 3)
# TCP session fields
2755 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2756 self.assertEqual(s.in_port, self.tcp_port_in)
2757 self.assertEqual(s.out_port, self.tcp_port_out)
2758 self.assertEqual(s.ext_port, self.tcp_external_port)
# UDP session fields
2762 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2763 self.assertEqual(s.in_port, self.udp_port_in)
2764 self.assertEqual(s.out_port, self.udp_port_out)
2765 self.assertEqual(s.ext_port, self.udp_external_port)
# ICMP session fields (the ICMP id takes the place of the port)
2769 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2770 self.assertEqual(s.in_port, self.icmp_id_in)
2771 self.assertEqual(s.out_port, self.icmp_external_id)
# Test: two inside hosts on pg0 share one deterministic mapping to 10.0.0.10.
# Each host opens a TCP flow from the same inside port; the translated source
# ports are recorded, replies to each translated port are expected to reach
# the right host, and the session-close APIs are expected to reduce the
# mapping's session count from 2 to 1 to 0.
# NOTE(review): `port_in`, `ip` and `tcp` are used but never assigned, and the
# add_del_map, feature and both close_session calls are left unclosed in this
# listing; confirm against the full file.
2773 def test_multiple_users(self):
2774 """ Deterministic NAT multiple users """
2776 nat_ip = "10.0.0.10"
2778 external_port = 6303
2780 host0 = self.pg0.remote_hosts[0]
2781 host1 = self.pg0.remote_hosts[1]
2783 self.vapi.nat_det_add_del_map(host0.ip4n,
2785 socket.inet_aton(nat_ip),
2787 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2788 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> pg1
2792 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2793 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2794 TCP(sport=port_in, dport=external_port))
2795 self.pg0.add_stream(p)
2796 self.pg_enable_capture(self.pg_interfaces)
2798 capture = self.pg1.get_capture(1)
2803 self.assertEqual(ip.src, nat_ip)
2804 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2805 self.assertEqual(tcp.dport, external_port)
2806 port_out0 = tcp.sport
2808 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> pg1 (same inside port as host0)
2812 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2813 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2814 TCP(sport=port_in, dport=external_port))
2815 self.pg0.add_stream(p)
2816 self.pg_enable_capture(self.pg_interfaces)
2818 capture = self.pg1.get_capture(1)
2823 self.assertEqual(ip.src, nat_ip)
2824 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2825 self.assertEqual(tcp.dport, external_port)
2826 port_out1 = tcp.sport
2828 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One mapping carrying two sessions at this point.
2831 dms = self.vapi.nat_det_map_dump()
2832 self.assertEqual(1, len(dms))
2833 self.assertEqual(2, dms[0].ses_num)
# pg1 -> nat_ip:port_out0, expected at host0
2836 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2837 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2838 TCP(sport=external_port, dport=port_out0))
2839 self.pg1.add_stream(p)
2840 self.pg_enable_capture(self.pg_interfaces)
2842 capture = self.pg0.get_capture(1)
2847 self.assertEqual(ip.src, self.pg1.remote_ip4)
2848 self.assertEqual(ip.dst, host0.ip4)
2849 self.assertEqual(tcp.dport, port_in)
2850 self.assertEqual(tcp.sport, external_port)
2852 self.logger.error(ppp("Unexpected or invalid packet:", p))
# pg1 -> nat_ip:port_out1, expected at host1
2856 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2857 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2858 TCP(sport=external_port, dport=port_out1))
2859 self.pg1.add_stream(p)
2860 self.pg_enable_capture(self.pg_interfaces)
2862 capture = self.pg0.get_capture(1)
2867 self.assertEqual(ip.src, self.pg1.remote_ip4)
2868 self.assertEqual(ip.dst, host1.ip4)
2869 self.assertEqual(tcp.dport, port_in)
2870 self.assertEqual(tcp.sport, external_port)
# NOTE(review): this message lacks the trailing colon used by the other three
# handlers in this test ("Unexpected or invalid packet:").
2872 self.logger.error(ppp("Unexpected or invalid packet", p))
2875 # session close api test
2876 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2878 self.pg1.remote_ip4n,
2880 dms = self.vapi.nat_det_map_dump()
2881 self.assertEqual(dms[0].ses_num, 1)
2883 self.vapi.nat_det_close_session_in(host0.ip4n,
2885 self.pg1.remote_ip4n,
2887 dms = self.vapi.nat_det_map_dump()
2888 self.assertEqual(dms[0].ses_num, 0)
# Test: the NAT is expected to drop a TCP session once it has seen the close
# sequence started by the inside host. After the handshake helper, the
# exchange is: FIN in -> out, ACK and FIN out -> in (sent together and
# captured as two packets), ACK in -> out. The mapping's session count is
# then expected to be 0.
# NOTE(review): the TCP flag arguments are cut off and `pkts` is never built
# in this listing, so the flags are known only from the existing comments.
2890 def test_tcp_session_close_detection_in(self):
2891 """ Deterministic NAT TCP session close from inside network """
2892 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2894 socket.inet_aton(self.nat_addr),
2896 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2897 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2900 self.initiate_tcp_session(self.pg0, self.pg1)
2902 # close the session from inside
2904 # FIN packet in -> out
2905 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2906 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2907 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2909 self.pg0.add_stream(p)
2910 self.pg_enable_capture(self.pg_interfaces)
2912 self.pg1.get_capture(1)
2916 # ACK packet out -> in
2917 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2918 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2919 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2923 # FIN packet out -> in
2924 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2925 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2926 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2930 self.pg1.add_stream(pkts)
2931 self.pg_enable_capture(self.pg_interfaces)
2933 self.pg0.get_capture(2)
2935 # ACK packet in -> out
2936 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2937 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2938 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2940 self.pg0.add_stream(p)
2941 self.pg_enable_capture(self.pg_interfaces)
2943 self.pg1.get_capture(1)
2945 # Check if deterministic NAT44 closed the session
2946 dms = self.vapi.nat_det_map_dump()
2947 self.assertEqual(0, dms[0].ses_num)
2949 self.logger.error("TCP session termination failed")
# Test: mirror image of the inside-initiated close test, with the close
# sequence started by the outside host. After the handshake helper, a FIN is
# sent out -> in, two packets are sent in -> out together, and a final ACK is
# sent out -> in. The mapping's session count is then expected to be 0.
# NOTE(review): the TCP flag arguments are cut off and `pkts` is never built
# in this listing, so the flags are known only from the existing comments.
2952 def test_tcp_session_close_detection_out(self):
2953 """ Deterministic NAT TCP session close from outside network """
2954 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2956 socket.inet_aton(self.nat_addr),
2958 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2959 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2962 self.initiate_tcp_session(self.pg0, self.pg1)
2964 # close the session from outside
2966 # FIN packet out -> in
2967 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2968 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2969 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2971 self.pg1.add_stream(p)
2972 self.pg_enable_capture(self.pg_interfaces)
2974 self.pg0.get_capture(1)
2978 # ACK packet in -> out
2979 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2980 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2981 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment below repeats "ACK packet in -> out". By symmetry
# with the inside-initiated close test (ACK followed by FIN) this second
# packet is presumably the FIN in -> out; the flags are not visible here, so
# confirm before correcting the comment.
2985 # ACK packet in -> out
2986 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2987 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2988 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2992 self.pg0.add_stream(pkts)
2993 self.pg_enable_capture(self.pg_interfaces)
2995 self.pg1.get_capture(2)
2997 # ACK packet out -> in
2998 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2999 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3000 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3002 self.pg1.add_stream(p)
3003 self.pg_enable_capture(self.pg_interfaces)
3005 self.pg0.get_capture(1)
3007 # Check if deterministic NAT44 closed the session
3008 dms = self.vapi.nat_det_map_dump()
3009 self.assertEqual(0, dms[0].ses_num)
3011 self.logger.error("TCP session termination failed")
# Test (extended runs only): sessions are expected to disappear after their
# timeout. A TCP session is opened, all four timeouts are set to 5, the
# standard inside stream is sent, and the mapping's session count is finally
# expected to be 0.
# NOTE(review): no waiting step is visible between the capture and the final
# check in this listing; presumably the test sleeps past the 5-unit timeout
# (the file imports `sleep`) - confirm in the full file.
3014 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3015 def test_session_timeout(self):
3016 """ Deterministic NAT session timeouts """
3017 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3019 socket.inet_aton(self.nat_addr),
3021 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3022 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3025 self.initiate_tcp_session(self.pg0, self.pg1)
3026 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3027 pkts = self.create_stream_in(self.pg0, self.pg1)
3028 self.pg0.add_stream(pkts)
3029 self.pg_enable_capture(self.pg_interfaces)
3031 capture = self.pg1.get_capture(len(pkts))
3034 dms = self.vapi.nat_det_map_dump()
3035 self.assertEqual(0, dms[0].ses_num)
# Test (extended runs only): per-user session limit. 1000 UDP flows (source
# and destination port 1025..2024) are sent from one inside host and all are
# expected on pg1. One further flow (3001 -> 3002) is expected to be dropped:
# nothing appears on pg1 and an ICMP type 3 / code 1 error embedding the
# original UDP header comes back on pg0. The session count must stay at 1000
# and an IPFIX event is expected on pg2.
# NOTE(review): several calls are left unclosed, and `pkts`, `icmp` and the
# loops over the IPFIX capture are not visible in this listing; confirm the
# omitted statements against the full file.
3037 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3038 def test_session_limit_per_user(self):
3039 """ Deterministic NAT maximum sessions per user limit """
3040 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3042 socket.inet_aton(self.nat_addr),
3044 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3045 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# IPFIX export towards pg2's remote host, then enable NAT IPFIX logging.
3047 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3048 src_address=self.pg2.local_ip4n,
3050 template_interval=10)
3051 self.vapi.nat_ipfix()
# range(1025, 2025) yields exactly 1000 distinct ports.
3054 for port in range(1025, 2025):
3055 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3056 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3057 UDP(sport=port, dport=port))
3060 self.pg0.add_stream(pkts)
3061 self.pg_enable_capture(self.pg_interfaces)
3063 capture = self.pg1.get_capture(len(pkts))
# The flow that is expected to exceed the limit.
3065 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3066 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3067 UDP(sport=3001, dport=3002))
3068 self.pg0.add_stream(p)
3069 self.pg_enable_capture(self.pg_interfaces)
3071 capture = self.pg1.assert_nothing_captured()
3073 # verify ICMP error packet
3074 capture = self.pg0.get_capture(1)
3076 self.assertTrue(p.haslayer(ICMP))
3078 self.assertEqual(icmp.type, 3)
3079 self.assertEqual(icmp.code, 1)
3080 self.assertTrue(icmp.haslayer(IPerror))
3081 inner_ip = icmp[IPerror]
3082 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3083 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3085 dms = self.vapi.nat_det_map_dump()
3087 self.assertEqual(1000, dms[0].ses_num)
3089 # verify IPFIX logging
3090 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3092 capture = self.pg2.get_capture(2)
3093 ipfix = IPFIXDecoder()
3094 # first load template
3096 self.assertTrue(p.haslayer(IPFIX))
3097 if p.haslayer(Template):
3098 ipfix.add_template(p.getlayer(Template))
3099 # verify events in data set
3101 if p.haslayer(Data):
3102 data = ipfix.decode_data_set(p.getlayer(Set))
3103 self.verify_ipfix_max_entries_per_user(data)
# Resets deterministic NAT state between tests: calls nat_ipfix with enable=0,
# calls nat_det_set_timeouts with no arguments (presumably restoring the API
# wrapper's defaults - confirm), then walks the dumped mappings and the dumped
# NAT44 interfaces and issues an add_del call for each.
# NOTE(review): both add_del calls are left unclosed in this listing, so the
# arguments that make them deletions are not visible; confirm in the full file.
3105 def clear_nat_det(self):
3107 Clear deterministic NAT configuration.
3109 self.vapi.nat_ipfix(enable=0)
3110 self.vapi.nat_det_set_timeouts()
3111 deterministic_mappings = self.vapi.nat_det_map_dump()
3112 for dsm in deterministic_mappings:
3113 self.vapi.nat_det_add_del_map(dsm.in_addr,
3119 interfaces = self.vapi.nat44_interface_dump()
3120 for intf in interfaces:
3121 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Per-test cleanup for TestDeterministicNAT: delegates to the parent tearDown,
# then, if VPP is still alive, logs "show nat44 detail" and clears the
# deterministic NAT configuration.
# NOTE(review): the method's `def` line is not visible in this listing.
3126 super(TestDeterministicNAT, self).tearDown()
3127 if not self.vpp_dead:
3128 self.logger.info(self.vapi.cli("show nat44 detail"))
3129 self.clear_nat_det()
# Test class for the NAT64 feature; it derives from MethodHolder.
3132 class TestNAT64(MethodHolder):
3133 """ NAT64 Test Cases """
# Class-level fixture for NAT64: sets the port/ID constants and two IPv4 NAT
# addresses (plain and VRF1) with their packed forms, creates four
# packet-generator interfaces and splits them into IPv6-side (pg0, pg2) and
# IPv4-side (pg1) lists. pg2 is placed in the IPv6 table cls.vrf1_id, and pg3
# is configured with both IPv4 and IPv6.
# NOTE(review): `cls.vrf1_id` is used but never assigned, the two interface
# loops are missing most of their bodies, and one inet_pton call is left
# unclosed in this listing. The final call to the parent's tearDownClass is
# presumably a failure path - confirm against the full file.
3136 def setUpClass(cls):
3137 super(TestNAT64, cls).setUpClass()
3140 cls.tcp_port_in = 6303
3141 cls.tcp_port_out = 6303
3142 cls.udp_port_in = 6304
3143 cls.udp_port_out = 6304
3144 cls.icmp_id_in = 6305
3145 cls.icmp_id_out = 6305
3146 cls.nat_addr = '10.0.0.3'
3147 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3149 cls.vrf1_nat_addr = '10.0.10.3'
3150 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3153 cls.create_pg_interfaces(range(4))
# pg0 and pg2 form the IPv6 side, pg1 the IPv4 side.
3154 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3155 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3156 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3158 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3160 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3162 cls.pg0.generate_remote_hosts(2)
3164 for i in cls.ip6_interfaces:
3167 i.configure_ipv6_neighbors()
3169 for i in cls.ip4_interfaces:
# pg3 is dual-stack.
3175 cls.pg3.config_ip4()
3176 cls.pg3.resolve_arp()
3177 cls.pg3.config_ip6()
3178 cls.pg3.configure_ipv6_neighbors()
3181 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # A one-address range: the same packed IPv4 address is passed as both the
    # start and the end of the range.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
    vapi = self.vapi

    # After adding, the pool dump must contain exactly that address.
    vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # After deleting the same range, the pool dump must be empty again.
    vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
# Enables NAT64 on pg0 (default direction) and on pg1 with is_inside=0,
# checks the interface dump (two entries, is_inside 1 for pg0 and 0 for pg1)
# and the "show interface features" output, then disables both and expects
# an empty interface dump.
# NOTE(review): pg0_found / pg1_found are asserted below but their
# assignments are not present in this listing.
3199 def test_interface(self):
3200 """ Enable/disable NAT64 feature on the interface """
3201 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3202 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3204 interfaces = self.vapi.nat64_interface_dump()
3205 self.assertEqual(len(interfaces), 2)
3208 for intf in interfaces:
3209 if intf.sw_if_index == self.pg0.sw_if_index:
3210 self.assertEqual(intf.is_inside, 1)
3212 elif intf.sw_if_index == self.pg1.sw_if_index:
3213 self.assertEqual(intf.is_inside, 0)
3215 self.assertTrue(pg0_found)
3216 self.assertTrue(pg1_found)
# The CLI output must name the in2out node on pg0 and the out2in node on pg1.
3218 features = self.vapi.cli("show interface features pg0")
3219 self.assertNotEqual(features.find('nat64-in2out'), -1)
3220 features = self.vapi.cli("show interface features pg1")
3221 self.assertNotEqual(features.find('nat64-out2in'), -1)
# Disable the feature on both interfaces again.
3223 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3224 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3226 interfaces = self.vapi.nat64_interface_dump()
3227 self.assertEqual(len(interfaces), 0)
# Adds a static BIB entry (IPv6 inside address, IPv4 outside address, TCP),
# checks the entry reported by the TCP BIB dump and that exactly one static
# entry is counted, then issues a second add_del call and expects the count
# of static entries to be 0.
# NOTE(review): in_port, out_port, bibe and static_bib_num are used below but
# the lines defining them, and the remaining arguments of both
# nat64_add_del_static_bib calls, are not present in this listing.
3229 def test_static_bib(self):
3230 """ Add/delete static BIB entry """
3231 in_addr = socket.inet_pton(socket.AF_INET6,
3232 '2001:db8:85a3::8a2e:370:7334')
3233 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3236 proto = IP_PROTOS.tcp
3238 self.vapi.nat64_add_del_static_bib(in_addr,
3243 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3248 self.assertEqual(bibe.i_addr, in_addr)
3249 self.assertEqual(bibe.o_addr, out_addr)
3250 self.assertEqual(bibe.i_port, in_port)
3251 self.assertEqual(bibe.o_port, out_port)
3252 self.assertEqual(static_bib_num, 1)
3254 self.vapi.nat64_add_del_static_bib(in_addr,
3260 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3265 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts

    Reads the timeouts and compares them with the expected defaults, then
    sets custom values and checks that a fresh read returns them.
    """
    # (field, value) pairs, kept in the order the fields are checked
    defaults = (('udp', 300), ('icmp', 60), ('tcp_trans', 240),
                ('tcp_est', 7440), ('tcp_incoming_syn', 6))
    custom = (('udp', 200), ('icmp', 30), ('tcp_trans', 250),
              ('tcp_est', 7450), ('tcp_incoming_syn', 10))

    # verify default values
    reported = self.vapi.nat64_get_timeouts()
    for field, value in defaults:
        self.assertEqual(getattr(reported, field), value)

    # set and verify custom values
    self.vapi.nat64_set_timeouts(**dict(custom))
    reported = self.vapi.nat64_get_timeouts()
    for field, value in custom:
        self.assertEqual(getattr(reported, field), value)
# Dynamic NAT64 translation. With a pool address configured and NAT64
# enabled on pg0 (default direction) and pg1 (is_inside=0), IPv6 streams are
# sent pg0 -> pg1 and IPv4 streams pg1 -> pg0, twice each, and the captures
# are verified. The session count must have grown by exactly 3. The same
# round trip is then repeated once for pg2 with the vrf1 pool address.
# NOTE(review): some lines (e.g. the trailing argument of the first pool call
# and whatever starts the packet generator) are not present in this listing.
3287 def test_dynamic(self):
3288 """ NAT64 dynamic translation test """
3289 self.tcp_port_in = 6303
3290 self.udp_port_in = 6304
3291 self.icmp_id_in = 6305
# Baseline session count, compared against ses_num_end further down.
3293 ses_num_start = self.nat64_get_ses_num()
3295 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3297 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3298 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 stream from pg0; the capture on pg1 must carry the NAT address.
3301 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3302 self.pg0.add_stream(pkts)
3303 self.pg_enable_capture(self.pg_interfaces)
3305 capture = self.pg1.get_capture(len(pkts))
3306 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3307 dst_ip=self.pg1.remote_ip4)
# IPv4 stream from pg1 towards the NAT address; captured on pg0 as IPv6.
3310 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3311 self.pg1.add_stream(pkts)
3312 self.pg_enable_capture(self.pg_interfaces)
3314 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's IPv4 address appended to the 64:ff9b:: prefix,
# read back from a scapy IPv6 object (presumably to get scapy's text form).
3315 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3316 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Second pg0 -> pg1 round.
3319 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3320 self.pg0.add_stream(pkts)
3321 self.pg_enable_capture(self.pg_interfaces)
3323 capture = self.pg1.get_capture(len(pkts))
3324 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3325 dst_ip=self.pg1.remote_ip4)
# Second pg1 -> pg0 round.
3328 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3329 self.pg1.add_stream(pkts)
3330 self.pg_enable_capture(self.pg_interfaces)
3332 capture = self.pg0.get_capture(len(pkts))
3333 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3335 ses_num_end = self.nat64_get_ses_num()
3337 self.assertEqual(ses_num_end - ses_num_start, 3)
3339 # tenant with specific VRF
3340 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3341 self.vrf1_nat_addr_n,
3342 vrf_id=self.vrf1_id)
3343 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3345 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3346 self.pg2.add_stream(pkts)
3347 self.pg_enable_capture(self.pg_interfaces)
3349 capture = self.pg1.get_capture(len(pkts))
3350 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3351 dst_ip=self.pg1.remote_ip4)
3353 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3354 self.pg1.add_stream(pkts)
3355 self.pg_enable_capture(self.pg_interfaces)
3357 capture = self.pg2.get_capture(len(pkts))
3358 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# Static NAT64 translation. Inside and outside ports/ids are set to the same
# values, the pool address and the pg0/pg1 NAT64 features are configured,
# and nat64_add_del_static_bib is called three times for pg0.remote_ip6n.
# Traffic is then sent pg0 -> pg1 (verified with same_port=True) and
# pg1 -> pg0, and the session count must have grown by exactly 3.
# NOTE(review): the remaining arguments of the pool and static BIB calls are
# not present in this listing.
3360 def test_static(self):
3361 """ NAT64 static translation test """
3362 self.tcp_port_in = 60303
3363 self.udp_port_in = 60304
3364 self.icmp_id_in = 60305
3365 self.tcp_port_out = 60303
3366 self.udp_port_out = 60304
3367 self.icmp_id_out = 60305
3369 ses_num_start = self.nat64_get_ses_num()
3371 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3373 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3374 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3376 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3381 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3386 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# IPv6 stream from pg0; verified on pg1 with same_port=True.
3393 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3394 self.pg0.add_stream(pkts)
3395 self.pg_enable_capture(self.pg_interfaces)
3397 capture = self.pg1.get_capture(len(pkts))
3398 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3399 dst_ip=self.pg1.remote_ip4, same_port=True)
# IPv4 stream from pg1 towards the NAT address; captured on pg0 as IPv6.
3402 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3403 self.pg1.add_stream(pkts)
3404 self.pg_enable_capture(self.pg_interfaces)
3406 capture = self.pg0.get_capture(len(pkts))
3407 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3408 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3410 ses_num_end = self.nat64_get_ses_num()
3412 self.assertEqual(ses_num_end - ses_num_start, 3)
# Skipped unless running_extended_tests() is true. Sets the NAT64 ICMP
# timeout to 5, sends the IPv6 stream from pg0, and asserts that the session
# count read later differs from the count read right after the traffic.
# NOTE(review): whatever separates the two readings in time is not present
# in this listing; the unit of the timeout value is not visible here either.
3414 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3415 def test_session_timeout(self):
3416 """ NAT64 session timeout """
3417 self.icmp_id_in = 1234
3418 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3420 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3421 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3422 self.vapi.nat64_set_timeouts(icmp=5)
3424 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3425 self.pg0.add_stream(pkts)
3426 self.pg_enable_capture(self.pg_interfaces)
# The capture is fetched but its contents are not inspected in visible lines.
3428 capture = self.pg1.get_capture(len(pkts))
3430 ses_num_before_timeout = self.nat64_get_ses_num()
3434 # ICMP session after timeout
3435 ses_num_after_timeout = self.nat64_get_ses_num()
3436 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# Translation of ICMP error messages in both directions. Sessions are first
# created with a normal pg0 -> pg1 / pg1 -> pg0 exchange. Then:
#  * ICMPv6 Destination Unreachable (code 1) messages wrapping the captured
#    IPv6 packets are sent from pg0; on pg1 they must arrive as ICMP type 3
#    code 13 with the embedded packet addressed pg1.remote_ip4 -> nat_addr;
#  * ICMP type 3 code 13 messages wrapping the captured IPv4 packets are
#    sent from pg1; on pg0 they must arrive as ICMPv6 Destination
#    Unreachable code 1 with the embedded packet addressed
#    pg0.remote_ip6 -> the 64:ff9b:: form of pg1's address.
# NOTE(review): the try/except/else scaffolding around the per-packet checks
# is not present in this listing.
3438 def test_icmp_error(self):
3439 """ NAT64 ICMP Error message translation """
3440 self.tcp_port_in = 6303
3441 self.udp_port_in = 6304
3442 self.icmp_id_in = 6305
# NOTE(review): ses_num_start is not read again in the visible lines.
3444 ses_num_start = self.nat64_get_ses_num()
3446 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3448 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3449 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3451 # send some packets to create sessions
3452 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3453 self.pg0.add_stream(pkts)
3454 self.pg_enable_capture(self.pg_interfaces)
# capture_ip4 is reused below as the payload of the IPv4 ICMP errors.
3456 capture_ip4 = self.pg1.get_capture(len(pkts))
3457 self.verify_capture_out(capture_ip4,
3458 nat_ip=self.nat_addr,
3459 dst_ip=self.pg1.remote_ip4)
3461 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3462 self.pg1.add_stream(pkts)
3463 self.pg_enable_capture(self.pg_interfaces)
# capture_ip6 is reused below as the payload of the ICMPv6 errors.
3465 capture_ip6 = self.pg0.get_capture(len(pkts))
3466 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3467 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3468 self.pg0.remote_ip6)
# ICMPv6 errors from pg0, one per captured IPv6 packet.
3471 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3472 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3473 ICMPv6DestUnreach(code=1) /
3474 packet[IPv6] for packet in capture_ip6]
3475 self.pg0.add_stream(pkts)
3476 self.pg_enable_capture(self.pg_interfaces)
3478 capture = self.pg1.get_capture(len(pkts))
3479 for packet in capture:
3481 self.assertEqual(packet[IP].src, self.nat_addr)
3482 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3483 self.assertEqual(packet[ICMP].type, 3)
3484 self.assertEqual(packet[ICMP].code, 13)
3485 inner = packet[IPerror]
3486 self.assertEqual(inner.src, self.pg1.remote_ip4)
3487 self.assertEqual(inner.dst, self.nat_addr)
3488 self.check_icmp_checksum(packet)
3489 if inner.haslayer(TCPerror):
3490 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3491 elif inner.haslayer(UDPerror):
3492 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3494 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3496 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMP errors from pg1, one per captured IPv4 packet.
3500 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3501 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3502 ICMP(type=3, code=13) /
3503 packet[IP] for packet in capture_ip4]
3504 self.pg1.add_stream(pkts)
3505 self.pg_enable_capture(self.pg_interfaces)
3507 capture = self.pg0.get_capture(len(pkts))
3508 for packet in capture:
3510 self.assertEqual(packet[IPv6].src, ip.src)
3511 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3512 icmp = packet[ICMPv6DestUnreach]
3513 self.assertEqual(icmp.code, 1)
3514 inner = icmp[IPerror6]
3515 self.assertEqual(inner.src, self.pg0.remote_ip6)
3516 self.assertEqual(inner.dst, ip.src)
3517 self.check_icmpv6_checksum(packet)
3518 if inner.haslayer(TCPerror):
3519 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3520 elif inner.haslayer(UDPerror):
3521 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3523 self.assertEqual(inner[ICMPv6EchoRequest].id,
3526 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT64 hairpinning: client and server are both remote hosts on pg0
# (remote_hosts[0] and [1]) and all captures are taken on pg0.
#  1. client -> NAT address (as IPv6) on the server's outside ports; the
#     captured packets must come from nat_addr_ip6, go to server.ip6 on the
#     server's inside ports, and use a source port different from the
#     client's; that source port is remembered as client_*_out_port.
#  2. server -> NAT address on the remembered client outside ports; the
#     captured packets must come from nat_addr_ip6 with the server's outside
#     port and go to client.ip6 on the client's inside port.
#  3. ICMPv6 Destination Unreachable (code 1) from the client wrapping the
#     packets captured in step 2; the captured errors must be addressed to
#     server.ip6 and embed a packet server.ip6 -> nat_addr_ip6.
# NOTE(review): the lines that build the `pkts` lists for steps 1 and 2, the
# remaining static BIB arguments and the try/except scaffolding are not
# present in this listing.
3529 def test_hairpinning(self):
3530 """ NAT64 hairpinning """
3532 client = self.pg0.remote_hosts[0]
3533 server = self.pg0.remote_hosts[1]
3534 server_tcp_in_port = 22
3535 server_tcp_out_port = 4022
3536 server_udp_in_port = 23
3537 server_udp_out_port = 4023
3538 client_tcp_in_port = 1234
3539 client_udp_in_port = 1235
# Placeholders; overwritten from the first capture below.
3540 client_tcp_out_port = 0
3541 client_udp_out_port = 0
3542 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3543 nat_addr_ip6 = ip.src
3545 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3547 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3548 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3550 self.vapi.nat64_add_del_static_bib(server.ip6n,
3553 server_tcp_out_port,
3555 self.vapi.nat64_add_del_static_bib(server.ip6n,
3558 server_udp_out_port,
# Step 1: client to server.
3563 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3564 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3565 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3567 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3568 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3569 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3571 self.pg0.add_stream(pkts)
3572 self.pg_enable_capture(self.pg_interfaces)
3574 capture = self.pg0.get_capture(len(pkts))
3575 for packet in capture:
3577 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3578 self.assertEqual(packet[IPv6].dst, server.ip6)
3579 if packet.haslayer(TCP):
3580 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3581 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3582 self.check_tcp_checksum(packet)
3583 client_tcp_out_port = packet[TCP].sport
3585 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3586 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3587 self.check_udp_checksum(packet)
3588 client_udp_out_port = packet[UDP].sport
3590 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Step 2: server to client, using the ports learned in step 1.
3595 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3596 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3597 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3599 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3600 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3601 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3603 self.pg0.add_stream(pkts)
3604 self.pg_enable_capture(self.pg_interfaces)
3606 capture = self.pg0.get_capture(len(pkts))
3607 for packet in capture:
3609 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3610 self.assertEqual(packet[IPv6].dst, client.ip6)
3611 if packet.haslayer(TCP):
3612 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3613 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3614 self.check_tcp_checksum(packet)
3616 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3617 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3618 self.check_udp_checksum(packet)
3620 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Step 3: ICMPv6 errors from the client wrapping the step 2 capture.
3625 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3626 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3627 ICMPv6DestUnreach(code=1) /
3628 packet[IPv6] for packet in capture]
3629 self.pg0.add_stream(pkts)
3630 self.pg_enable_capture(self.pg_interfaces)
3632 capture = self.pg0.get_capture(len(pkts))
3633 for packet in capture:
3635 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3636 self.assertEqual(packet[IPv6].dst, server.ip6)
3637 icmp = packet[ICMPv6DestUnreach]
3638 self.assertEqual(icmp.code, 1)
3639 inner = icmp[IPerror6]
3640 self.assertEqual(inner.src, server.ip6)
3641 self.assertEqual(inner.dst, nat_addr_ip6)
3642 self.check_icmpv6_checksum(packet)
3643 if inner.haslayer(TCPerror):
3644 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3645 self.assertEqual(inner[TCPerror].dport,
3646 client_tcp_out_port)
3648 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3649 self.assertEqual(inner[UDPerror].dport,
3650 client_udp_out_port)
3652 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT64 network-specific prefixes. Configures the default pool/interfaces
# plus the vrf1 pool and pg2, adds the prefix 2001:db8::/32 and checks the
# prefix dump (one entry, vrf_id 0), then adds 2001:db8:122:300::/56 with
# vrf_id=self.vrf1_id and expects two dumped prefixes. Traffic is then
# exchanged pg0 <-> pg1 and pg2 <-> pg1, with the expected IPv6 address of
# pg1 built by compose_ip6().
# NOTE(review): some arguments of create_stream_in_ip6, compose_ip6 and
# nat64_add_del_prefix are not present in this listing.
3655 def test_prefix(self):
3656 """ NAT64 Network-Specific Prefix """
3658 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3660 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3661 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3662 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3663 self.vrf1_nat_addr_n,
3664 vrf_id=self.vrf1_id)
3665 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Prefix added without a vrf_id argument; the dump must report vrf_id 0.
3668 global_pref64 = "2001:db8::"
3669 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3670 global_pref64_len = 32
3671 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3673 prefix = self.vapi.nat64_prefix_dump()
3674 self.assertEqual(len(prefix), 1)
3675 self.assertEqual(prefix[0].prefix, global_pref64_n)
3676 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3677 self.assertEqual(prefix[0].vrf_id, 0)
3679 # Add tenant specific prefix
3680 vrf1_pref64 = "2001:db8:122:300::"
3681 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3682 vrf1_pref64_len = 56
3683 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3685 vrf_id=self.vrf1_id)
3686 prefix = self.vapi.nat64_prefix_dump()
3687 self.assertEqual(len(prefix), 2)
# Traffic using the first prefix (plen=global_pref64_len).
3690 pkts = self.create_stream_in_ip6(self.pg0,
3693 plen=global_pref64_len)
3694 self.pg0.add_stream(pkts)
3695 self.pg_enable_capture(self.pg_interfaces)
3697 capture = self.pg1.get_capture(len(pkts))
3698 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3699 dst_ip=self.pg1.remote_ip4)
3701 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3702 self.pg1.add_stream(pkts)
3703 self.pg_enable_capture(self.pg_interfaces)
3705 capture = self.pg0.get_capture(len(pkts))
3706 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3709 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3711 # Tenant specific prefix
3712 pkts = self.create_stream_in_ip6(self.pg2,
3715 plen=vrf1_pref64_len)
3716 self.pg2.add_stream(pkts)
3717 self.pg_enable_capture(self.pg_interfaces)
3719 capture = self.pg1.get_capture(len(pkts))
3720 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3721 dst_ip=self.pg1.remote_ip4)
3723 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3724 self.pg1.add_stream(pkts)
3725 self.pg_enable_capture(self.pg_interfaces)
3727 capture = self.pg2.get_capture(len(pkts))
3728 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3731 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Translation of a packet whose IP protocol is not TCP/UDP/ICMP. A TCP
# packet is sent pg0 -> pg1 first, then an IPv6 packet with nh=47 carrying an
# inner IP/TCP packet; on pg1 it must arrive from nat_addr to pg1.remote_ip4
# with a GRE layer and a valid IP checksum. The reverse packet sent from pg1
# to nat_addr must arrive on pg0 as IPv6 from remote_ip6 to pg0.remote_ip6
# with nh 47.
# NOTE(review): the layer between the outer and inner IP headers, the
# assignment of `packet`, and the try/except scaffolding are not present in
# this listing.
3733 def test_unknown_proto(self):
3734 """ NAT64 translate packet with unknown protocol """
3736 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3738 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3739 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3740 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# Plain TCP packet first (presumably to create the session - confirm).
3743 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3744 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3745 TCP(sport=self.tcp_port_in, dport=20))
3746 self.pg0.add_stream(p)
3747 self.pg_enable_capture(self.pg_interfaces)
3749 p = self.pg1.get_capture(1)
# pg0 -> pg1 with next header 47.
3751 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3752 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3754 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3755 TCP(sport=1234, dport=1234))
3756 self.pg0.add_stream(p)
3757 self.pg_enable_capture(self.pg_interfaces)
3759 p = self.pg1.get_capture(1)
3762 self.assertEqual(packet[IP].src, self.nat_addr)
3763 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3764 self.assertTrue(packet.haslayer(GRE))
3765 self.check_ip_checksum(packet)
3767 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the NAT address.
3771 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3772 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3774 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3775 TCP(sport=1234, dport=1234))
3776 self.pg1.add_stream(p)
3777 self.pg_enable_capture(self.pg_interfaces)
3779 p = self.pg0.get_capture(1)
3782 self.assertEqual(packet[IPv6].src, remote_ip6)
3783 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3784 self.assertEqual(packet[IPv6].nh, 47)
3786 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning of a packet with a non-TCP/UDP/ICMP protocol. Client and
# server are both remote hosts on pg0 and each has its own NAT address
# (10.0.0.110 and 10.0.0.100), also expressed in 64:ff9b::/96 form. After a
# TCP packet client -> server, a packet with nh=IP_PROTOS.gre is sent
# client -> server_nat_ip6 and must be captured on pg0 from client_nat_ip6
# to server.ip6; the reverse packet server -> client_nat_ip6 must be
# captured from server_nat_ip6 to client.ip6. Both must keep nh == gre.
# NOTE(review): the remaining static BIB arguments, the layer between the
# IPv6 and inner IP headers, the assignment of `packet`, and the try/except
# scaffolding are not present in this listing.
3789 def test_hairpinning_unknown_proto(self):
3790 """ NAT64 translate packet with unknown protocol - hairpinning """
3792 client = self.pg0.remote_hosts[0]
3793 server = self.pg0.remote_hosts[1]
3794 server_tcp_in_port = 22
3795 server_tcp_out_port = 4022
3796 client_tcp_in_port = 1234
3797 client_tcp_out_port = 1235
3798 server_nat_ip = "10.0.0.100"
3799 client_nat_ip = "10.0.0.110"
3800 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3801 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3802 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3803 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3805 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3807 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3808 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries: two for the server, one for the client.
3810 self.vapi.nat64_add_del_static_bib(server.ip6n,
3813 server_tcp_out_port,
3816 self.vapi.nat64_add_del_static_bib(server.ip6n,
3822 self.vapi.nat64_add_del_static_bib(client.ip6n,
3825 client_tcp_out_port,
# Plain TCP packet client -> server first.
3829 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3830 IPv6(src=client.ip6, dst=server_nat_ip6) /
3831 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3832 self.pg0.add_stream(p)
3833 self.pg_enable_capture(self.pg_interfaces)
3835 p = self.pg0.get_capture(1)
# client -> server with next header gre.
3837 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3838 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3840 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3841 TCP(sport=1234, dport=1234))
3842 self.pg0.add_stream(p)
3843 self.pg_enable_capture(self.pg_interfaces)
3845 p = self.pg0.get_capture(1)
3848 self.assertEqual(packet[IPv6].src, client_nat_ip6)
3849 self.assertEqual(packet[IPv6].dst, server.ip6)
3850 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3852 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client with next header gre.
3856 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3857 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3859 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3860 TCP(sport=1234, dport=1234))
3861 self.pg0.add_stream(p)
3862 self.pg_enable_capture(self.pg_interfaces)
3864 p = self.pg0.get_capture(1)
3867 self.assertEqual(packet[IPv6].src, server_nat_ip6)
3868 self.assertEqual(packet[IPv6].dst, client.ip6)
3869 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3871 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One-armed NAT64: pg3 is enabled for NAT64 twice, once with the default
# direction and once with is_inside=0, and all traffic enters and leaves via
# pg3. An IPv6 TCP packet (sport 12345, dport 80) must come back as IPv4 from
# nat_addr to pg3.remote_ip4 with a different source port, which is kept as
# external_port. The IPv4 reply to nat_addr:external_port must come back as
# IPv6 from remote_host_ip6 to pg3.remote_ip6 with ports 80 -> 12345.
# NOTE(review): the remaining compose_ip6 arguments, the assignments of
# `ip`/`tcp` (and of `p` from the capture) and the try/except scaffolding
# are not present in this listing.
3874 def test_one_armed_nat64(self):
3875 """ One armed NAT64 """
3877 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
3881 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3883 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
3884 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 -> IPv4 direction.
3887 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3888 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
3889 TCP(sport=12345, dport=80))
3890 self.pg3.add_stream(p)
3891 self.pg_enable_capture(self.pg_interfaces)
3893 capture = self.pg3.get_capture(1)
3898 self.assertEqual(ip.src, self.nat_addr)
3899 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3900 self.assertNotEqual(tcp.sport, 12345)
3901 external_port = tcp.sport
3902 self.assertEqual(tcp.dport, 80)
3903 self.check_tcp_checksum(p)
3904 self.check_ip_checksum(p)
3906 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 -> IPv6 direction, addressed to the port learned above.
3910 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3911 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
3912 TCP(sport=80, dport=external_port))
3913 self.pg3.add_stream(p)
3914 self.pg_enable_capture(self.pg_interfaces)
3916 capture = self.pg3.get_capture(1)
3921 self.assertEqual(ip.src, remote_host_ip6)
3922 self.assertEqual(ip.dst, self.pg3.remote_ip6)
3923 self.assertEqual(tcp.sport, 80)
3924 self.assertEqual(tcp.dport, 12345)
3925 self.check_tcp_checksum(p)
3927 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Helper used by the tests to count NAT64 sessions. The visible code dumps
# the session table through the API into `st`.
# NOTE(review): the line that turns `st` into the returned number is not
# present in this listing.
3930 def nat64_get_ses_num(self):
3932 Return number of active NAT64 sessions.
3934 st = self.vapi.nat64_st_dump()
# Resets NAT64 state between tests. Calls nat64_set_timeouts() with no
# arguments, then walks the interface dump, the TCP/UDP/ICMP BIB dumps, the
# pool address dump and the prefix dump, issuing the matching add_del call
# for the dumped entries.
# NOTE(review): the continuation arguments of those calls and the loops over
# the BIB dumps are not present in this listing, so neither the is_add
# values nor any filtering of BIB entries can be confirmed from here.
3937 def clear_nat64(self):
3939 Clear NAT64 configuration.
3941 self.vapi.nat64_set_timeouts()
3943 interfaces = self.vapi.nat64_interface_dump()
3944 for intf in interfaces:
# NOTE(review): is_inside > 1 presumably marks an interface enabled in both
# directions (test_one_armed_nat64 enables pg3 twice) - confirm.
3945 if intf.is_inside > 1:
3946 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3949 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3953 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3956 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3964 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3967 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3975 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3978 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# `adresses` (sic) is the local name used for the pool dump.
3986 adresses = self.vapi.nat64_pool_addr_dump()
3987 for addr in adresses:
3988 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3993 prefixes = self.vapi.nat64_prefix_dump()
3994 for prefix in prefixes:
3995 self.vapi.nat64_add_del_prefix(prefix.prefix,
3997 vrf_id=prefix.vrf_id,
# Per-test teardown of TestNAT64 (the `def` line is not part of this
# listing). Runs the parent tearDown and, when VPP is not dead, logs the
# NAT64 pool, interfaces, prefixes, BIB and session table CLI output.
# NOTE(review): indentation is not preserved here, so which of the logging
# calls sit under the `if` cannot be read from this listing.
4001 super(TestNAT64, self).tearDown()
4002 if not self.vpp_dead:
4003 self.logger.info(self.vapi.cli("show nat64 pool"))
4004 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4005 self.logger.info(self.vapi.cli("show nat64 prefix"))
4006 self.logger.info(self.vapi.cli("show nat64 bib all"))
4007 self.logger.info(self.vapi.cli("show nat64 session table all"))
# Script entry point: when the file is executed directly, run the tests in
# this module with the project's VppTestRunner.
4010 if __name__ == '__main__':
4011 unittest.main(testRunner=VppTestRunner)