7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
19 from util import mactobinary
22 class MethodHolder(VppTestCase):
23 """ NAT create capture and verify method holder """
27 super(MethodHolder, cls).setUpClass()
30 super(MethodHolder, self).tearDown()
32 def check_ip_checksum(self, pkt):
34 Check IP checksum of the packet
36 :param pkt: Packet to check IP checksum
38 new = pkt.__class__(str(pkt))
40 new = new.__class__(str(new))
41 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
43 def check_tcp_checksum(self, pkt):
45 Check TCP checksum in IP packet
47 :param pkt: Packet to check TCP checksum
49 new = pkt.__class__(str(pkt))
51 new = new.__class__(str(new))
52 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
54 def check_udp_checksum(self, pkt):
56 Check UDP checksum in IP packet
58 :param pkt: Packet to check UDP checksum
60 new = pkt.__class__(str(pkt))
62 new = new.__class__(str(new))
63 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
65 def check_icmp_errror_embedded(self, pkt):
67 Check ICMP error embeded packet checksum
69 :param pkt: Packet to check ICMP error embeded packet checksum
71 if pkt.haslayer(IPerror):
72 new = pkt.__class__(str(pkt))
73 del new['IPerror'].chksum
74 new = new.__class__(str(new))
75 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
77 if pkt.haslayer(TCPerror):
78 new = pkt.__class__(str(pkt))
79 del new['TCPerror'].chksum
80 new = new.__class__(str(new))
81 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
83 if pkt.haslayer(UDPerror):
84 if pkt['UDPerror'].chksum != 0:
85 new = pkt.__class__(str(pkt))
86 del new['UDPerror'].chksum
87 new = new.__class__(str(new))
88 self.assertEqual(new['UDPerror'].chksum,
89 pkt['UDPerror'].chksum)
91 if pkt.haslayer(ICMPerror):
92 del new['ICMPerror'].chksum
93 new = new.__class__(str(new))
94 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
96 def check_icmp_checksum(self, pkt):
98 Check ICMP checksum in IPv4 packet
100 :param pkt: Packet to check ICMP checksum
102 new = pkt.__class__(str(pkt))
103 del new['ICMP'].chksum
104 new = new.__class__(str(new))
105 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
106 if pkt.haslayer(IPerror):
107 self.check_icmp_errror_embedded(pkt)
109 def check_icmpv6_checksum(self, pkt):
111 Check ICMPv6 checksum in IPv4 packet
113 :param pkt: Packet to check ICMPv6 checksum
115 new = pkt.__class__(str(pkt))
116 if pkt.haslayer(ICMPv6DestUnreach):
117 del new['ICMPv6DestUnreach'].cksum
118 new = new.__class__(str(new))
119 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
120 pkt['ICMPv6DestUnreach'].cksum)
121 self.check_icmp_errror_embedded(pkt)
122 if pkt.haslayer(ICMPv6EchoRequest):
123 del new['ICMPv6EchoRequest'].cksum
124 new = new.__class__(str(new))
125 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
126 pkt['ICMPv6EchoRequest'].cksum)
127 if pkt.haslayer(ICMPv6EchoReply):
128 del new['ICMPv6EchoReply'].cksum
129 new = new.__class__(str(new))
130 self.assertEqual(new['ICMPv6EchoReply'].cksum,
131 pkt['ICMPv6EchoReply'].cksum)
133 def create_stream_in(self, in_if, out_if, ttl=64):
135 Create packet stream for inside network
137 :param in_if: Inside interface
138 :param out_if: Outside interface
139 :param ttl: TTL of generated packets
143 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
144 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
145 TCP(sport=self.tcp_port_in, dport=20))
149 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
150 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
151 UDP(sport=self.udp_port_in, dport=20))
155 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
156 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
157 ICMP(id=self.icmp_id_in, type='echo-request'))
162 def compose_ip6(self, ip4, pref, plen):
164 Compose IPv4-embedded IPv6 addresses
166 :param ip4: IPv4 address
167 :param pref: IPv6 prefix
168 :param plen: IPv6 prefix length
169 :returns: IPv4-embedded IPv6 addresses
171 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
172 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
187 pref_n[10] = ip4_n[3]
191 pref_n[10] = ip4_n[2]
192 pref_n[11] = ip4_n[3]
195 pref_n[10] = ip4_n[1]
196 pref_n[11] = ip4_n[2]
197 pref_n[12] = ip4_n[3]
199 pref_n[12] = ip4_n[0]
200 pref_n[13] = ip4_n[1]
201 pref_n[14] = ip4_n[2]
202 pref_n[15] = ip4_n[3]
203 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
205 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
207 Create IPv6 packet stream for inside network
209 :param in_if: Inside interface
210 :param out_if: Outside interface
211 :param ttl: Hop Limit of generated packets
212 :param pref: NAT64 prefix
213 :param plen: NAT64 prefix length
217 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
219 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
222 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
223 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
224 TCP(sport=self.tcp_port_in, dport=20))
228 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
229 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
230 UDP(sport=self.udp_port_in, dport=20))
234 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
235 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
236 ICMPv6EchoRequest(id=self.icmp_id_in))
241 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
243 Create packet stream for outside network
245 :param out_if: Outside interface
246 :param dst_ip: Destination IP address (Default use global NAT address)
247 :param ttl: TTL of generated packets
250 dst_ip = self.nat_addr
253 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
254 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
255 TCP(dport=self.tcp_port_out, sport=20))
259 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
260 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
261 UDP(dport=self.udp_port_out, sport=20))
265 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
266 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
267 ICMP(id=self.icmp_id_out, type='echo-reply'))
272 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
273 packet_num=3, dst_ip=None):
275 Verify captured packets on outside network
277 :param capture: Captured packets
278 :param nat_ip: Translated IP address (Default use global NAT address)
279 :param same_port: Sorce port number is not translated (Default False)
280 :param packet_num: Expected number of packets (Default 3)
281 :param dst_ip: Destination IP address (Default do not verify)
284 nat_ip = self.nat_addr
285 self.assertEqual(packet_num, len(capture))
286 for packet in capture:
288 self.check_ip_checksum(packet)
289 self.assertEqual(packet[IP].src, nat_ip)
290 if dst_ip is not None:
291 self.assertEqual(packet[IP].dst, dst_ip)
292 if packet.haslayer(TCP):
294 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
297 packet[TCP].sport, self.tcp_port_in)
298 self.tcp_port_out = packet[TCP].sport
299 self.check_tcp_checksum(packet)
300 elif packet.haslayer(UDP):
302 self.assertEqual(packet[UDP].sport, self.udp_port_in)
305 packet[UDP].sport, self.udp_port_in)
306 self.udp_port_out = packet[UDP].sport
309 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
311 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
312 self.icmp_id_out = packet[ICMP].id
313 self.check_icmp_checksum(packet)
315 self.logger.error(ppp("Unexpected or invalid packet "
316 "(outside network):", packet))
319 def verify_capture_in(self, capture, in_if, packet_num=3):
321 Verify captured packets on inside network
323 :param capture: Captured packets
324 :param in_if: Inside interface
325 :param packet_num: Expected number of packets (Default 3)
327 self.assertEqual(packet_num, len(capture))
328 for packet in capture:
330 self.check_ip_checksum(packet)
331 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
332 if packet.haslayer(TCP):
333 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
334 self.check_tcp_checksum(packet)
335 elif packet.haslayer(UDP):
336 self.assertEqual(packet[UDP].dport, self.udp_port_in)
338 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
339 self.check_icmp_checksum(packet)
341 self.logger.error(ppp("Unexpected or invalid packet "
342 "(inside network):", packet))
345 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
347 Verify captured IPv6 packets on inside network
349 :param capture: Captured packets
350 :param src_ip: Source IP
351 :param dst_ip: Destination IP address
352 :param packet_num: Expected number of packets (Default 3)
354 self.assertEqual(packet_num, len(capture))
355 for packet in capture:
357 self.assertEqual(packet[IPv6].src, src_ip)
358 self.assertEqual(packet[IPv6].dst, dst_ip)
359 if packet.haslayer(TCP):
360 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
361 self.check_tcp_checksum(packet)
362 elif packet.haslayer(UDP):
363 self.assertEqual(packet[UDP].dport, self.udp_port_in)
364 self.check_udp_checksum(packet)
366 self.assertEqual(packet[ICMPv6EchoReply].id,
368 self.check_icmpv6_checksum(packet)
370 self.logger.error(ppp("Unexpected or invalid packet "
371 "(inside network):", packet))
374 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
376 Verify captured packet that don't have to be translated
378 :param capture: Captured packets
379 :param ingress_if: Ingress interface
380 :param egress_if: Egress interface
382 for packet in capture:
384 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
385 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
386 if packet.haslayer(TCP):
387 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
388 elif packet.haslayer(UDP):
389 self.assertEqual(packet[UDP].sport, self.udp_port_in)
391 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
393 self.logger.error(ppp("Unexpected or invalid packet "
394 "(inside network):", packet))
397 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
398 packet_num=3, icmp_type=11):
400 Verify captured packets with ICMP errors on outside network
402 :param capture: Captured packets
403 :param src_ip: Translated IP address or IP address of VPP
404 (Default use global NAT address)
405 :param packet_num: Expected number of packets (Default 3)
406 :param icmp_type: Type of error ICMP packet
407 we are expecting (Default 11)
410 src_ip = self.nat_addr
411 self.assertEqual(packet_num, len(capture))
412 for packet in capture:
414 self.assertEqual(packet[IP].src, src_ip)
415 self.assertTrue(packet.haslayer(ICMP))
417 self.assertEqual(icmp.type, icmp_type)
418 self.assertTrue(icmp.haslayer(IPerror))
419 inner_ip = icmp[IPerror]
420 if inner_ip.haslayer(TCPerror):
421 self.assertEqual(inner_ip[TCPerror].dport,
423 elif inner_ip.haslayer(UDPerror):
424 self.assertEqual(inner_ip[UDPerror].dport,
427 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
429 self.logger.error(ppp("Unexpected or invalid packet "
430 "(outside network):", packet))
433 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
436 Verify captured packets with ICMP errors on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
441 :param icmp_type: Type of error ICMP packet
442 we are expecting (Default 11)
444 self.assertEqual(packet_num, len(capture))
445 for packet in capture:
447 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
448 self.assertTrue(packet.haslayer(ICMP))
450 self.assertEqual(icmp.type, icmp_type)
451 self.assertTrue(icmp.haslayer(IPerror))
452 inner_ip = icmp[IPerror]
453 if inner_ip.haslayer(TCPerror):
454 self.assertEqual(inner_ip[TCPerror].sport,
456 elif inner_ip.haslayer(UDPerror):
457 self.assertEqual(inner_ip[UDPerror].sport,
460 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
462 self.logger.error(ppp("Unexpected or invalid packet "
463 "(inside network):", packet))
466 def verify_ipfix_nat44_ses(self, data):
468 Verify IPFIX NAT44 session create/delete event
470 :param data: Decoded IPFIX data records
472 nat44_ses_create_num = 0
473 nat44_ses_delete_num = 0
474 self.assertEqual(6, len(data))
477 self.assertIn(ord(record[230]), [4, 5])
478 if ord(record[230]) == 4:
479 nat44_ses_create_num += 1
481 nat44_ses_delete_num += 1
483 self.assertEqual(self.pg0.remote_ip4n, record[8])
484 # postNATSourceIPv4Address
485 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
488 self.assertEqual(struct.pack("!I", 0), record[234])
489 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
490 if IP_PROTOS.icmp == ord(record[4]):
491 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
492 self.assertEqual(struct.pack("!H", self.icmp_id_out),
494 elif IP_PROTOS.tcp == ord(record[4]):
495 self.assertEqual(struct.pack("!H", self.tcp_port_in),
497 self.assertEqual(struct.pack("!H", self.tcp_port_out),
499 elif IP_PROTOS.udp == ord(record[4]):
500 self.assertEqual(struct.pack("!H", self.udp_port_in),
502 self.assertEqual(struct.pack("!H", self.udp_port_out),
505 self.fail("Invalid protocol")
506 self.assertEqual(3, nat44_ses_create_num)
507 self.assertEqual(3, nat44_ses_delete_num)
509 def verify_ipfix_addr_exhausted(self, data):
511 Verify IPFIX NAT addresses event
513 :param data: Decoded IPFIX data records
515 self.assertEqual(1, len(data))
518 self.assertEqual(ord(record[230]), 3)
520 self.assertEqual(struct.pack("!I", 0), record[283])
523 class TestNAT44(MethodHolder):
524 """ NAT44 Test Cases """
528 super(TestNAT44, cls).setUpClass()
531 cls.tcp_port_in = 6303
532 cls.tcp_port_out = 6303
533 cls.udp_port_in = 6304
534 cls.udp_port_out = 6304
535 cls.icmp_id_in = 6305
536 cls.icmp_id_out = 6305
537 cls.nat_addr = '10.0.0.3'
538 cls.ipfix_src_port = 4739
539 cls.ipfix_domain_id = 1
541 cls.create_pg_interfaces(range(10))
542 cls.interfaces = list(cls.pg_interfaces[0:4])
544 for i in cls.interfaces:
549 cls.pg0.generate_remote_hosts(3)
550 cls.pg0.configure_ipv4_neighbors()
552 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
553 cls.vapi.ip_table_add_del(10, is_add=1)
554 cls.vapi.ip_table_add_del(20, is_add=1)
556 cls.pg4._local_ip4 = "172.16.255.1"
557 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
559 cls.pg4.set_table_ip4(10)
560 cls.pg5._local_ip4 = "172.17.255.3"
561 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
563 cls.pg5.set_table_ip4(10)
564 cls.pg6._local_ip4 = "172.16.255.1"
565 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
566 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
567 cls.pg6.set_table_ip4(20)
568 for i in cls.overlapping_interfaces:
576 cls.pg9.generate_remote_hosts(2)
578 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
579 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
583 cls.pg9.resolve_arp()
584 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
585 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
586 cls.pg9.resolve_arp()
589 super(TestNAT44, cls).tearDownClass()
592 def clear_nat44(self):
594 Clear NAT44 configuration.
596 # I found no elegant way to do this
597 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
598 dst_address_length=32,
599 next_hop_address=self.pg7.remote_ip4n,
600 next_hop_sw_if_index=self.pg7.sw_if_index,
602 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
603 dst_address_length=32,
604 next_hop_address=self.pg8.remote_ip4n,
605 next_hop_sw_if_index=self.pg8.sw_if_index,
608 for intf in [self.pg7, self.pg8]:
609 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
611 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
616 if self.pg7.has_ip4_config:
617 self.pg7.unconfig_ip4()
619 interfaces = self.vapi.nat44_interface_addr_dump()
620 for intf in interfaces:
621 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
623 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
624 domain_id=self.ipfix_domain_id)
625 self.ipfix_src_port = 4739
626 self.ipfix_domain_id = 1
628 interfaces = self.vapi.nat44_interface_dump()
629 for intf in interfaces:
630 if intf.is_inside > 1:
631 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
634 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
638 interfaces = self.vapi.nat44_interface_output_feature_dump()
639 for intf in interfaces:
640 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
644 static_mappings = self.vapi.nat44_static_mapping_dump()
645 for sm in static_mappings:
646 self.vapi.nat44_add_del_static_mapping(
648 sm.external_ip_address,
649 local_port=sm.local_port,
650 external_port=sm.external_port,
651 addr_only=sm.addr_only,
653 protocol=sm.protocol,
656 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
657 for lb_sm in lb_static_mappings:
658 self.vapi.nat44_add_del_lb_static_mapping(
667 adresses = self.vapi.nat44_address_dump()
668 for addr in adresses:
669 self.vapi.nat44_add_del_address_range(addr.ip_address,
673 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
674 local_port=0, external_port=0, vrf_id=0,
675 is_add=1, external_sw_if_index=0xFFFFFFFF,
678 Add/delete NAT44 static mapping
680 :param local_ip: Local IP address
681 :param external_ip: External IP address
682 :param local_port: Local port number (Optional)
683 :param external_port: External port number (Optional)
684 :param vrf_id: VRF ID (Default 0)
685 :param is_add: 1 if add, 0 if delete (Default add)
686 :param external_sw_if_index: External interface instead of IP address
687 :param proto: IP protocol (Mandatory if port specified)
690 if local_port and external_port:
692 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
693 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
694 self.vapi.nat44_add_del_static_mapping(
697 external_sw_if_index,
705 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
707 Add/delete NAT44 address
709 :param ip: IP address
710 :param is_add: 1 if add, 0 if delete (Default add)
712 nat_addr = socket.inet_pton(socket.AF_INET, ip)
713 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
716 def test_dynamic(self):
717 """ NAT44 dynamic translation test """
719 self.nat44_add_address(self.nat_addr)
720 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
721 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
725 pkts = self.create_stream_in(self.pg0, self.pg1)
726 self.pg0.add_stream(pkts)
727 self.pg_enable_capture(self.pg_interfaces)
729 capture = self.pg1.get_capture(len(pkts))
730 self.verify_capture_out(capture)
733 pkts = self.create_stream_out(self.pg1)
734 self.pg1.add_stream(pkts)
735 self.pg_enable_capture(self.pg_interfaces)
737 capture = self.pg0.get_capture(len(pkts))
738 self.verify_capture_in(capture, self.pg0)
740 def test_dynamic_icmp_errors_in2out_ttl_1(self):
741 """ NAT44 handling of client packets with TTL=1 """
743 self.nat44_add_address(self.nat_addr)
744 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
745 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
748 # Client side - generate traffic
749 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
750 self.pg0.add_stream(pkts)
751 self.pg_enable_capture(self.pg_interfaces)
754 # Client side - verify ICMP type 11 packets
755 capture = self.pg0.get_capture(len(pkts))
756 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
758 def test_dynamic_icmp_errors_out2in_ttl_1(self):
759 """ NAT44 handling of server packets with TTL=1 """
761 self.nat44_add_address(self.nat_addr)
762 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
763 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
766 # Client side - create sessions
767 pkts = self.create_stream_in(self.pg0, self.pg1)
768 self.pg0.add_stream(pkts)
769 self.pg_enable_capture(self.pg_interfaces)
772 # Server side - generate traffic
773 capture = self.pg1.get_capture(len(pkts))
774 self.verify_capture_out(capture)
775 pkts = self.create_stream_out(self.pg1, ttl=1)
776 self.pg1.add_stream(pkts)
777 self.pg_enable_capture(self.pg_interfaces)
780 # Server side - verify ICMP type 11 packets
781 capture = self.pg1.get_capture(len(pkts))
782 self.verify_capture_out_with_icmp_errors(capture,
783 src_ip=self.pg1.local_ip4)
785 def test_dynamic_icmp_errors_in2out_ttl_2(self):
786 """ NAT44 handling of error responses to client packets with TTL=2 """
788 self.nat44_add_address(self.nat_addr)
789 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
790 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
793 # Client side - generate traffic
794 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
795 self.pg0.add_stream(pkts)
796 self.pg_enable_capture(self.pg_interfaces)
799 # Server side - simulate ICMP type 11 response
800 capture = self.pg1.get_capture(len(pkts))
801 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
802 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
803 ICMP(type=11) / packet[IP] for packet in capture]
804 self.pg1.add_stream(pkts)
805 self.pg_enable_capture(self.pg_interfaces)
808 # Client side - verify ICMP type 11 packets
809 capture = self.pg0.get_capture(len(pkts))
810 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
812 def test_dynamic_icmp_errors_out2in_ttl_2(self):
813 """ NAT44 handling of error responses to server packets with TTL=2 """
815 self.nat44_add_address(self.nat_addr)
816 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
817 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
820 # Client side - create sessions
821 pkts = self.create_stream_in(self.pg0, self.pg1)
822 self.pg0.add_stream(pkts)
823 self.pg_enable_capture(self.pg_interfaces)
826 # Server side - generate traffic
827 capture = self.pg1.get_capture(len(pkts))
828 self.verify_capture_out(capture)
829 pkts = self.create_stream_out(self.pg1, ttl=2)
830 self.pg1.add_stream(pkts)
831 self.pg_enable_capture(self.pg_interfaces)
834 # Client side - simulate ICMP type 11 response
835 capture = self.pg0.get_capture(len(pkts))
836 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
837 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
838 ICMP(type=11) / packet[IP] for packet in capture]
839 self.pg0.add_stream(pkts)
840 self.pg_enable_capture(self.pg_interfaces)
843 # Server side - verify ICMP type 11 packets
844 capture = self.pg1.get_capture(len(pkts))
845 self.verify_capture_out_with_icmp_errors(capture)
847 def test_ping_out_interface_from_outside(self):
848 """ Ping NAT44 out interface from outside network """
850 self.nat44_add_address(self.nat_addr)
851 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
852 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
855 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
856 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
857 ICMP(id=self.icmp_id_out, type='echo-request'))
859 self.pg1.add_stream(pkts)
860 self.pg_enable_capture(self.pg_interfaces)
862 capture = self.pg1.get_capture(len(pkts))
863 self.assertEqual(1, len(capture))
866 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
867 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
868 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
869 self.assertEqual(packet[ICMP].type, 0) # echo reply
871 self.logger.error(ppp("Unexpected or invalid packet "
872 "(outside network):", packet))
875 def test_ping_internal_host_from_outside(self):
876 """ Ping internal host from outside network """
878 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
879 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
880 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
884 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
885 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
886 ICMP(id=self.icmp_id_out, type='echo-request'))
887 self.pg1.add_stream(pkt)
888 self.pg_enable_capture(self.pg_interfaces)
890 capture = self.pg0.get_capture(1)
891 self.verify_capture_in(capture, self.pg0, packet_num=1)
892 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
895 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
896 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
897 ICMP(id=self.icmp_id_in, type='echo-reply'))
898 self.pg0.add_stream(pkt)
899 self.pg_enable_capture(self.pg_interfaces)
901 capture = self.pg1.get_capture(1)
902 self.verify_capture_out(capture, same_port=True, packet_num=1)
903 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
905 def test_static_in(self):
906 """ 1:1 NAT initialized from inside network """
909 self.tcp_port_out = 6303
910 self.udp_port_out = 6304
911 self.icmp_id_out = 6305
913 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
914 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
915 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
919 pkts = self.create_stream_in(self.pg0, self.pg1)
920 self.pg0.add_stream(pkts)
921 self.pg_enable_capture(self.pg_interfaces)
923 capture = self.pg1.get_capture(len(pkts))
924 self.verify_capture_out(capture, nat_ip, True)
927 pkts = self.create_stream_out(self.pg1, nat_ip)
928 self.pg1.add_stream(pkts)
929 self.pg_enable_capture(self.pg_interfaces)
931 capture = self.pg0.get_capture(len(pkts))
932 self.verify_capture_in(capture, self.pg0)
934 def test_static_out(self):
935 """ 1:1 NAT initialized from outside network """
938 self.tcp_port_out = 6303
939 self.udp_port_out = 6304
940 self.icmp_id_out = 6305
942 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
943 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
944 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
948 pkts = self.create_stream_out(self.pg1, nat_ip)
949 self.pg1.add_stream(pkts)
950 self.pg_enable_capture(self.pg_interfaces)
952 capture = self.pg0.get_capture(len(pkts))
953 self.verify_capture_in(capture, self.pg0)
956 pkts = self.create_stream_in(self.pg0, self.pg1)
957 self.pg0.add_stream(pkts)
958 self.pg_enable_capture(self.pg_interfaces)
960 capture = self.pg1.get_capture(len(pkts))
961 self.verify_capture_out(capture, nat_ip, True)
963 def test_static_with_port_in(self):
964 """ 1:1 NAPT initialized from inside network """
966 self.tcp_port_out = 3606
967 self.udp_port_out = 3607
968 self.icmp_id_out = 3608
970 self.nat44_add_address(self.nat_addr)
971 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
972 self.tcp_port_in, self.tcp_port_out,
974 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
975 self.udp_port_in, self.udp_port_out,
977 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
978 self.icmp_id_in, self.icmp_id_out,
979 proto=IP_PROTOS.icmp)
980 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
981 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
985 pkts = self.create_stream_in(self.pg0, self.pg1)
986 self.pg0.add_stream(pkts)
987 self.pg_enable_capture(self.pg_interfaces)
989 capture = self.pg1.get_capture(len(pkts))
990 self.verify_capture_out(capture)
993 pkts = self.create_stream_out(self.pg1)
994 self.pg1.add_stream(pkts)
995 self.pg_enable_capture(self.pg_interfaces)
997 capture = self.pg0.get_capture(len(pkts))
998 self.verify_capture_in(capture, self.pg0)
1000 def test_static_with_port_out(self):
1001 """ 1:1 NAPT initialized from outside network """
1003 self.tcp_port_out = 30606
1004 self.udp_port_out = 30607
1005 self.icmp_id_out = 30608
1007 self.nat44_add_address(self.nat_addr)
1008 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1009 self.tcp_port_in, self.tcp_port_out,
1010 proto=IP_PROTOS.tcp)
1011 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1012 self.udp_port_in, self.udp_port_out,
1013 proto=IP_PROTOS.udp)
1014 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1015 self.icmp_id_in, self.icmp_id_out,
1016 proto=IP_PROTOS.icmp)
1017 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1018 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1022 pkts = self.create_stream_out(self.pg1)
1023 self.pg1.add_stream(pkts)
1024 self.pg_enable_capture(self.pg_interfaces)
1026 capture = self.pg0.get_capture(len(pkts))
1027 self.verify_capture_in(capture, self.pg0)
1030 pkts = self.create_stream_in(self.pg0, self.pg1)
1031 self.pg0.add_stream(pkts)
1032 self.pg_enable_capture(self.pg_interfaces)
1034 capture = self.pg1.get_capture(len(pkts))
1035 self.verify_capture_out(capture)
1037 def test_static_vrf_aware(self):
1038 """ 1:1 NAT VRF awareness """
1040 nat_ip1 = "10.0.0.30"
1041 nat_ip2 = "10.0.0.40"
1042 self.tcp_port_out = 6303
1043 self.udp_port_out = 6304
1044 self.icmp_id_out = 6305
1046 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1048 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1050 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1052 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1053 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1055 # inside interface VRF match NAT44 static mapping VRF
1056 pkts = self.create_stream_in(self.pg4, self.pg3)
1057 self.pg4.add_stream(pkts)
1058 self.pg_enable_capture(self.pg_interfaces)
1060 capture = self.pg3.get_capture(len(pkts))
1061 self.verify_capture_out(capture, nat_ip1, True)
1063 # inside interface VRF don't match NAT44 static mapping VRF (packets
1065 pkts = self.create_stream_in(self.pg0, self.pg3)
1066 self.pg0.add_stream(pkts)
1067 self.pg_enable_capture(self.pg_interfaces)
1069 self.pg3.assert_nothing_captured()
1071 def test_static_lb(self):
1072 """ NAT44 local service load balancing """
1073 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1076 server1 = self.pg0.remote_hosts[0]
1077 server2 = self.pg0.remote_hosts[1]
1079 locals = [{'addr': server1.ip4n,
1082 {'addr': server2.ip4n,
1086 self.nat44_add_address(self.nat_addr)
1087 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1090 local_num=len(locals),
1092 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1093 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1096 # from client to service
1097 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1098 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1099 TCP(sport=12345, dport=external_port))
1100 self.pg1.add_stream(p)
1101 self.pg_enable_capture(self.pg_interfaces)
1103 capture = self.pg0.get_capture(1)
1109 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1110 if ip.dst == server1.ip4:
1114 self.assertEqual(tcp.dport, local_port)
1115 self.check_tcp_checksum(p)
1116 self.check_ip_checksum(p)
1118 self.logger.error(ppp("Unexpected or invalid packet:", p))
1121 # from service back to client
1122 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1123 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1124 TCP(sport=local_port, dport=12345))
1125 self.pg0.add_stream(p)
1126 self.pg_enable_capture(self.pg_interfaces)
1128 capture = self.pg1.get_capture(1)
1133 self.assertEqual(ip.src, self.nat_addr)
1134 self.assertEqual(tcp.sport, external_port)
1135 self.check_tcp_checksum(p)
1136 self.check_ip_checksum(p)
1138 self.logger.error(ppp("Unexpected or invalid packet:", p))
1144 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1146 for client in clients:
1147 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1148 IP(src=client, dst=self.nat_addr) /
1149 TCP(sport=12345, dport=external_port))
1151 self.pg1.add_stream(pkts)
1152 self.pg_enable_capture(self.pg_interfaces)
1154 capture = self.pg0.get_capture(len(pkts))
1156 if p[IP].dst == server1.ip4:
1160 self.assertTrue(server1_n > server2_n)
1162 def test_multiple_inside_interfaces(self):
1163 """ NAT44 multiple non-overlapping address space inside interfaces """
1165 self.nat44_add_address(self.nat_addr)
1166 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1167 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1168 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1171 # between two NAT44 inside interfaces (no translation)
1172 pkts = self.create_stream_in(self.pg0, self.pg1)
1173 self.pg0.add_stream(pkts)
1174 self.pg_enable_capture(self.pg_interfaces)
1176 capture = self.pg1.get_capture(len(pkts))
1177 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1179 # from NAT44 inside to interface without NAT44 feature (no translation)
1180 pkts = self.create_stream_in(self.pg0, self.pg2)
1181 self.pg0.add_stream(pkts)
1182 self.pg_enable_capture(self.pg_interfaces)
1184 capture = self.pg2.get_capture(len(pkts))
1185 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1187 # in2out 1st interface
1188 pkts = self.create_stream_in(self.pg0, self.pg3)
1189 self.pg0.add_stream(pkts)
1190 self.pg_enable_capture(self.pg_interfaces)
1192 capture = self.pg3.get_capture(len(pkts))
1193 self.verify_capture_out(capture)
1195 # out2in 1st interface
1196 pkts = self.create_stream_out(self.pg3)
1197 self.pg3.add_stream(pkts)
1198 self.pg_enable_capture(self.pg_interfaces)
1200 capture = self.pg0.get_capture(len(pkts))
1201 self.verify_capture_in(capture, self.pg0)
1203 # in2out 2nd interface
1204 pkts = self.create_stream_in(self.pg1, self.pg3)
1205 self.pg1.add_stream(pkts)
1206 self.pg_enable_capture(self.pg_interfaces)
1208 capture = self.pg3.get_capture(len(pkts))
1209 self.verify_capture_out(capture)
1211 # out2in 2nd interface
1212 pkts = self.create_stream_out(self.pg3)
1213 self.pg3.add_stream(pkts)
1214 self.pg_enable_capture(self.pg_interfaces)
1216 capture = self.pg1.get_capture(len(pkts))
1217 self.verify_capture_in(capture, self.pg1)
1219 def test_inside_overlapping_interfaces(self):
1220 """ NAT44 multiple inside interfaces with overlapping address space """
1222 static_nat_ip = "10.0.0.10"
1223 self.nat44_add_address(self.nat_addr)
1224 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1226 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1227 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1228 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1229 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1232 # between NAT44 inside interfaces with same VRF (no translation)
1233 pkts = self.create_stream_in(self.pg4, self.pg5)
1234 self.pg4.add_stream(pkts)
1235 self.pg_enable_capture(self.pg_interfaces)
1237 capture = self.pg5.get_capture(len(pkts))
1238 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1240 # between NAT44 inside interfaces with different VRF (hairpinning)
1241 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1242 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1243 TCP(sport=1234, dport=5678))
1244 self.pg4.add_stream(p)
1245 self.pg_enable_capture(self.pg_interfaces)
1247 capture = self.pg6.get_capture(1)
1252 self.assertEqual(ip.src, self.nat_addr)
1253 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1254 self.assertNotEqual(tcp.sport, 1234)
1255 self.assertEqual(tcp.dport, 5678)
1257 self.logger.error(ppp("Unexpected or invalid packet:", p))
1260 # in2out 1st interface
1261 pkts = self.create_stream_in(self.pg4, self.pg3)
1262 self.pg4.add_stream(pkts)
1263 self.pg_enable_capture(self.pg_interfaces)
1265 capture = self.pg3.get_capture(len(pkts))
1266 self.verify_capture_out(capture)
1268 # out2in 1st interface
1269 pkts = self.create_stream_out(self.pg3)
1270 self.pg3.add_stream(pkts)
1271 self.pg_enable_capture(self.pg_interfaces)
1273 capture = self.pg4.get_capture(len(pkts))
1274 self.verify_capture_in(capture, self.pg4)
1276 # in2out 2nd interface
1277 pkts = self.create_stream_in(self.pg5, self.pg3)
1278 self.pg5.add_stream(pkts)
1279 self.pg_enable_capture(self.pg_interfaces)
1281 capture = self.pg3.get_capture(len(pkts))
1282 self.verify_capture_out(capture)
1284 # out2in 2nd interface
1285 pkts = self.create_stream_out(self.pg3)
1286 self.pg3.add_stream(pkts)
1287 self.pg_enable_capture(self.pg_interfaces)
1289 capture = self.pg5.get_capture(len(pkts))
1290 self.verify_capture_in(capture, self.pg5)
1293 addresses = self.vapi.nat44_address_dump()
1294 self.assertEqual(len(addresses), 1)
1295 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1296 self.assertEqual(len(sessions), 3)
1297 for session in sessions:
1298 self.assertFalse(session.is_static)
1299 self.assertEqual(session.inside_ip_address[0:4],
1300 self.pg5.remote_ip4n)
1301 self.assertEqual(session.outside_ip_address,
1302 addresses[0].ip_address)
1303 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1304 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1305 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1306 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1307 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1308 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1309 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1310 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1311 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1313 # in2out 3rd interface
1314 pkts = self.create_stream_in(self.pg6, self.pg3)
1315 self.pg6.add_stream(pkts)
1316 self.pg_enable_capture(self.pg_interfaces)
1318 capture = self.pg3.get_capture(len(pkts))
1319 self.verify_capture_out(capture, static_nat_ip, True)
1321 # out2in 3rd interface
1322 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1323 self.pg3.add_stream(pkts)
1324 self.pg_enable_capture(self.pg_interfaces)
1326 capture = self.pg6.get_capture(len(pkts))
1327 self.verify_capture_in(capture, self.pg6)
1329 # general user and session dump verifications
1330 users = self.vapi.nat44_user_dump()
1331 self.assertTrue(len(users) >= 3)
1332 addresses = self.vapi.nat44_address_dump()
1333 self.assertEqual(len(addresses), 1)
1335 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1337 for session in sessions:
1338 self.assertEqual(user.ip_address, session.inside_ip_address)
1339 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1340 self.assertTrue(session.protocol in
1341 [IP_PROTOS.tcp, IP_PROTOS.udp,
1345 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1346 self.assertTrue(len(sessions) >= 4)
1347 for session in sessions:
1348 self.assertFalse(session.is_static)
1349 self.assertEqual(session.inside_ip_address[0:4],
1350 self.pg4.remote_ip4n)
1351 self.assertEqual(session.outside_ip_address,
1352 addresses[0].ip_address)
1355 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1356 self.assertTrue(len(sessions) >= 3)
1357 for session in sessions:
1358 self.assertTrue(session.is_static)
1359 self.assertEqual(session.inside_ip_address[0:4],
1360 self.pg6.remote_ip4n)
1361 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1362 map(int, static_nat_ip.split('.')))
1363 self.assertTrue(session.inside_port in
1364 [self.tcp_port_in, self.udp_port_in,
1367 def test_hairpinning(self):
1368 """ NAT44 hairpinning - 1:1 NAPT """
1370 host = self.pg0.remote_hosts[0]
1371 server = self.pg0.remote_hosts[1]
1374 server_in_port = 5678
1375 server_out_port = 8765
1377 self.nat44_add_address(self.nat_addr)
1378 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1379 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1381 # add static mapping for server
1382 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1383 server_in_port, server_out_port,
1384 proto=IP_PROTOS.tcp)
1386 # send packet from host to server
1387 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1388 IP(src=host.ip4, dst=self.nat_addr) /
1389 TCP(sport=host_in_port, dport=server_out_port))
1390 self.pg0.add_stream(p)
1391 self.pg_enable_capture(self.pg_interfaces)
1393 capture = self.pg0.get_capture(1)
1398 self.assertEqual(ip.src, self.nat_addr)
1399 self.assertEqual(ip.dst, server.ip4)
1400 self.assertNotEqual(tcp.sport, host_in_port)
1401 self.assertEqual(tcp.dport, server_in_port)
1402 self.check_tcp_checksum(p)
1403 host_out_port = tcp.sport
1405 self.logger.error(ppp("Unexpected or invalid packet:", p))
1408 # send reply from server to host
1409 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1410 IP(src=server.ip4, dst=self.nat_addr) /
1411 TCP(sport=server_in_port, dport=host_out_port))
1412 self.pg0.add_stream(p)
1413 self.pg_enable_capture(self.pg_interfaces)
1415 capture = self.pg0.get_capture(1)
1420 self.assertEqual(ip.src, self.nat_addr)
1421 self.assertEqual(ip.dst, host.ip4)
1422 self.assertEqual(tcp.sport, server_out_port)
1423 self.assertEqual(tcp.dport, host_in_port)
1424 self.check_tcp_checksum(p)
1426 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1429 def test_hairpinning2(self):
1430 """ NAT44 hairpinning - 1:1 NAT"""
1432 server1_nat_ip = "10.0.0.10"
1433 server2_nat_ip = "10.0.0.11"
1434 host = self.pg0.remote_hosts[0]
1435 server1 = self.pg0.remote_hosts[1]
1436 server2 = self.pg0.remote_hosts[2]
1437 server_tcp_port = 22
1438 server_udp_port = 20
1440 self.nat44_add_address(self.nat_addr)
1441 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1442 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1445 # add static mapping for servers
1446 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1447 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1451 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1452 IP(src=host.ip4, dst=server1_nat_ip) /
1453 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1455 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1456 IP(src=host.ip4, dst=server1_nat_ip) /
1457 UDP(sport=self.udp_port_in, dport=server_udp_port))
1459 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1460 IP(src=host.ip4, dst=server1_nat_ip) /
1461 ICMP(id=self.icmp_id_in, type='echo-request'))
1463 self.pg0.add_stream(pkts)
1464 self.pg_enable_capture(self.pg_interfaces)
1466 capture = self.pg0.get_capture(len(pkts))
1467 for packet in capture:
1469 self.assertEqual(packet[IP].src, self.nat_addr)
1470 self.assertEqual(packet[IP].dst, server1.ip4)
1471 if packet.haslayer(TCP):
1472 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1473 self.assertEqual(packet[TCP].dport, server_tcp_port)
1474 self.tcp_port_out = packet[TCP].sport
1475 self.check_tcp_checksum(packet)
1476 elif packet.haslayer(UDP):
1477 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1478 self.assertEqual(packet[UDP].dport, server_udp_port)
1479 self.udp_port_out = packet[UDP].sport
1481 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1482 self.icmp_id_out = packet[ICMP].id
1484 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1489 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1490 IP(src=server1.ip4, dst=self.nat_addr) /
1491 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1493 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1494 IP(src=server1.ip4, dst=self.nat_addr) /
1495 UDP(sport=server_udp_port, dport=self.udp_port_out))
1497 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1498 IP(src=server1.ip4, dst=self.nat_addr) /
1499 ICMP(id=self.icmp_id_out, type='echo-reply'))
1501 self.pg0.add_stream(pkts)
1502 self.pg_enable_capture(self.pg_interfaces)
1504 capture = self.pg0.get_capture(len(pkts))
1505 for packet in capture:
1507 self.assertEqual(packet[IP].src, server1_nat_ip)
1508 self.assertEqual(packet[IP].dst, host.ip4)
1509 if packet.haslayer(TCP):
1510 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1511 self.assertEqual(packet[TCP].sport, server_tcp_port)
1512 self.check_tcp_checksum(packet)
1513 elif packet.haslayer(UDP):
1514 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1515 self.assertEqual(packet[UDP].sport, server_udp_port)
1517 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1519 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1522 # server2 to server1
1524 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1525 IP(src=server2.ip4, dst=server1_nat_ip) /
1526 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1528 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1529 IP(src=server2.ip4, dst=server1_nat_ip) /
1530 UDP(sport=self.udp_port_in, dport=server_udp_port))
1532 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1533 IP(src=server2.ip4, dst=server1_nat_ip) /
1534 ICMP(id=self.icmp_id_in, type='echo-request'))
1536 self.pg0.add_stream(pkts)
1537 self.pg_enable_capture(self.pg_interfaces)
1539 capture = self.pg0.get_capture(len(pkts))
1540 for packet in capture:
1542 self.assertEqual(packet[IP].src, server2_nat_ip)
1543 self.assertEqual(packet[IP].dst, server1.ip4)
1544 if packet.haslayer(TCP):
1545 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1546 self.assertEqual(packet[TCP].dport, server_tcp_port)
1547 self.tcp_port_out = packet[TCP].sport
1548 self.check_tcp_checksum(packet)
1549 elif packet.haslayer(UDP):
1550 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1551 self.assertEqual(packet[UDP].dport, server_udp_port)
1552 self.udp_port_out = packet[UDP].sport
1554 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1555 self.icmp_id_out = packet[ICMP].id
1557 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1560 # server1 to server2
1562 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1563 IP(src=server1.ip4, dst=server2_nat_ip) /
1564 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1566 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1567 IP(src=server1.ip4, dst=server2_nat_ip) /
1568 UDP(sport=server_udp_port, dport=self.udp_port_out))
1570 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1571 IP(src=server1.ip4, dst=server2_nat_ip) /
1572 ICMP(id=self.icmp_id_out, type='echo-reply'))
1574 self.pg0.add_stream(pkts)
1575 self.pg_enable_capture(self.pg_interfaces)
1577 capture = self.pg0.get_capture(len(pkts))
1578 for packet in capture:
1580 self.assertEqual(packet[IP].src, server1_nat_ip)
1581 self.assertEqual(packet[IP].dst, server2.ip4)
1582 if packet.haslayer(TCP):
1583 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1584 self.assertEqual(packet[TCP].sport, server_tcp_port)
1585 self.check_tcp_checksum(packet)
1586 elif packet.haslayer(UDP):
1587 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1588 self.assertEqual(packet[UDP].sport, server_udp_port)
1590 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1592 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1595 def test_max_translations_per_user(self):
1596 """ MAX translations per user - recycle the least recently used """
1598 self.nat44_add_address(self.nat_addr)
1599 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1600 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1603 # get maximum number of translations per user
1604 nat44_config = self.vapi.nat_show_config()
1606 # send more than maximum number of translations per user packets
1607 pkts_num = nat44_config.max_translations_per_user + 5
1609 for port in range(0, pkts_num):
1610 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1611 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1612 TCP(sport=1025 + port))
1614 self.pg0.add_stream(pkts)
1615 self.pg_enable_capture(self.pg_interfaces)
1618 # verify number of translated packet
1619 self.pg1.get_capture(pkts_num)
1621 def test_interface_addr(self):
1622 """ Acquire NAT44 addresses from interface """
1623 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1625 # no address in NAT pool
1626 adresses = self.vapi.nat44_address_dump()
1627 self.assertEqual(0, len(adresses))
1629 # configure interface address and check NAT address pool
1630 self.pg7.config_ip4()
1631 adresses = self.vapi.nat44_address_dump()
1632 self.assertEqual(1, len(adresses))
1633 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1635 # remove interface address and check NAT address pool
1636 self.pg7.unconfig_ip4()
1637 adresses = self.vapi.nat44_address_dump()
1638 self.assertEqual(0, len(adresses))
1640 def test_interface_addr_static_mapping(self):
1641 """ Static mapping with addresses from interface """
1642 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1643 self.nat44_add_static_mapping(
1645 external_sw_if_index=self.pg7.sw_if_index)
1647 # static mappings with external interface
1648 static_mappings = self.vapi.nat44_static_mapping_dump()
1649 self.assertEqual(1, len(static_mappings))
1650 self.assertEqual(self.pg7.sw_if_index,
1651 static_mappings[0].external_sw_if_index)
1653 # configure interface address and check static mappings
1654 self.pg7.config_ip4()
1655 static_mappings = self.vapi.nat44_static_mapping_dump()
1656 self.assertEqual(1, len(static_mappings))
1657 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1658 self.pg7.local_ip4n)
1659 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1661 # remove interface address and check static mappings
1662 self.pg7.unconfig_ip4()
1663 static_mappings = self.vapi.nat44_static_mapping_dump()
1664 self.assertEqual(0, len(static_mappings))
1666 def test_ipfix_nat44_sess(self):
1667 """ IPFIX logging NAT44 session created/delted """
1668 self.ipfix_domain_id = 10
1669 self.ipfix_src_port = 20202
1670 colector_port = 30303
1671 bind_layers(UDP, IPFIX, dport=30303)
1672 self.nat44_add_address(self.nat_addr)
1673 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1674 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1676 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1677 src_address=self.pg3.local_ip4n,
1679 template_interval=10,
1680 collector_port=colector_port)
1681 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1682 src_port=self.ipfix_src_port)
1684 pkts = self.create_stream_in(self.pg0, self.pg1)
1685 self.pg0.add_stream(pkts)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 capture = self.pg1.get_capture(len(pkts))
1689 self.verify_capture_out(capture)
1690 self.nat44_add_address(self.nat_addr, is_add=0)
1691 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1692 capture = self.pg3.get_capture(3)
1693 ipfix = IPFIXDecoder()
1694 # first load template
1696 self.assertTrue(p.haslayer(IPFIX))
1697 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1698 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1699 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1700 self.assertEqual(p[UDP].dport, colector_port)
1701 self.assertEqual(p[IPFIX].observationDomainID,
1702 self.ipfix_domain_id)
1703 if p.haslayer(Template):
1704 ipfix.add_template(p.getlayer(Template))
1705 # verify events in data set
1707 if p.haslayer(Data):
1708 data = ipfix.decode_data_set(p.getlayer(Set))
1709 self.verify_ipfix_nat44_ses(data)
1711 def test_ipfix_addr_exhausted(self):
1712 """ IPFIX logging NAT addresses exhausted """
1713 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1714 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1716 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1717 src_address=self.pg3.local_ip4n,
1719 template_interval=10)
1720 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1721 src_port=self.ipfix_src_port)
1723 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1724 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1726 self.pg0.add_stream(p)
1727 self.pg_enable_capture(self.pg_interfaces)
1729 capture = self.pg1.get_capture(0)
1730 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1731 capture = self.pg3.get_capture(3)
1732 ipfix = IPFIXDecoder()
1733 # first load template
1735 self.assertTrue(p.haslayer(IPFIX))
1736 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1737 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1738 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1739 self.assertEqual(p[UDP].dport, 4739)
1740 self.assertEqual(p[IPFIX].observationDomainID,
1741 self.ipfix_domain_id)
1742 if p.haslayer(Template):
1743 ipfix.add_template(p.getlayer(Template))
1744 # verify events in data set
1746 if p.haslayer(Data):
1747 data = ipfix.decode_data_set(p.getlayer(Set))
1748 self.verify_ipfix_addr_exhausted(data)
1750 def test_pool_addr_fib(self):
1751 """ NAT44 add pool addresses to FIB """
1752 static_addr = '10.0.0.10'
1753 self.nat44_add_address(self.nat_addr)
1754 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1755 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1757 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1760 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1761 ARP(op=ARP.who_has, pdst=self.nat_addr,
1762 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1763 self.pg1.add_stream(p)
1764 self.pg_enable_capture(self.pg_interfaces)
1766 capture = self.pg1.get_capture(1)
1767 self.assertTrue(capture[0].haslayer(ARP))
1768 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1771 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1772 ARP(op=ARP.who_has, pdst=static_addr,
1773 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1774 self.pg1.add_stream(p)
1775 self.pg_enable_capture(self.pg_interfaces)
1777 capture = self.pg1.get_capture(1)
1778 self.assertTrue(capture[0].haslayer(ARP))
1779 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1781 # send ARP to non-NAT44 interface
1782 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1783 ARP(op=ARP.who_has, pdst=self.nat_addr,
1784 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1785 self.pg2.add_stream(p)
1786 self.pg_enable_capture(self.pg_interfaces)
1788 capture = self.pg1.get_capture(0)
1790 # remove addresses and verify
1791 self.nat44_add_address(self.nat_addr, is_add=0)
1792 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1795 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1796 ARP(op=ARP.who_has, pdst=self.nat_addr,
1797 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1798 self.pg1.add_stream(p)
1799 self.pg_enable_capture(self.pg_interfaces)
1801 capture = self.pg1.get_capture(0)
1803 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1804 ARP(op=ARP.who_has, pdst=static_addr,
1805 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1806 self.pg1.add_stream(p)
1807 self.pg_enable_capture(self.pg_interfaces)
1809 capture = self.pg1.get_capture(0)
1811 def test_vrf_mode(self):
1812 """ NAT44 tenant VRF aware address pool mode """
1816 nat_ip1 = "10.0.0.10"
1817 nat_ip2 = "10.0.0.11"
1819 self.pg0.unconfig_ip4()
1820 self.pg1.unconfig_ip4()
1821 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1822 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1823 self.pg0.set_table_ip4(vrf_id1)
1824 self.pg1.set_table_ip4(vrf_id2)
1825 self.pg0.config_ip4()
1826 self.pg1.config_ip4()
1828 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1829 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1830 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1831 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1832 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1836 pkts = self.create_stream_in(self.pg0, self.pg2)
1837 self.pg0.add_stream(pkts)
1838 self.pg_enable_capture(self.pg_interfaces)
1840 capture = self.pg2.get_capture(len(pkts))
1841 self.verify_capture_out(capture, nat_ip1)
1844 pkts = self.create_stream_in(self.pg1, self.pg2)
1845 self.pg1.add_stream(pkts)
1846 self.pg_enable_capture(self.pg_interfaces)
1848 capture = self.pg2.get_capture(len(pkts))
1849 self.verify_capture_out(capture, nat_ip2)
1851 self.pg0.unconfig_ip4()
1852 self.pg1.unconfig_ip4()
1853 self.pg0.set_table_ip4(0)
1854 self.pg1.set_table_ip4(0)
1855 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1856 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1858 def test_vrf_feature_independent(self):
1859 """ NAT44 tenant VRF independent address pool mode """
1861 nat_ip1 = "10.0.0.10"
1862 nat_ip2 = "10.0.0.11"
1864 self.nat44_add_address(nat_ip1)
1865 self.nat44_add_address(nat_ip2)
1866 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1867 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1868 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1872 pkts = self.create_stream_in(self.pg0, self.pg2)
1873 self.pg0.add_stream(pkts)
1874 self.pg_enable_capture(self.pg_interfaces)
1876 capture = self.pg2.get_capture(len(pkts))
1877 self.verify_capture_out(capture, nat_ip1)
1880 pkts = self.create_stream_in(self.pg1, self.pg2)
1881 self.pg1.add_stream(pkts)
1882 self.pg_enable_capture(self.pg_interfaces)
1884 capture = self.pg2.get_capture(len(pkts))
1885 self.verify_capture_out(capture, nat_ip1)
1887 def test_dynamic_ipless_interfaces(self):
1888 """ NAT44 interfaces without configured IP address """
1890 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1891 mactobinary(self.pg7.remote_mac),
1892 self.pg7.remote_ip4n,
1894 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1895 mactobinary(self.pg8.remote_mac),
1896 self.pg8.remote_ip4n,
1899 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1900 dst_address_length=32,
1901 next_hop_address=self.pg7.remote_ip4n,
1902 next_hop_sw_if_index=self.pg7.sw_if_index)
1903 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1904 dst_address_length=32,
1905 next_hop_address=self.pg8.remote_ip4n,
1906 next_hop_sw_if_index=self.pg8.sw_if_index)
1908 self.nat44_add_address(self.nat_addr)
1909 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1910 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1914 pkts = self.create_stream_in(self.pg7, self.pg8)
1915 self.pg7.add_stream(pkts)
1916 self.pg_enable_capture(self.pg_interfaces)
1918 capture = self.pg8.get_capture(len(pkts))
1919 self.verify_capture_out(capture)
1922 pkts = self.create_stream_out(self.pg8, self.nat_addr)
1923 self.pg8.add_stream(pkts)
1924 self.pg_enable_capture(self.pg_interfaces)
1926 capture = self.pg7.get_capture(len(pkts))
1927 self.verify_capture_in(capture, self.pg7)
1929 def test_static_ipless_interfaces(self):
1930 """ NAT44 interfaces without configured IP address - 1:1 NAT """
1932 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1933 mactobinary(self.pg7.remote_mac),
1934 self.pg7.remote_ip4n,
1936 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1937 mactobinary(self.pg8.remote_mac),
1938 self.pg8.remote_ip4n,
1941 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1942 dst_address_length=32,
1943 next_hop_address=self.pg7.remote_ip4n,
1944 next_hop_sw_if_index=self.pg7.sw_if_index)
1945 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1946 dst_address_length=32,
1947 next_hop_address=self.pg8.remote_ip4n,
1948 next_hop_sw_if_index=self.pg8.sw_if_index)
1950 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1951 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1952 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1956 pkts = self.create_stream_out(self.pg8)
1957 self.pg8.add_stream(pkts)
1958 self.pg_enable_capture(self.pg_interfaces)
1960 capture = self.pg7.get_capture(len(pkts))
1961 self.verify_capture_in(capture, self.pg7)
1964 pkts = self.create_stream_in(self.pg7, self.pg8)
1965 self.pg7.add_stream(pkts)
1966 self.pg_enable_capture(self.pg_interfaces)
1968 capture = self.pg8.get_capture(len(pkts))
1969 self.verify_capture_out(capture, self.nat_addr, True)
1971 def test_static_with_port_ipless_interfaces(self):
1972 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1974 self.tcp_port_out = 30606
1975 self.udp_port_out = 30607
1976 self.icmp_id_out = 30608
1978 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1979 mactobinary(self.pg7.remote_mac),
1980 self.pg7.remote_ip4n,
1982 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1983 mactobinary(self.pg8.remote_mac),
1984 self.pg8.remote_ip4n,
1987 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1988 dst_address_length=32,
1989 next_hop_address=self.pg7.remote_ip4n,
1990 next_hop_sw_if_index=self.pg7.sw_if_index)
1991 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1992 dst_address_length=32,
1993 next_hop_address=self.pg8.remote_ip4n,
1994 next_hop_sw_if_index=self.pg8.sw_if_index)
1996 self.nat44_add_address(self.nat_addr)
1997 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1998 self.tcp_port_in, self.tcp_port_out,
1999 proto=IP_PROTOS.tcp)
2000 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2001 self.udp_port_in, self.udp_port_out,
2002 proto=IP_PROTOS.udp)
2003 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2004 self.icmp_id_in, self.icmp_id_out,
2005 proto=IP_PROTOS.icmp)
2006 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2007 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2011 pkts = self.create_stream_out(self.pg8)
2012 self.pg8.add_stream(pkts)
2013 self.pg_enable_capture(self.pg_interfaces)
2015 capture = self.pg7.get_capture(len(pkts))
2016 self.verify_capture_in(capture, self.pg7)
2019 pkts = self.create_stream_in(self.pg7, self.pg8)
2020 self.pg7.add_stream(pkts)
2021 self.pg_enable_capture(self.pg_interfaces)
2023 capture = self.pg8.get_capture(len(pkts))
2024 self.verify_capture_out(capture)
2026 def test_static_unknown_proto(self):
2027 """ 1:1 NAT translate packet with unknown protocol """
2028 nat_ip = "10.0.0.10"
2029 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2030 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2031 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2035 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2036 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2038 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2039 TCP(sport=1234, dport=1234))
2040 self.pg0.add_stream(p)
2041 self.pg_enable_capture(self.pg_interfaces)
2043 p = self.pg1.get_capture(1)
2046 self.assertEqual(packet[IP].src, nat_ip)
2047 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2048 self.assertTrue(packet.haslayer(GRE))
2049 self.check_ip_checksum(packet)
2051 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2055 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2056 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2058 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2059 TCP(sport=1234, dport=1234))
2060 self.pg1.add_stream(p)
2061 self.pg_enable_capture(self.pg_interfaces)
2063 p = self.pg0.get_capture(1)
2066 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2067 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2068 self.assertTrue(packet.haslayer(GRE))
2069 self.check_ip_checksum(packet)
2071 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2074 def test_hairpinning_static_unknown_proto(self):
2075 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2077 host = self.pg0.remote_hosts[0]
2078 server = self.pg0.remote_hosts[1]
2080 host_nat_ip = "10.0.0.10"
2081 server_nat_ip = "10.0.0.11"
2083 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2084 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2085 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2086 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2090 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2091 IP(src=host.ip4, dst=server_nat_ip) /
2093 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2094 TCP(sport=1234, dport=1234))
2095 self.pg0.add_stream(p)
2096 self.pg_enable_capture(self.pg_interfaces)
2098 p = self.pg0.get_capture(1)
2101 self.assertEqual(packet[IP].src, host_nat_ip)
2102 self.assertEqual(packet[IP].dst, server.ip4)
2103 self.assertTrue(packet.haslayer(GRE))
2104 self.check_ip_checksum(packet)
2106 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2110 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2111 IP(src=server.ip4, dst=host_nat_ip) /
2113 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2114 TCP(sport=1234, dport=1234))
2115 self.pg0.add_stream(p)
2116 self.pg_enable_capture(self.pg_interfaces)
2118 p = self.pg0.get_capture(1)
2121 self.assertEqual(packet[IP].src, server_nat_ip)
2122 self.assertEqual(packet[IP].dst, host.ip4)
2123 self.assertTrue(packet.haslayer(GRE))
2124 self.check_ip_checksum(packet)
2126 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2129 def test_unknown_proto(self):
2130 """ NAT44 translate packet with unknown protocol """
2131 self.nat44_add_address(self.nat_addr)
2132 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2133 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2137 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2138 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2139 TCP(sport=self.tcp_port_in, dport=20))
2140 self.pg0.add_stream(p)
2141 self.pg_enable_capture(self.pg_interfaces)
2143 p = self.pg1.get_capture(1)
2145 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2146 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2148 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2149 TCP(sport=1234, dport=1234))
2150 self.pg0.add_stream(p)
2151 self.pg_enable_capture(self.pg_interfaces)
2153 p = self.pg1.get_capture(1)
2156 self.assertEqual(packet[IP].src, self.nat_addr)
2157 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2158 self.assertTrue(packet.haslayer(GRE))
2159 self.check_ip_checksum(packet)
2161 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2165 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2166 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2168 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2169 TCP(sport=1234, dport=1234))
2170 self.pg1.add_stream(p)
2171 self.pg_enable_capture(self.pg_interfaces)
2173 p = self.pg0.get_capture(1)
2176 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2177 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2178 self.assertTrue(packet.haslayer(GRE))
2179 self.check_ip_checksum(packet)
2181 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2184 def test_hairpinning_unknown_proto(self):
2185 """ NAT44 translate packet with unknown protocol - hairpinning """
2186 host = self.pg0.remote_hosts[0]
2187 server = self.pg0.remote_hosts[1]
2190 server_in_port = 5678
2191 server_out_port = 8765
2192 server_nat_ip = "10.0.0.11"
2194 self.nat44_add_address(self.nat_addr)
2195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2199 # add static mapping for server
2200 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2203 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2204 IP(src=host.ip4, dst=server_nat_ip) /
2205 TCP(sport=host_in_port, dport=server_out_port))
2206 self.pg0.add_stream(p)
2207 self.pg_enable_capture(self.pg_interfaces)
2209 capture = self.pg0.get_capture(1)
2211 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2212 IP(src=host.ip4, dst=server_nat_ip) /
2214 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2215 TCP(sport=1234, dport=1234))
2216 self.pg0.add_stream(p)
2217 self.pg_enable_capture(self.pg_interfaces)
2219 p = self.pg0.get_capture(1)
2222 self.assertEqual(packet[IP].src, self.nat_addr)
2223 self.assertEqual(packet[IP].dst, server.ip4)
2224 self.assertTrue(packet.haslayer(GRE))
2225 self.check_ip_checksum(packet)
2227 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2231 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2232 IP(src=server.ip4, dst=self.nat_addr) /
2234 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2235 TCP(sport=1234, dport=1234))
2236 self.pg0.add_stream(p)
2237 self.pg_enable_capture(self.pg_interfaces)
2239 p = self.pg0.get_capture(1)
2242 self.assertEqual(packet[IP].src, server_nat_ip)
2243 self.assertEqual(packet[IP].dst, host.ip4)
2244 self.assertTrue(packet.haslayer(GRE))
2245 self.check_ip_checksum(packet)
2247 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2250 def test_output_feature(self):
2251 """ NAT44 interface output feature (in2out postrouting) """
2252 self.nat44_add_address(self.nat_addr)
2253 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2254 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2255 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2259 pkts = self.create_stream_in(self.pg0, self.pg3)
2260 self.pg0.add_stream(pkts)
2261 self.pg_enable_capture(self.pg_interfaces)
2263 capture = self.pg3.get_capture(len(pkts))
2264 self.verify_capture_out(capture)
2267 pkts = self.create_stream_out(self.pg3)
2268 self.pg3.add_stream(pkts)
2269 self.pg_enable_capture(self.pg_interfaces)
2271 capture = self.pg0.get_capture(len(pkts))
2272 self.verify_capture_in(capture, self.pg0)
2274 # from non-NAT interface to NAT inside interface
2275 pkts = self.create_stream_in(self.pg2, self.pg0)
2276 self.pg2.add_stream(pkts)
2277 self.pg_enable_capture(self.pg_interfaces)
2279 capture = self.pg0.get_capture(len(pkts))
2280 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2282 def test_output_feature_vrf_aware(self):
2283 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2284 nat_ip_vrf10 = "10.0.0.10"
2285 nat_ip_vrf20 = "10.0.0.20"
2287 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2288 dst_address_length=32,
2289 next_hop_address=self.pg3.remote_ip4n,
2290 next_hop_sw_if_index=self.pg3.sw_if_index,
2292 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2293 dst_address_length=32,
2294 next_hop_address=self.pg3.remote_ip4n,
2295 next_hop_sw_if_index=self.pg3.sw_if_index,
2298 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2299 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2300 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2301 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2302 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2306 pkts = self.create_stream_in(self.pg4, self.pg3)
2307 self.pg4.add_stream(pkts)
2308 self.pg_enable_capture(self.pg_interfaces)
2310 capture = self.pg3.get_capture(len(pkts))
2311 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2314 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2315 self.pg3.add_stream(pkts)
2316 self.pg_enable_capture(self.pg_interfaces)
2318 capture = self.pg4.get_capture(len(pkts))
2319 self.verify_capture_in(capture, self.pg4)
2322 pkts = self.create_stream_in(self.pg6, self.pg3)
2323 self.pg6.add_stream(pkts)
2324 self.pg_enable_capture(self.pg_interfaces)
2326 capture = self.pg3.get_capture(len(pkts))
2327 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2330 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2331 self.pg3.add_stream(pkts)
2332 self.pg_enable_capture(self.pg_interfaces)
2334 capture = self.pg6.get_capture(len(pkts))
2335 self.verify_capture_in(capture, self.pg6)
2337 def test_output_feature_hairpinning(self):
2338 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2339 host = self.pg0.remote_hosts[0]
2340 server = self.pg0.remote_hosts[1]
2343 server_in_port = 5678
2344 server_out_port = 8765
2346 self.nat44_add_address(self.nat_addr)
2347 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2348 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2351 # add static mapping for server
2352 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2353 server_in_port, server_out_port,
2354 proto=IP_PROTOS.tcp)
2356 # send packet from host to server
2357 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2358 IP(src=host.ip4, dst=self.nat_addr) /
2359 TCP(sport=host_in_port, dport=server_out_port))
2360 self.pg0.add_stream(p)
2361 self.pg_enable_capture(self.pg_interfaces)
2363 capture = self.pg0.get_capture(1)
2368 self.assertEqual(ip.src, self.nat_addr)
2369 self.assertEqual(ip.dst, server.ip4)
2370 self.assertNotEqual(tcp.sport, host_in_port)
2371 self.assertEqual(tcp.dport, server_in_port)
2372 self.check_tcp_checksum(p)
2373 host_out_port = tcp.sport
2375 self.logger.error(ppp("Unexpected or invalid packet:", p))
2378 # send reply from server to host
2379 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2380 IP(src=server.ip4, dst=self.nat_addr) /
2381 TCP(sport=server_in_port, dport=host_out_port))
2382 self.pg0.add_stream(p)
2383 self.pg_enable_capture(self.pg_interfaces)
2385 capture = self.pg0.get_capture(1)
2390 self.assertEqual(ip.src, self.nat_addr)
2391 self.assertEqual(ip.dst, host.ip4)
2392 self.assertEqual(tcp.sport, server_out_port)
2393 self.assertEqual(tcp.dport, host_in_port)
2394 self.check_tcp_checksum(p)
2396 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2399 def test_one_armed_nat44(self):
2400 """ One armed NAT44 """
2401 remote_host = self.pg9.remote_hosts[0]
2402 local_host = self.pg9.remote_hosts[1]
2405 self.nat44_add_address(self.nat_addr)
2406 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2407 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2411 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2412 IP(src=local_host.ip4, dst=remote_host.ip4) /
2413 TCP(sport=12345, dport=80))
2414 self.pg9.add_stream(p)
2415 self.pg_enable_capture(self.pg_interfaces)
2417 capture = self.pg9.get_capture(1)
2422 self.assertEqual(ip.src, self.nat_addr)
2423 self.assertEqual(ip.dst, remote_host.ip4)
2424 self.assertNotEqual(tcp.sport, 12345)
2425 external_port = tcp.sport
2426 self.assertEqual(tcp.dport, 80)
2427 self.check_tcp_checksum(p)
2428 self.check_ip_checksum(p)
2430 self.logger.error(ppp("Unexpected or invalid packet:", p))
2434 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2435 IP(src=remote_host.ip4, dst=self.nat_addr) /
2436 TCP(sport=80, dport=external_port))
2437 self.pg9.add_stream(p)
2438 self.pg_enable_capture(self.pg_interfaces)
2440 capture = self.pg9.get_capture(1)
2445 self.assertEqual(ip.src, remote_host.ip4)
2446 self.assertEqual(ip.dst, local_host.ip4)
2447 self.assertEqual(tcp.sport, 80)
2448 self.assertEqual(tcp.dport, 12345)
2449 self.check_tcp_checksum(p)
2450 self.check_ip_checksum(p)
2452 self.logger.error(ppp("Unexpected or invalid packet:", p))
2456 super(TestNAT44, self).tearDown()
2457 if not self.vpp_dead:
2458 self.logger.info(self.vapi.cli("show nat44 verbose"))
2462 class TestDeterministicNAT(MethodHolder):
2463 """ Deterministic NAT Test Cases """
2466 def setUpConstants(cls):
2467 super(TestDeterministicNAT, cls).setUpConstants()
2468 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2471 def setUpClass(cls):
2472 super(TestDeterministicNAT, cls).setUpClass()
2475 cls.tcp_port_in = 6303
2476 cls.tcp_external_port = 6303
2477 cls.udp_port_in = 6304
2478 cls.udp_external_port = 6304
2479 cls.icmp_id_in = 6305
2480 cls.nat_addr = '10.0.0.3'
2482 cls.create_pg_interfaces(range(3))
2483 cls.interfaces = list(cls.pg_interfaces)
2485 for i in cls.interfaces:
2490 cls.pg0.generate_remote_hosts(2)
2491 cls.pg0.configure_ipv4_neighbors()
2494 super(TestDeterministicNAT, cls).tearDownClass()
2497 def create_stream_in(self, in_if, out_if, ttl=64):
2499 Create packet stream for inside network
2501 :param in_if: Inside interface
2502 :param out_if: Outside interface
2503 :param ttl: TTL of generated packets
2507 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2508 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2509 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2513 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2514 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2515 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2519 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2520 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2521 ICMP(id=self.icmp_id_in, type='echo-request'))
2526 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2528 Create packet stream for outside network
2530 :param out_if: Outside interface
2531 :param dst_ip: Destination IP address (Default use global NAT address)
2532 :param ttl: TTL of generated packets
2535 dst_ip = self.nat_addr
2538 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2539 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2540 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2544 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2545 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2546 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2550 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2551 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2552 ICMP(id=self.icmp_external_id, type='echo-reply'))
2557 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2559 Verify captured packets on outside network
2561 :param capture: Captured packets
2562 :param nat_ip: Translated IP address (Default use global NAT address)
2563 :param same_port: Sorce port number is not translated (Default False)
2564 :param packet_num: Expected number of packets (Default 3)
2567 nat_ip = self.nat_addr
2568 self.assertEqual(packet_num, len(capture))
2569 for packet in capture:
2571 self.assertEqual(packet[IP].src, nat_ip)
2572 if packet.haslayer(TCP):
2573 self.tcp_port_out = packet[TCP].sport
2574 elif packet.haslayer(UDP):
2575 self.udp_port_out = packet[UDP].sport
2577 self.icmp_external_id = packet[ICMP].id
2579 self.logger.error(ppp("Unexpected or invalid packet "
2580 "(outside network):", packet))
2583 def initiate_tcp_session(self, in_if, out_if):
2585 Initiates TCP session
2587 :param in_if: Inside interface
2588 :param out_if: Outside interface
2591 # SYN packet in->out
2592 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2593 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2594 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2597 self.pg_enable_capture(self.pg_interfaces)
2599 capture = out_if.get_capture(1)
2601 self.tcp_port_out = p[TCP].sport
2603 # SYN + ACK packet out->in
2604 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2605 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2606 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2608 out_if.add_stream(p)
2609 self.pg_enable_capture(self.pg_interfaces)
2611 in_if.get_capture(1)
2613 # ACK packet in->out
2614 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2615 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2616 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2619 self.pg_enable_capture(self.pg_interfaces)
2621 out_if.get_capture(1)
2624 self.logger.error("TCP 3 way handshake failed")
2627 def verify_ipfix_max_entries_per_user(self, data):
2629 Verify IPFIX maximum entries per user exceeded event
2631 :param data: Decoded IPFIX data records
2633 self.assertEqual(1, len(data))
2636 self.assertEqual(ord(record[230]), 13)
2637 # natQuotaExceededEvent
2638 self.assertEqual('\x03\x00\x00\x00', record[466])
2640 self.assertEqual(self.pg0.remote_ip4n, record[8])
2642 def test_deterministic_mode(self):
2643 """ NAT plugin run deterministic mode """
2644 in_addr = '172.16.255.0'
2645 out_addr = '172.17.255.50'
2646 in_addr_t = '172.16.255.20'
2647 in_addr_n = socket.inet_aton(in_addr)
2648 out_addr_n = socket.inet_aton(out_addr)
2649 in_addr_t_n = socket.inet_aton(in_addr_t)
2653 nat_config = self.vapi.nat_show_config()
2654 self.assertEqual(1, nat_config.deterministic)
2656 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2658 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2659 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2660 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2661 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2663 deterministic_mappings = self.vapi.nat_det_map_dump()
2664 self.assertEqual(len(deterministic_mappings), 1)
2665 dsm = deterministic_mappings[0]
2666 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2667 self.assertEqual(in_plen, dsm.in_plen)
2668 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2669 self.assertEqual(out_plen, dsm.out_plen)
2671 self.clear_nat_det()
2672 deterministic_mappings = self.vapi.nat_det_map_dump()
2673 self.assertEqual(len(deterministic_mappings), 0)
2675 def test_set_timeouts(self):
2676 """ Set deterministic NAT timeouts """
2677 timeouts_before = self.vapi.nat_det_get_timeouts()
2679 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2680 timeouts_before.tcp_established + 10,
2681 timeouts_before.tcp_transitory + 10,
2682 timeouts_before.icmp + 10)
2684 timeouts_after = self.vapi.nat_det_get_timeouts()
2686 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2687 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2688 self.assertNotEqual(timeouts_before.tcp_established,
2689 timeouts_after.tcp_established)
2690 self.assertNotEqual(timeouts_before.tcp_transitory,
2691 timeouts_after.tcp_transitory)
2693 def test_det_in(self):
2694 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2696 nat_ip = "10.0.0.10"
2698 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2700 socket.inet_aton(nat_ip),
2702 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2703 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2707 pkts = self.create_stream_in(self.pg0, self.pg1)
2708 self.pg0.add_stream(pkts)
2709 self.pg_enable_capture(self.pg_interfaces)
2711 capture = self.pg1.get_capture(len(pkts))
2712 self.verify_capture_out(capture, nat_ip)
2715 pkts = self.create_stream_out(self.pg1, nat_ip)
2716 self.pg1.add_stream(pkts)
2717 self.pg_enable_capture(self.pg_interfaces)
2719 capture = self.pg0.get_capture(len(pkts))
2720 self.verify_capture_in(capture, self.pg0)
2723 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2724 self.assertEqual(len(sessions), 3)
2728 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2729 self.assertEqual(s.in_port, self.tcp_port_in)
2730 self.assertEqual(s.out_port, self.tcp_port_out)
2731 self.assertEqual(s.ext_port, self.tcp_external_port)
2735 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2736 self.assertEqual(s.in_port, self.udp_port_in)
2737 self.assertEqual(s.out_port, self.udp_port_out)
2738 self.assertEqual(s.ext_port, self.udp_external_port)
2742 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2743 self.assertEqual(s.in_port, self.icmp_id_in)
2744 self.assertEqual(s.out_port, self.icmp_external_id)
2746 def test_multiple_users(self):
2747 """ Deterministic NAT multiple users """
2749 nat_ip = "10.0.0.10"
2751 external_port = 6303
2753 host0 = self.pg0.remote_hosts[0]
2754 host1 = self.pg0.remote_hosts[1]
2756 self.vapi.nat_det_add_del_map(host0.ip4n,
2758 socket.inet_aton(nat_ip),
2760 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2761 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2765 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2766 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2767 TCP(sport=port_in, dport=external_port))
2768 self.pg0.add_stream(p)
2769 self.pg_enable_capture(self.pg_interfaces)
2771 capture = self.pg1.get_capture(1)
2776 self.assertEqual(ip.src, nat_ip)
2777 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2778 self.assertEqual(tcp.dport, external_port)
2779 port_out0 = tcp.sport
2781 self.logger.error(ppp("Unexpected or invalid packet:", p))
2785 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2786 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2787 TCP(sport=port_in, dport=external_port))
2788 self.pg0.add_stream(p)
2789 self.pg_enable_capture(self.pg_interfaces)
2791 capture = self.pg1.get_capture(1)
2796 self.assertEqual(ip.src, nat_ip)
2797 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2798 self.assertEqual(tcp.dport, external_port)
2799 port_out1 = tcp.sport
2801 self.logger.error(ppp("Unexpected or invalid packet:", p))
2804 dms = self.vapi.nat_det_map_dump()
2805 self.assertEqual(1, len(dms))
2806 self.assertEqual(2, dms[0].ses_num)
2809 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2810 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2811 TCP(sport=external_port, dport=port_out0))
2812 self.pg1.add_stream(p)
2813 self.pg_enable_capture(self.pg_interfaces)
2815 capture = self.pg0.get_capture(1)
2820 self.assertEqual(ip.src, self.pg1.remote_ip4)
2821 self.assertEqual(ip.dst, host0.ip4)
2822 self.assertEqual(tcp.dport, port_in)
2823 self.assertEqual(tcp.sport, external_port)
2825 self.logger.error(ppp("Unexpected or invalid packet:", p))
2829 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2830 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2831 TCP(sport=external_port, dport=port_out1))
2832 self.pg1.add_stream(p)
2833 self.pg_enable_capture(self.pg_interfaces)
2835 capture = self.pg0.get_capture(1)
2840 self.assertEqual(ip.src, self.pg1.remote_ip4)
2841 self.assertEqual(ip.dst, host1.ip4)
2842 self.assertEqual(tcp.dport, port_in)
2843 self.assertEqual(tcp.sport, external_port)
2845 self.logger.error(ppp("Unexpected or invalid packet", p))
2848 # session close api test
2849 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2851 self.pg1.remote_ip4n,
2853 dms = self.vapi.nat_det_map_dump()
2854 self.assertEqual(dms[0].ses_num, 1)
2856 self.vapi.nat_det_close_session_in(host0.ip4n,
2858 self.pg1.remote_ip4n,
2860 dms = self.vapi.nat_det_map_dump()
2861 self.assertEqual(dms[0].ses_num, 0)
2863 def test_tcp_session_close_detection_in(self):
2864 """ Deterministic NAT TCP session close from inside network """
2865 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2867 socket.inet_aton(self.nat_addr),
2869 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2870 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2873 self.initiate_tcp_session(self.pg0, self.pg1)
2875 # close the session from inside
2877 # FIN packet in -> out
2878 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2879 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2880 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2882 self.pg0.add_stream(p)
2883 self.pg_enable_capture(self.pg_interfaces)
2885 self.pg1.get_capture(1)
2889 # ACK packet out -> in
2890 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2891 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2892 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2896 # FIN packet out -> in
2897 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2898 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2899 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2903 self.pg1.add_stream(pkts)
2904 self.pg_enable_capture(self.pg_interfaces)
2906 self.pg0.get_capture(2)
2908 # ACK packet in -> out
2909 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2910 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2911 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2913 self.pg0.add_stream(p)
2914 self.pg_enable_capture(self.pg_interfaces)
2916 self.pg1.get_capture(1)
2918 # Check if deterministic NAT44 closed the session
2919 dms = self.vapi.nat_det_map_dump()
2920 self.assertEqual(0, dms[0].ses_num)
2922 self.logger.error("TCP session termination failed")
2925 def test_tcp_session_close_detection_out(self):
2926 """ Deterministic NAT TCP session close from outside network """
2927 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2929 socket.inet_aton(self.nat_addr),
2931 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2932 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2935 self.initiate_tcp_session(self.pg0, self.pg1)
2937 # close the session from outside
2939 # FIN packet out -> in
2940 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2941 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2942 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2944 self.pg1.add_stream(p)
2945 self.pg_enable_capture(self.pg_interfaces)
2947 self.pg0.get_capture(1)
2951 # ACK packet in -> out
2952 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2953 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2954 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2958 # ACK packet in -> out
2959 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2960 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2961 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2965 self.pg0.add_stream(pkts)
2966 self.pg_enable_capture(self.pg_interfaces)
2968 self.pg1.get_capture(2)
2970 # ACK packet out -> in
2971 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2972 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2973 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2975 self.pg1.add_stream(p)
2976 self.pg_enable_capture(self.pg_interfaces)
2978 self.pg0.get_capture(1)
2980 # Check if deterministic NAT44 closed the session
2981 dms = self.vapi.nat_det_map_dump()
2982 self.assertEqual(0, dms[0].ses_num)
2984 self.logger.error("TCP session termination failed")
2987 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2988 def test_session_timeout(self):
2989 """ Deterministic NAT session timeouts """
2990 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2992 socket.inet_aton(self.nat_addr),
2994 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2995 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2998 self.initiate_tcp_session(self.pg0, self.pg1)
2999 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3000 pkts = self.create_stream_in(self.pg0, self.pg1)
3001 self.pg0.add_stream(pkts)
3002 self.pg_enable_capture(self.pg_interfaces)
3004 capture = self.pg1.get_capture(len(pkts))
3007 dms = self.vapi.nat_det_map_dump()
3008 self.assertEqual(0, dms[0].ses_num)
3010 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3011 def test_session_limit_per_user(self):
3012 """ Deterministic NAT maximum sessions per user limit """
3013 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3015 socket.inet_aton(self.nat_addr),
3017 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3018 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3020 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3021 src_address=self.pg2.local_ip4n,
3023 template_interval=10)
3024 self.vapi.nat_ipfix()
3027 for port in range(1025, 2025):
3028 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3029 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3030 UDP(sport=port, dport=port))
3033 self.pg0.add_stream(pkts)
3034 self.pg_enable_capture(self.pg_interfaces)
3036 capture = self.pg1.get_capture(len(pkts))
3038 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3039 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3040 UDP(sport=3001, dport=3002))
3041 self.pg0.add_stream(p)
3042 self.pg_enable_capture(self.pg_interfaces)
3044 capture = self.pg1.assert_nothing_captured()
3046 # verify ICMP error packet
3047 capture = self.pg0.get_capture(1)
3049 self.assertTrue(p.haslayer(ICMP))
3051 self.assertEqual(icmp.type, 3)
3052 self.assertEqual(icmp.code, 1)
3053 self.assertTrue(icmp.haslayer(IPerror))
3054 inner_ip = icmp[IPerror]
3055 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3056 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3058 dms = self.vapi.nat_det_map_dump()
3060 self.assertEqual(1000, dms[0].ses_num)
3062 # verify IPFIX logging
3063 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3065 capture = self.pg2.get_capture(2)
3066 ipfix = IPFIXDecoder()
3067 # first load template
3069 self.assertTrue(p.haslayer(IPFIX))
3070 if p.haslayer(Template):
3071 ipfix.add_template(p.getlayer(Template))
3072 # verify events in data set
3074 if p.haslayer(Data):
3075 data = ipfix.decode_data_set(p.getlayer(Set))
3076 self.verify_ipfix_max_entries_per_user(data)
3078 def clear_nat_det(self):
3080 Clear deterministic NAT configuration.
3082 self.vapi.nat_ipfix(enable=0)
3083 self.vapi.nat_det_set_timeouts()
3084 deterministic_mappings = self.vapi.nat_det_map_dump()
3085 for dsm in deterministic_mappings:
3086 self.vapi.nat_det_add_del_map(dsm.in_addr,
3092 interfaces = self.vapi.nat44_interface_dump()
3093 for intf in interfaces:
3094 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3099 super(TestDeterministicNAT, self).tearDown()
3100 if not self.vpp_dead:
3101 self.logger.info(self.vapi.cli("show nat44 detail"))
3102 self.clear_nat_det()
3105 class TestNAT64(MethodHolder):
3106 """ NAT64 Test Cases """
3109 def setUpClass(cls):
3110 super(TestNAT64, cls).setUpClass()
3113 cls.tcp_port_in = 6303
3114 cls.tcp_port_out = 6303
3115 cls.udp_port_in = 6304
3116 cls.udp_port_out = 6304
3117 cls.icmp_id_in = 6305
3118 cls.icmp_id_out = 6305
3119 cls.nat_addr = '10.0.0.3'
3120 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3122 cls.vrf1_nat_addr = '10.0.10.3'
3123 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3126 cls.create_pg_interfaces(range(4))
3127 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3128 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3129 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3131 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3133 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3135 cls.pg0.generate_remote_hosts(2)
3137 for i in cls.ip6_interfaces:
3140 i.configure_ipv6_neighbors()
3142 for i in cls.ip4_interfaces:
3148 cls.pg3.config_ip4()
3149 cls.pg3.resolve_arp()
3150 cls.pg3.config_ip6()
3151 cls.pg3.configure_ipv6_neighbors()
3154 super(TestNAT64, cls).tearDownClass()
3157 def test_pool(self):
3158 """ Add/delete address to NAT64 pool """
3159 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3161 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3163 addresses = self.vapi.nat64_pool_addr_dump()
3164 self.assertEqual(len(addresses), 1)
3165 self.assertEqual(addresses[0].address, nat_addr)
3167 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3169 addresses = self.vapi.nat64_pool_addr_dump()
3170 self.assertEqual(len(addresses), 0)
3172 def test_interface(self):
3173 """ Enable/disable NAT64 feature on the interface """
3174 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3175 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3177 interfaces = self.vapi.nat64_interface_dump()
3178 self.assertEqual(len(interfaces), 2)
3181 for intf in interfaces:
3182 if intf.sw_if_index == self.pg0.sw_if_index:
3183 self.assertEqual(intf.is_inside, 1)
3185 elif intf.sw_if_index == self.pg1.sw_if_index:
3186 self.assertEqual(intf.is_inside, 0)
3188 self.assertTrue(pg0_found)
3189 self.assertTrue(pg1_found)
3191 features = self.vapi.cli("show interface features pg0")
3192 self.assertNotEqual(features.find('nat64-in2out'), -1)
3193 features = self.vapi.cli("show interface features pg1")
3194 self.assertNotEqual(features.find('nat64-out2in'), -1)
3196 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3197 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3199 interfaces = self.vapi.nat64_interface_dump()
3200 self.assertEqual(len(interfaces), 0)
3202 def test_static_bib(self):
3203 """ Add/delete static BIB entry """
3204 in_addr = socket.inet_pton(socket.AF_INET6,
3205 '2001:db8:85a3::8a2e:370:7334')
3206 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3209 proto = IP_PROTOS.tcp
3211 self.vapi.nat64_add_del_static_bib(in_addr,
3216 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3221 self.assertEqual(bibe.i_addr, in_addr)
3222 self.assertEqual(bibe.o_addr, out_addr)
3223 self.assertEqual(bibe.i_port, in_port)
3224 self.assertEqual(bibe.o_port, out_port)
3225 self.assertEqual(static_bib_num, 1)
3227 self.vapi.nat64_add_del_static_bib(in_addr,
3233 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3238 self.assertEqual(static_bib_num, 0)
3240 def test_set_timeouts(self):
3241 """ Set NAT64 timeouts """
3242 # verify default values
3243 timeouts = self.vapi.nat64_get_timeouts()
3244 self.assertEqual(timeouts.udp, 300)
3245 self.assertEqual(timeouts.icmp, 60)
3246 self.assertEqual(timeouts.tcp_trans, 240)
3247 self.assertEqual(timeouts.tcp_est, 7440)
3248 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3250 # set and verify custom values
3251 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3252 tcp_est=7450, tcp_incoming_syn=10)
3253 timeouts = self.vapi.nat64_get_timeouts()
3254 self.assertEqual(timeouts.udp, 200)
3255 self.assertEqual(timeouts.icmp, 30)
3256 self.assertEqual(timeouts.tcp_trans, 250)
3257 self.assertEqual(timeouts.tcp_est, 7450)
3258 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3260 def test_dynamic(self):
3261 """ NAT64 dynamic translation test """
3262 self.tcp_port_in = 6303
3263 self.udp_port_in = 6304
3264 self.icmp_id_in = 6305
3266 ses_num_start = self.nat64_get_ses_num()
3268 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3270 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3271 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3274 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3275 self.pg0.add_stream(pkts)
3276 self.pg_enable_capture(self.pg_interfaces)
3278 capture = self.pg1.get_capture(len(pkts))
3279 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3280 dst_ip=self.pg1.remote_ip4)
3283 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3284 self.pg1.add_stream(pkts)
3285 self.pg_enable_capture(self.pg_interfaces)
3287 capture = self.pg0.get_capture(len(pkts))
3288 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3289 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3292 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3293 self.pg0.add_stream(pkts)
3294 self.pg_enable_capture(self.pg_interfaces)
3296 capture = self.pg1.get_capture(len(pkts))
3297 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3298 dst_ip=self.pg1.remote_ip4)
3301 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3302 self.pg1.add_stream(pkts)
3303 self.pg_enable_capture(self.pg_interfaces)
3305 capture = self.pg0.get_capture(len(pkts))
3306 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3308 ses_num_end = self.nat64_get_ses_num()
3310 self.assertEqual(ses_num_end - ses_num_start, 3)
3312 # tenant with specific VRF
3313 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3314 self.vrf1_nat_addr_n,
3315 vrf_id=self.vrf1_id)
3316 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3318 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3319 self.pg2.add_stream(pkts)
3320 self.pg_enable_capture(self.pg_interfaces)
3322 capture = self.pg1.get_capture(len(pkts))
3323 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3324 dst_ip=self.pg1.remote_ip4)
3326 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3327 self.pg1.add_stream(pkts)
3328 self.pg_enable_capture(self.pg_interfaces)
3330 capture = self.pg2.get_capture(len(pkts))
3331 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3333 def test_static(self):
3334 """ NAT64 static translation test """
3335 self.tcp_port_in = 60303
3336 self.udp_port_in = 60304
3337 self.icmp_id_in = 60305
3338 self.tcp_port_out = 60303
3339 self.udp_port_out = 60304
3340 self.icmp_id_out = 60305
3342 ses_num_start = self.nat64_get_ses_num()
3344 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3346 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3347 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3349 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3354 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3359 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3366 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3367 self.pg0.add_stream(pkts)
3368 self.pg_enable_capture(self.pg_interfaces)
3370 capture = self.pg1.get_capture(len(pkts))
3371 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3372 dst_ip=self.pg1.remote_ip4, same_port=True)
3375 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3376 self.pg1.add_stream(pkts)
3377 self.pg_enable_capture(self.pg_interfaces)
3379 capture = self.pg0.get_capture(len(pkts))
3380 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3381 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3383 ses_num_end = self.nat64_get_ses_num()
3385 self.assertEqual(ses_num_end - ses_num_start, 3)
3387 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3388 def test_session_timeout(self):
3389 """ NAT64 session timeout """
3390 self.icmp_id_in = 1234
3391 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3393 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3394 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3395 self.vapi.nat64_set_timeouts(icmp=5)
3397 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3398 self.pg0.add_stream(pkts)
3399 self.pg_enable_capture(self.pg_interfaces)
3401 capture = self.pg1.get_capture(len(pkts))
3403 ses_num_before_timeout = self.nat64_get_ses_num()
3407 # ICMP session after timeout
3408 ses_num_after_timeout = self.nat64_get_ses_num()
3409 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3411 def test_icmp_error(self):
3412 """ NAT64 ICMP Error message translation """
3413 self.tcp_port_in = 6303
3414 self.udp_port_in = 6304
3415 self.icmp_id_in = 6305
3417 ses_num_start = self.nat64_get_ses_num()
3419 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3421 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3422 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3424 # send some packets to create sessions
3425 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3426 self.pg0.add_stream(pkts)
3427 self.pg_enable_capture(self.pg_interfaces)
3429 capture_ip4 = self.pg1.get_capture(len(pkts))
3430 self.verify_capture_out(capture_ip4,
3431 nat_ip=self.nat_addr,
3432 dst_ip=self.pg1.remote_ip4)
3434 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3435 self.pg1.add_stream(pkts)
3436 self.pg_enable_capture(self.pg_interfaces)
3438 capture_ip6 = self.pg0.get_capture(len(pkts))
3439 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3440 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3441 self.pg0.remote_ip6)
3444 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3445 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3446 ICMPv6DestUnreach(code=1) /
3447 packet[IPv6] for packet in capture_ip6]
3448 self.pg0.add_stream(pkts)
3449 self.pg_enable_capture(self.pg_interfaces)
3451 capture = self.pg1.get_capture(len(pkts))
3452 for packet in capture:
3454 self.assertEqual(packet[IP].src, self.nat_addr)
3455 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3456 self.assertEqual(packet[ICMP].type, 3)
3457 self.assertEqual(packet[ICMP].code, 13)
3458 inner = packet[IPerror]
3459 self.assertEqual(inner.src, self.pg1.remote_ip4)
3460 self.assertEqual(inner.dst, self.nat_addr)
3461 self.check_icmp_checksum(packet)
3462 if inner.haslayer(TCPerror):
3463 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3464 elif inner.haslayer(UDPerror):
3465 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3467 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3469 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3473 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3474 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3475 ICMP(type=3, code=13) /
3476 packet[IP] for packet in capture_ip4]
3477 self.pg1.add_stream(pkts)
3478 self.pg_enable_capture(self.pg_interfaces)
3480 capture = self.pg0.get_capture(len(pkts))
3481 for packet in capture:
3483 self.assertEqual(packet[IPv6].src, ip.src)
3484 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3485 icmp = packet[ICMPv6DestUnreach]
3486 self.assertEqual(icmp.code, 1)
3487 inner = icmp[IPerror6]
3488 self.assertEqual(inner.src, self.pg0.remote_ip6)
3489 self.assertEqual(inner.dst, ip.src)
3490 self.check_icmpv6_checksum(packet)
3491 if inner.haslayer(TCPerror):
3492 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3493 elif inner.haslayer(UDPerror):
3494 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3496 self.assertEqual(inner[ICMPv6EchoRequest].id,
3499 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3502 def test_hairpinning(self):
3503 """ NAT64 hairpinning """
3505 client = self.pg0.remote_hosts[0]
3506 server = self.pg0.remote_hosts[1]
3507 server_tcp_in_port = 22
3508 server_tcp_out_port = 4022
3509 server_udp_in_port = 23
3510 server_udp_out_port = 4023
3511 client_tcp_in_port = 1234
3512 client_udp_in_port = 1235
3513 client_tcp_out_port = 0
3514 client_udp_out_port = 0
3515 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3516 nat_addr_ip6 = ip.src
3518 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3520 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3521 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3523 self.vapi.nat64_add_del_static_bib(server.ip6n,
3526 server_tcp_out_port,
3528 self.vapi.nat64_add_del_static_bib(server.ip6n,
3531 server_udp_out_port,
3536 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3537 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3538 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3540 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3541 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3542 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3544 self.pg0.add_stream(pkts)
3545 self.pg_enable_capture(self.pg_interfaces)
3547 capture = self.pg0.get_capture(len(pkts))
3548 for packet in capture:
3550 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3551 self.assertEqual(packet[IPv6].dst, server.ip6)
3552 if packet.haslayer(TCP):
3553 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3554 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3555 self.check_tcp_checksum(packet)
3556 client_tcp_out_port = packet[TCP].sport
3558 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3559 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3560 self.check_udp_checksum(packet)
3561 client_udp_out_port = packet[UDP].sport
3563 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3568 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3569 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3570 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3572 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3573 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3574 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3576 self.pg0.add_stream(pkts)
3577 self.pg_enable_capture(self.pg_interfaces)
3579 capture = self.pg0.get_capture(len(pkts))
3580 for packet in capture:
3582 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3583 self.assertEqual(packet[IPv6].dst, client.ip6)
3584 if packet.haslayer(TCP):
3585 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3586 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3587 self.check_tcp_checksum(packet)
3589 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3590 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3591 self.check_udp_checksum(packet)
3593 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3598 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3599 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3600 ICMPv6DestUnreach(code=1) /
3601 packet[IPv6] for packet in capture]
3602 self.pg0.add_stream(pkts)
3603 self.pg_enable_capture(self.pg_interfaces)
3605 capture = self.pg0.get_capture(len(pkts))
3606 for packet in capture:
3608 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3609 self.assertEqual(packet[IPv6].dst, server.ip6)
3610 icmp = packet[ICMPv6DestUnreach]
3611 self.assertEqual(icmp.code, 1)
3612 inner = icmp[IPerror6]
3613 self.assertEqual(inner.src, server.ip6)
3614 self.assertEqual(inner.dst, nat_addr_ip6)
3615 self.check_icmpv6_checksum(packet)
3616 if inner.haslayer(TCPerror):
3617 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3618 self.assertEqual(inner[TCPerror].dport,
3619 client_tcp_out_port)
3621 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3622 self.assertEqual(inner[UDPerror].dport,
3623 client_udp_out_port)
3625 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3628 def test_prefix(self):
3629 """ NAT64 Network-Specific Prefix """
3631 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3633 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3634 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3635 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3636 self.vrf1_nat_addr_n,
3637 vrf_id=self.vrf1_id)
3638 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3641 global_pref64 = "2001:db8::"
3642 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3643 global_pref64_len = 32
3644 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3646 prefix = self.vapi.nat64_prefix_dump()
3647 self.assertEqual(len(prefix), 1)
3648 self.assertEqual(prefix[0].prefix, global_pref64_n)
3649 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3650 self.assertEqual(prefix[0].vrf_id, 0)
3652 # Add tenant specific prefix
3653 vrf1_pref64 = "2001:db8:122:300::"
3654 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3655 vrf1_pref64_len = 56
3656 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3658 vrf_id=self.vrf1_id)
3659 prefix = self.vapi.nat64_prefix_dump()
3660 self.assertEqual(len(prefix), 2)
3663 pkts = self.create_stream_in_ip6(self.pg0,
3666 plen=global_pref64_len)
3667 self.pg0.add_stream(pkts)
3668 self.pg_enable_capture(self.pg_interfaces)
3670 capture = self.pg1.get_capture(len(pkts))
3671 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3672 dst_ip=self.pg1.remote_ip4)
3674 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3675 self.pg1.add_stream(pkts)
3676 self.pg_enable_capture(self.pg_interfaces)
3678 capture = self.pg0.get_capture(len(pkts))
3679 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3682 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3684 # Tenant specific prefix
3685 pkts = self.create_stream_in_ip6(self.pg2,
3688 plen=vrf1_pref64_len)
3689 self.pg2.add_stream(pkts)
3690 self.pg_enable_capture(self.pg_interfaces)
3692 capture = self.pg1.get_capture(len(pkts))
3693 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3694 dst_ip=self.pg1.remote_ip4)
3696 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3697 self.pg1.add_stream(pkts)
3698 self.pg_enable_capture(self.pg_interfaces)
3700 capture = self.pg2.get_capture(len(pkts))
3701 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3704 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3706 def test_unknown_proto(self):
3707 """ NAT64 translate packet with unknown protocol """
3709 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3711 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3712 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3713 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3716 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3717 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3718 TCP(sport=self.tcp_port_in, dport=20))
3719 self.pg0.add_stream(p)
3720 self.pg_enable_capture(self.pg_interfaces)
3722 p = self.pg1.get_capture(1)
3724 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3725 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3727 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3728 TCP(sport=1234, dport=1234))
3729 self.pg0.add_stream(p)
3730 self.pg_enable_capture(self.pg_interfaces)
3732 p = self.pg1.get_capture(1)
3735 self.assertEqual(packet[IP].src, self.nat_addr)
3736 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3737 self.assertTrue(packet.haslayer(GRE))
3738 self.check_ip_checksum(packet)
3740 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3744 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3745 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3747 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3748 TCP(sport=1234, dport=1234))
3749 self.pg1.add_stream(p)
3750 self.pg_enable_capture(self.pg_interfaces)
3752 p = self.pg0.get_capture(1)
3755 self.assertEqual(packet[IPv6].src, remote_ip6)
3756 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3757 self.assertEqual(packet[IPv6].nh, 47)
3759 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3762 def test_hairpinning_unknown_proto(self):
3763 """ NAT64 translate packet with unknown protocol - hairpinning """
3765 client = self.pg0.remote_hosts[0]
3766 server = self.pg0.remote_hosts[1]
3767 server_tcp_in_port = 22
3768 server_tcp_out_port = 4022
3769 client_tcp_in_port = 1234
3770 client_tcp_out_port = 1235
3771 server_nat_ip = "10.0.0.100"
3772 client_nat_ip = "10.0.0.110"
3773 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3774 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3775 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3776 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3778 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3780 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3781 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3783 self.vapi.nat64_add_del_static_bib(server.ip6n,
3786 server_tcp_out_port,
3789 self.vapi.nat64_add_del_static_bib(server.ip6n,
3795 self.vapi.nat64_add_del_static_bib(client.ip6n,
3798 client_tcp_out_port,
3802 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3803 IPv6(src=client.ip6, dst=server_nat_ip6) /
3804 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3805 self.pg0.add_stream(p)
3806 self.pg_enable_capture(self.pg_interfaces)
3808 p = self.pg0.get_capture(1)
3810 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3811 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3813 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3814 TCP(sport=1234, dport=1234))
3815 self.pg0.add_stream(p)
3816 self.pg_enable_capture(self.pg_interfaces)
3818 p = self.pg0.get_capture(1)
3821 self.assertEqual(packet[IPv6].src, client_nat_ip6)
3822 self.assertEqual(packet[IPv6].dst, server.ip6)
3823 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3825 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3829 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3830 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3832 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3833 TCP(sport=1234, dport=1234))
3834 self.pg0.add_stream(p)
3835 self.pg_enable_capture(self.pg_interfaces)
3837 p = self.pg0.get_capture(1)
3840 self.assertEqual(packet[IPv6].src, server_nat_ip6)
3841 self.assertEqual(packet[IPv6].dst, client.ip6)
3842 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3844 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3847 def test_one_armed_nat64(self):
3848 """ One armed NAT64 """
3850 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
3854 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3856 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
3857 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
3860 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3861 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
3862 TCP(sport=12345, dport=80))
3863 self.pg3.add_stream(p)
3864 self.pg_enable_capture(self.pg_interfaces)
3866 capture = self.pg3.get_capture(1)
3871 self.assertEqual(ip.src, self.nat_addr)
3872 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3873 self.assertNotEqual(tcp.sport, 12345)
3874 external_port = tcp.sport
3875 self.assertEqual(tcp.dport, 80)
3876 self.check_tcp_checksum(p)
3877 self.check_ip_checksum(p)
3879 self.logger.error(ppp("Unexpected or invalid packet:", p))
3883 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3884 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
3885 TCP(sport=80, dport=external_port))
3886 self.pg3.add_stream(p)
3887 self.pg_enable_capture(self.pg_interfaces)
3889 capture = self.pg3.get_capture(1)
3894 self.assertEqual(ip.src, remote_host_ip6)
3895 self.assertEqual(ip.dst, self.pg3.remote_ip6)
3896 self.assertEqual(tcp.sport, 80)
3897 self.assertEqual(tcp.dport, 12345)
3898 self.check_tcp_checksum(p)
3900 self.logger.error(ppp("Unexpected or invalid packet:", p))
3903 def nat64_get_ses_num(self):
3905 Return number of active NAT64 sessions.
3907 st = self.vapi.nat64_st_dump()
3910 def clear_nat64(self):
3912 Clear NAT64 configuration.
3914 self.vapi.nat64_set_timeouts()
3916 interfaces = self.vapi.nat64_interface_dump()
3917 for intf in interfaces:
3918 if intf.is_inside > 1:
3919 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3922 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3926 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3929 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3937 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3940 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3948 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3951 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3959 adresses = self.vapi.nat64_pool_addr_dump()
3960 for addr in adresses:
3961 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3966 prefixes = self.vapi.nat64_prefix_dump()
3967 for prefix in prefixes:
3968 self.vapi.nat64_add_del_prefix(prefix.prefix,
3970 vrf_id=prefix.vrf_id,
3974 super(TestNAT64, self).tearDown()
3975 if not self.vpp_dead:
3976 self.logger.info(self.vapi.cli("show nat64 pool"))
3977 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3978 self.logger.info(self.vapi.cli("show nat64 prefix"))
3979 self.logger.info(self.vapi.cli("show nat64 bib all"))
3980 self.logger.info(self.vapi.cli("show nat64 session table all"))
3983 if __name__ == '__main__':
3984 unittest.main(testRunner=VppTestRunner)