7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
21 class MethodHolder(VppTestCase):
22 """ NAT create capture and verify method holder """
26 super(MethodHolder, cls).setUpClass()
29 super(MethodHolder, self).tearDown()
31 def check_ip_checksum(self, pkt):
33 Check IP checksum of the packet
35 :param pkt: Packet to check IP checksum
37 new = pkt.__class__(str(pkt))
39 new = new.__class__(str(new))
40 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
42 def check_tcp_checksum(self, pkt):
44 Check TCP checksum in IP packet
46 :param pkt: Packet to check TCP checksum
48 new = pkt.__class__(str(pkt))
50 new = new.__class__(str(new))
51 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
53 def check_udp_checksum(self, pkt):
55 Check UDP checksum in IP packet
57 :param pkt: Packet to check UDP checksum
59 new = pkt.__class__(str(pkt))
61 new = new.__class__(str(new))
62 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
64 def check_icmp_errror_embedded(self, pkt):
66 Check ICMP error embeded packet checksum
68 :param pkt: Packet to check ICMP error embeded packet checksum
70 if pkt.haslayer(IPerror):
71 new = pkt.__class__(str(pkt))
72 del new['IPerror'].chksum
73 new = new.__class__(str(new))
74 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
76 if pkt.haslayer(TCPerror):
77 new = pkt.__class__(str(pkt))
78 del new['TCPerror'].chksum
79 new = new.__class__(str(new))
80 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
82 if pkt.haslayer(UDPerror):
83 if pkt['UDPerror'].chksum != 0:
84 new = pkt.__class__(str(pkt))
85 del new['UDPerror'].chksum
86 new = new.__class__(str(new))
87 self.assertEqual(new['UDPerror'].chksum,
88 pkt['UDPerror'].chksum)
90 if pkt.haslayer(ICMPerror):
91 del new['ICMPerror'].chksum
92 new = new.__class__(str(new))
93 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
95 def check_icmp_checksum(self, pkt):
97 Check ICMP checksum in IPv4 packet
99 :param pkt: Packet to check ICMP checksum
101 new = pkt.__class__(str(pkt))
102 del new['ICMP'].chksum
103 new = new.__class__(str(new))
104 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
105 if pkt.haslayer(IPerror):
106 self.check_icmp_errror_embedded(pkt)
108 def check_icmpv6_checksum(self, pkt):
110 Check ICMPv6 checksum in IPv4 packet
112 :param pkt: Packet to check ICMPv6 checksum
114 new = pkt.__class__(str(pkt))
115 if pkt.haslayer(ICMPv6DestUnreach):
116 del new['ICMPv6DestUnreach'].cksum
117 new = new.__class__(str(new))
118 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
119 pkt['ICMPv6DestUnreach'].cksum)
120 self.check_icmp_errror_embedded(pkt)
121 if pkt.haslayer(ICMPv6EchoRequest):
122 del new['ICMPv6EchoRequest'].cksum
123 new = new.__class__(str(new))
124 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
125 pkt['ICMPv6EchoRequest'].cksum)
126 if pkt.haslayer(ICMPv6EchoReply):
127 del new['ICMPv6EchoReply'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoReply'].cksum,
130 pkt['ICMPv6EchoReply'].cksum)
132 def create_stream_in(self, in_if, out_if, ttl=64):
134 Create packet stream for inside network
136 :param in_if: Inside interface
137 :param out_if: Outside interface
138 :param ttl: TTL of generated packets
142 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
143 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
144 TCP(sport=self.tcp_port_in, dport=20))
148 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
149 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
150 UDP(sport=self.udp_port_in, dport=20))
154 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
155 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
156 ICMP(id=self.icmp_id_in, type='echo-request'))
161 def compose_ip6(self, ip4, pref, plen):
163 Compose IPv4-embedded IPv6 addresses
165 :param ip4: IPv4 address
166 :param pref: IPv6 prefix
167 :param plen: IPv6 prefix length
168 :returns: IPv4-embedded IPv6 addresses
170 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
171 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
186 pref_n[10] = ip4_n[3]
190 pref_n[10] = ip4_n[2]
191 pref_n[11] = ip4_n[3]
194 pref_n[10] = ip4_n[1]
195 pref_n[11] = ip4_n[2]
196 pref_n[12] = ip4_n[3]
198 pref_n[12] = ip4_n[0]
199 pref_n[13] = ip4_n[1]
200 pref_n[14] = ip4_n[2]
201 pref_n[15] = ip4_n[3]
202 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
204 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
206 Create IPv6 packet stream for inside network
208 :param in_if: Inside interface
209 :param out_if: Outside interface
210 :param ttl: Hop Limit of generated packets
211 :param pref: NAT64 prefix
212 :param plen: NAT64 prefix length
216 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
218 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
221 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
222 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
223 TCP(sport=self.tcp_port_in, dport=20))
227 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
228 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
229 UDP(sport=self.udp_port_in, dport=20))
233 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
234 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
235 ICMPv6EchoRequest(id=self.icmp_id_in))
240 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
242 Create packet stream for outside network
244 :param out_if: Outside interface
245 :param dst_ip: Destination IP address (Default use global NAT address)
246 :param ttl: TTL of generated packets
249 dst_ip = self.nat_addr
252 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
253 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
254 TCP(dport=self.tcp_port_out, sport=20))
258 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
259 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
260 UDP(dport=self.udp_port_out, sport=20))
264 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
265 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
266 ICMP(id=self.icmp_id_out, type='echo-reply'))
271 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
272 packet_num=3, dst_ip=None):
274 Verify captured packets on outside network
276 :param capture: Captured packets
277 :param nat_ip: Translated IP address (Default use global NAT address)
278 :param same_port: Sorce port number is not translated (Default False)
279 :param packet_num: Expected number of packets (Default 3)
280 :param dst_ip: Destination IP address (Default do not verify)
283 nat_ip = self.nat_addr
284 self.assertEqual(packet_num, len(capture))
285 for packet in capture:
287 self.check_ip_checksum(packet)
288 self.assertEqual(packet[IP].src, nat_ip)
289 if dst_ip is not None:
290 self.assertEqual(packet[IP].dst, dst_ip)
291 if packet.haslayer(TCP):
293 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
296 packet[TCP].sport, self.tcp_port_in)
297 self.tcp_port_out = packet[TCP].sport
298 self.check_tcp_checksum(packet)
299 elif packet.haslayer(UDP):
301 self.assertEqual(packet[UDP].sport, self.udp_port_in)
304 packet[UDP].sport, self.udp_port_in)
305 self.udp_port_out = packet[UDP].sport
308 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
310 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
311 self.icmp_id_out = packet[ICMP].id
312 self.check_icmp_checksum(packet)
314 self.logger.error(ppp("Unexpected or invalid packet "
315 "(outside network):", packet))
318 def verify_capture_in(self, capture, in_if, packet_num=3):
320 Verify captured packets on inside network
322 :param capture: Captured packets
323 :param in_if: Inside interface
324 :param packet_num: Expected number of packets (Default 3)
326 self.assertEqual(packet_num, len(capture))
327 for packet in capture:
329 self.check_ip_checksum(packet)
330 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
331 if packet.haslayer(TCP):
332 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
333 self.check_tcp_checksum(packet)
334 elif packet.haslayer(UDP):
335 self.assertEqual(packet[UDP].dport, self.udp_port_in)
337 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
338 self.check_icmp_checksum(packet)
340 self.logger.error(ppp("Unexpected or invalid packet "
341 "(inside network):", packet))
344 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
346 Verify captured IPv6 packets on inside network
348 :param capture: Captured packets
349 :param src_ip: Source IP
350 :param dst_ip: Destination IP address
351 :param packet_num: Expected number of packets (Default 3)
353 self.assertEqual(packet_num, len(capture))
354 for packet in capture:
356 self.assertEqual(packet[IPv6].src, src_ip)
357 self.assertEqual(packet[IPv6].dst, dst_ip)
358 if packet.haslayer(TCP):
359 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
360 self.check_tcp_checksum(packet)
361 elif packet.haslayer(UDP):
362 self.assertEqual(packet[UDP].dport, self.udp_port_in)
363 self.check_udp_checksum(packet)
365 self.assertEqual(packet[ICMPv6EchoReply].id,
367 self.check_icmpv6_checksum(packet)
369 self.logger.error(ppp("Unexpected or invalid packet "
370 "(inside network):", packet))
373 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
375 Verify captured packet that don't have to be translated
377 :param capture: Captured packets
378 :param ingress_if: Ingress interface
379 :param egress_if: Egress interface
381 for packet in capture:
383 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385 if packet.haslayer(TCP):
386 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387 elif packet.haslayer(UDP):
388 self.assertEqual(packet[UDP].sport, self.udp_port_in)
390 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
392 self.logger.error(ppp("Unexpected or invalid packet "
393 "(inside network):", packet))
396 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397 packet_num=3, icmp_type=11):
399 Verify captured packets with ICMP errors on outside network
401 :param capture: Captured packets
402 :param src_ip: Translated IP address or IP address of VPP
403 (Default use global NAT address)
404 :param packet_num: Expected number of packets (Default 3)
405 :param icmp_type: Type of error ICMP packet
406 we are expecting (Default 11)
409 src_ip = self.nat_addr
410 self.assertEqual(packet_num, len(capture))
411 for packet in capture:
413 self.assertEqual(packet[IP].src, src_ip)
414 self.assertTrue(packet.haslayer(ICMP))
416 self.assertEqual(icmp.type, icmp_type)
417 self.assertTrue(icmp.haslayer(IPerror))
418 inner_ip = icmp[IPerror]
419 if inner_ip.haslayer(TCPerror):
420 self.assertEqual(inner_ip[TCPerror].dport,
422 elif inner_ip.haslayer(UDPerror):
423 self.assertEqual(inner_ip[UDPerror].dport,
426 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
428 self.logger.error(ppp("Unexpected or invalid packet "
429 "(outside network):", packet))
432 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
435 Verify captured packets with ICMP errors on inside network
437 :param capture: Captured packets
438 :param in_if: Inside interface
439 :param packet_num: Expected number of packets (Default 3)
440 :param icmp_type: Type of error ICMP packet
441 we are expecting (Default 11)
443 self.assertEqual(packet_num, len(capture))
444 for packet in capture:
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 self.assertTrue(packet.haslayer(ICMP))
449 self.assertEqual(icmp.type, icmp_type)
450 self.assertTrue(icmp.haslayer(IPerror))
451 inner_ip = icmp[IPerror]
452 if inner_ip.haslayer(TCPerror):
453 self.assertEqual(inner_ip[TCPerror].sport,
455 elif inner_ip.haslayer(UDPerror):
456 self.assertEqual(inner_ip[UDPerror].sport,
459 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
461 self.logger.error(ppp("Unexpected or invalid packet "
462 "(inside network):", packet))
465 def verify_ipfix_nat44_ses(self, data):
467 Verify IPFIX NAT44 session create/delete event
469 :param data: Decoded IPFIX data records
471 nat44_ses_create_num = 0
472 nat44_ses_delete_num = 0
473 self.assertEqual(6, len(data))
476 self.assertIn(ord(record[230]), [4, 5])
477 if ord(record[230]) == 4:
478 nat44_ses_create_num += 1
480 nat44_ses_delete_num += 1
482 self.assertEqual(self.pg0.remote_ip4n, record[8])
483 # postNATSourceIPv4Address
484 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
487 self.assertEqual(struct.pack("!I", 0), record[234])
488 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
489 if IP_PROTOS.icmp == ord(record[4]):
490 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
491 self.assertEqual(struct.pack("!H", self.icmp_id_out),
493 elif IP_PROTOS.tcp == ord(record[4]):
494 self.assertEqual(struct.pack("!H", self.tcp_port_in),
496 self.assertEqual(struct.pack("!H", self.tcp_port_out),
498 elif IP_PROTOS.udp == ord(record[4]):
499 self.assertEqual(struct.pack("!H", self.udp_port_in),
501 self.assertEqual(struct.pack("!H", self.udp_port_out),
504 self.fail("Invalid protocol")
505 self.assertEqual(3, nat44_ses_create_num)
506 self.assertEqual(3, nat44_ses_delete_num)
508 def verify_ipfix_addr_exhausted(self, data):
510 Verify IPFIX NAT addresses event
512 :param data: Decoded IPFIX data records
514 self.assertEqual(1, len(data))
517 self.assertEqual(ord(record[230]), 3)
519 self.assertEqual(struct.pack("!I", 0), record[283])
522 class TestNAT44(MethodHolder):
523 """ NAT44 Test Cases """
527 super(TestNAT44, cls).setUpClass()
530 cls.tcp_port_in = 6303
531 cls.tcp_port_out = 6303
532 cls.udp_port_in = 6304
533 cls.udp_port_out = 6304
534 cls.icmp_id_in = 6305
535 cls.icmp_id_out = 6305
536 cls.nat_addr = '10.0.0.3'
537 cls.ipfix_src_port = 4739
538 cls.ipfix_domain_id = 1
540 cls.create_pg_interfaces(range(10))
541 cls.interfaces = list(cls.pg_interfaces[0:4])
543 for i in cls.interfaces:
548 cls.pg0.generate_remote_hosts(3)
549 cls.pg0.configure_ipv4_neighbors()
551 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552 cls.vapi.ip_table_add_del(10, is_add=1)
553 cls.vapi.ip_table_add_del(20, is_add=1)
555 cls.pg4._local_ip4 = "172.16.255.1"
556 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
557 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
558 cls.pg4.set_table_ip4(10)
559 cls.pg5._local_ip4 = "172.17.255.3"
560 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
561 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
562 cls.pg5.set_table_ip4(10)
563 cls.pg6._local_ip4 = "172.16.255.1"
564 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
565 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
566 cls.pg6.set_table_ip4(20)
567 for i in cls.overlapping_interfaces:
575 cls.pg9.generate_remote_hosts(2)
577 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
578 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
582 cls.pg9.resolve_arp()
583 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
584 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
585 cls.pg9.resolve_arp()
588 super(TestNAT44, cls).tearDownClass()
591 def clear_nat44(self):
593 Clear NAT44 configuration.
595 # I found no elegant way to do this
596 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
597 dst_address_length=32,
598 next_hop_address=self.pg7.remote_ip4n,
599 next_hop_sw_if_index=self.pg7.sw_if_index,
601 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
602 dst_address_length=32,
603 next_hop_address=self.pg8.remote_ip4n,
604 next_hop_sw_if_index=self.pg8.sw_if_index,
607 for intf in [self.pg7, self.pg8]:
608 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
610 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
615 if self.pg7.has_ip4_config:
616 self.pg7.unconfig_ip4()
618 interfaces = self.vapi.nat44_interface_addr_dump()
619 for intf in interfaces:
620 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
622 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
623 domain_id=self.ipfix_domain_id)
624 self.ipfix_src_port = 4739
625 self.ipfix_domain_id = 1
627 interfaces = self.vapi.nat44_interface_dump()
628 for intf in interfaces:
629 if intf.is_inside > 1:
630 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
633 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
637 interfaces = self.vapi.nat44_interface_output_feature_dump()
638 for intf in interfaces:
639 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
643 static_mappings = self.vapi.nat44_static_mapping_dump()
644 for sm in static_mappings:
645 self.vapi.nat44_add_del_static_mapping(
647 sm.external_ip_address,
648 local_port=sm.local_port,
649 external_port=sm.external_port,
650 addr_only=sm.addr_only,
652 protocol=sm.protocol,
655 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
656 for lb_sm in lb_static_mappings:
657 self.vapi.nat44_add_del_lb_static_mapping(
664 adresses = self.vapi.nat44_address_dump()
665 for addr in adresses:
666 self.vapi.nat44_add_del_address_range(addr.ip_address,
670 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
671 local_port=0, external_port=0, vrf_id=0,
672 is_add=1, external_sw_if_index=0xFFFFFFFF,
675 Add/delete NAT44 static mapping
677 :param local_ip: Local IP address
678 :param external_ip: External IP address
679 :param local_port: Local port number (Optional)
680 :param external_port: External port number (Optional)
681 :param vrf_id: VRF ID (Default 0)
682 :param is_add: 1 if add, 0 if delete (Default add)
683 :param external_sw_if_index: External interface instead of IP address
684 :param proto: IP protocol (Mandatory if port specified)
687 if local_port and external_port:
689 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
690 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
691 self.vapi.nat44_add_del_static_mapping(
694 external_sw_if_index,
702 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
704 Add/delete NAT44 address
706 :param ip: IP address
707 :param is_add: 1 if add, 0 if delete (Default add)
709 nat_addr = socket.inet_pton(socket.AF_INET, ip)
710 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
713 def test_dynamic(self):
714 """ NAT44 dynamic translation test """
716 self.nat44_add_address(self.nat_addr)
717 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
718 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
722 pkts = self.create_stream_in(self.pg0, self.pg1)
723 self.pg0.add_stream(pkts)
724 self.pg_enable_capture(self.pg_interfaces)
726 capture = self.pg1.get_capture(len(pkts))
727 self.verify_capture_out(capture)
730 pkts = self.create_stream_out(self.pg1)
731 self.pg1.add_stream(pkts)
732 self.pg_enable_capture(self.pg_interfaces)
734 capture = self.pg0.get_capture(len(pkts))
735 self.verify_capture_in(capture, self.pg0)
737 def test_dynamic_icmp_errors_in2out_ttl_1(self):
738 """ NAT44 handling of client packets with TTL=1 """
740 self.nat44_add_address(self.nat_addr)
741 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
742 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
745 # Client side - generate traffic
746 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
747 self.pg0.add_stream(pkts)
748 self.pg_enable_capture(self.pg_interfaces)
751 # Client side - verify ICMP type 11 packets
752 capture = self.pg0.get_capture(len(pkts))
753 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
755 def test_dynamic_icmp_errors_out2in_ttl_1(self):
756 """ NAT44 handling of server packets with TTL=1 """
758 self.nat44_add_address(self.nat_addr)
759 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
760 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
763 # Client side - create sessions
764 pkts = self.create_stream_in(self.pg0, self.pg1)
765 self.pg0.add_stream(pkts)
766 self.pg_enable_capture(self.pg_interfaces)
769 # Server side - generate traffic
770 capture = self.pg1.get_capture(len(pkts))
771 self.verify_capture_out(capture)
772 pkts = self.create_stream_out(self.pg1, ttl=1)
773 self.pg1.add_stream(pkts)
774 self.pg_enable_capture(self.pg_interfaces)
777 # Server side - verify ICMP type 11 packets
778 capture = self.pg1.get_capture(len(pkts))
779 self.verify_capture_out_with_icmp_errors(capture,
780 src_ip=self.pg1.local_ip4)
782 def test_dynamic_icmp_errors_in2out_ttl_2(self):
783 """ NAT44 handling of error responses to client packets with TTL=2 """
785 self.nat44_add_address(self.nat_addr)
786 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
787 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
790 # Client side - generate traffic
791 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
792 self.pg0.add_stream(pkts)
793 self.pg_enable_capture(self.pg_interfaces)
796 # Server side - simulate ICMP type 11 response
797 capture = self.pg1.get_capture(len(pkts))
798 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
799 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
800 ICMP(type=11) / packet[IP] for packet in capture]
801 self.pg1.add_stream(pkts)
802 self.pg_enable_capture(self.pg_interfaces)
805 # Client side - verify ICMP type 11 packets
806 capture = self.pg0.get_capture(len(pkts))
807 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
809 def test_dynamic_icmp_errors_out2in_ttl_2(self):
810 """ NAT44 handling of error responses to server packets with TTL=2 """
812 self.nat44_add_address(self.nat_addr)
813 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
814 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
817 # Client side - create sessions
818 pkts = self.create_stream_in(self.pg0, self.pg1)
819 self.pg0.add_stream(pkts)
820 self.pg_enable_capture(self.pg_interfaces)
823 # Server side - generate traffic
824 capture = self.pg1.get_capture(len(pkts))
825 self.verify_capture_out(capture)
826 pkts = self.create_stream_out(self.pg1, ttl=2)
827 self.pg1.add_stream(pkts)
828 self.pg_enable_capture(self.pg_interfaces)
831 # Client side - simulate ICMP type 11 response
832 capture = self.pg0.get_capture(len(pkts))
833 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
834 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
835 ICMP(type=11) / packet[IP] for packet in capture]
836 self.pg0.add_stream(pkts)
837 self.pg_enable_capture(self.pg_interfaces)
840 # Server side - verify ICMP type 11 packets
841 capture = self.pg1.get_capture(len(pkts))
842 self.verify_capture_out_with_icmp_errors(capture)
844 def test_ping_out_interface_from_outside(self):
845 """ Ping NAT44 out interface from outside network """
847 self.nat44_add_address(self.nat_addr)
848 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
849 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
852 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
853 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
854 ICMP(id=self.icmp_id_out, type='echo-request'))
856 self.pg1.add_stream(pkts)
857 self.pg_enable_capture(self.pg_interfaces)
859 capture = self.pg1.get_capture(len(pkts))
860 self.assertEqual(1, len(capture))
863 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
864 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
865 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
866 self.assertEqual(packet[ICMP].type, 0) # echo reply
868 self.logger.error(ppp("Unexpected or invalid packet "
869 "(outside network):", packet))
872 def test_ping_internal_host_from_outside(self):
873 """ Ping internal host from outside network """
875 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
876 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
877 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
881 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
882 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
883 ICMP(id=self.icmp_id_out, type='echo-request'))
884 self.pg1.add_stream(pkt)
885 self.pg_enable_capture(self.pg_interfaces)
887 capture = self.pg0.get_capture(1)
888 self.verify_capture_in(capture, self.pg0, packet_num=1)
889 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
892 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
893 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
894 ICMP(id=self.icmp_id_in, type='echo-reply'))
895 self.pg0.add_stream(pkt)
896 self.pg_enable_capture(self.pg_interfaces)
898 capture = self.pg1.get_capture(1)
899 self.verify_capture_out(capture, same_port=True, packet_num=1)
900 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
902 def test_static_in(self):
903 """ 1:1 NAT initialized from inside network """
906 self.tcp_port_out = 6303
907 self.udp_port_out = 6304
908 self.icmp_id_out = 6305
910 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
911 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
912 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
916 pkts = self.create_stream_in(self.pg0, self.pg1)
917 self.pg0.add_stream(pkts)
918 self.pg_enable_capture(self.pg_interfaces)
920 capture = self.pg1.get_capture(len(pkts))
921 self.verify_capture_out(capture, nat_ip, True)
924 pkts = self.create_stream_out(self.pg1, nat_ip)
925 self.pg1.add_stream(pkts)
926 self.pg_enable_capture(self.pg_interfaces)
928 capture = self.pg0.get_capture(len(pkts))
929 self.verify_capture_in(capture, self.pg0)
931 def test_static_out(self):
932 """ 1:1 NAT initialized from outside network """
935 self.tcp_port_out = 6303
936 self.udp_port_out = 6304
937 self.icmp_id_out = 6305
939 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
940 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
941 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
945 pkts = self.create_stream_out(self.pg1, nat_ip)
946 self.pg1.add_stream(pkts)
947 self.pg_enable_capture(self.pg_interfaces)
949 capture = self.pg0.get_capture(len(pkts))
950 self.verify_capture_in(capture, self.pg0)
953 pkts = self.create_stream_in(self.pg0, self.pg1)
954 self.pg0.add_stream(pkts)
955 self.pg_enable_capture(self.pg_interfaces)
957 capture = self.pg1.get_capture(len(pkts))
958 self.verify_capture_out(capture, nat_ip, True)
960 def test_static_with_port_in(self):
961 """ 1:1 NAPT initialized from inside network """
963 self.tcp_port_out = 3606
964 self.udp_port_out = 3607
965 self.icmp_id_out = 3608
967 self.nat44_add_address(self.nat_addr)
968 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
969 self.tcp_port_in, self.tcp_port_out,
971 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
972 self.udp_port_in, self.udp_port_out,
974 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
975 self.icmp_id_in, self.icmp_id_out,
976 proto=IP_PROTOS.icmp)
977 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
978 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
982 pkts = self.create_stream_in(self.pg0, self.pg1)
983 self.pg0.add_stream(pkts)
984 self.pg_enable_capture(self.pg_interfaces)
986 capture = self.pg1.get_capture(len(pkts))
987 self.verify_capture_out(capture)
990 pkts = self.create_stream_out(self.pg1)
991 self.pg1.add_stream(pkts)
992 self.pg_enable_capture(self.pg_interfaces)
994 capture = self.pg0.get_capture(len(pkts))
995 self.verify_capture_in(capture, self.pg0)
997 def test_static_with_port_out(self):
998 """ 1:1 NAPT initialized from outside network """
1000 self.tcp_port_out = 30606
1001 self.udp_port_out = 30607
1002 self.icmp_id_out = 30608
1004 self.nat44_add_address(self.nat_addr)
1005 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1006 self.tcp_port_in, self.tcp_port_out,
1007 proto=IP_PROTOS.tcp)
1008 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1009 self.udp_port_in, self.udp_port_out,
1010 proto=IP_PROTOS.udp)
1011 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1012 self.icmp_id_in, self.icmp_id_out,
1013 proto=IP_PROTOS.icmp)
1014 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1015 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1019 pkts = self.create_stream_out(self.pg1)
1020 self.pg1.add_stream(pkts)
1021 self.pg_enable_capture(self.pg_interfaces)
1023 capture = self.pg0.get_capture(len(pkts))
1024 self.verify_capture_in(capture, self.pg0)
1027 pkts = self.create_stream_in(self.pg0, self.pg1)
1028 self.pg0.add_stream(pkts)
1029 self.pg_enable_capture(self.pg_interfaces)
1031 capture = self.pg1.get_capture(len(pkts))
1032 self.verify_capture_out(capture)
1034 def test_static_vrf_aware(self):
1035 """ 1:1 NAT VRF awareness """
1037 nat_ip1 = "10.0.0.30"
1038 nat_ip2 = "10.0.0.40"
1039 self.tcp_port_out = 6303
1040 self.udp_port_out = 6304
1041 self.icmp_id_out = 6305
1043 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1045 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1047 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1049 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1050 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1052 # inside interface VRF match NAT44 static mapping VRF
1053 pkts = self.create_stream_in(self.pg4, self.pg3)
1054 self.pg4.add_stream(pkts)
1055 self.pg_enable_capture(self.pg_interfaces)
1057 capture = self.pg3.get_capture(len(pkts))
1058 self.verify_capture_out(capture, nat_ip1, True)
1060 # inside interface VRF don't match NAT44 static mapping VRF (packets
1062 pkts = self.create_stream_in(self.pg0, self.pg3)
1063 self.pg0.add_stream(pkts)
1064 self.pg_enable_capture(self.pg_interfaces)
1066 self.pg3.assert_nothing_captured()
1068 def test_static_lb(self):
1069 """ NAT44 local service load balancing """
1070 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1073 server1 = self.pg0.remote_hosts[0]
1074 server2 = self.pg0.remote_hosts[1]
1076 locals = [{'addr': server1.ip4n,
1079 {'addr': server2.ip4n,
1083 self.nat44_add_address(self.nat_addr)
1084 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1087 local_num=len(locals),
1089 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1090 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1093 # from client to service
1094 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1095 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1096 TCP(sport=12345, dport=external_port))
1097 self.pg1.add_stream(p)
1098 self.pg_enable_capture(self.pg_interfaces)
1100 capture = self.pg0.get_capture(1)
1106 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1107 if ip.dst == server1.ip4:
1111 self.assertEqual(tcp.dport, local_port)
1112 self.check_tcp_checksum(p)
1113 self.check_ip_checksum(p)
1115 self.logger.error(ppp("Unexpected or invalid packet:", p))
1118 # from service back to client
1119 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1120 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1121 TCP(sport=local_port, dport=12345))
1122 self.pg0.add_stream(p)
1123 self.pg_enable_capture(self.pg_interfaces)
1125 capture = self.pg1.get_capture(1)
1130 self.assertEqual(ip.src, self.nat_addr)
1131 self.assertEqual(tcp.sport, external_port)
1132 self.check_tcp_checksum(p)
1133 self.check_ip_checksum(p)
1135 self.logger.error(ppp("Unexpected or invalid packet:", p))
1141 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1143 for client in clients:
1144 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1145 IP(src=client, dst=self.nat_addr) /
1146 TCP(sport=12345, dport=external_port))
1148 self.pg1.add_stream(pkts)
1149 self.pg_enable_capture(self.pg_interfaces)
1151 capture = self.pg0.get_capture(len(pkts))
1153 if p[IP].dst == server1.ip4:
1157 self.assertTrue(server1_n > server2_n)
1159 def test_multiple_inside_interfaces(self):
1160 """ NAT44 multiple non-overlapping address space inside interfaces """
1162 self.nat44_add_address(self.nat_addr)
1163 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1164 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1165 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1168 # between two NAT44 inside interfaces (no translation)
1169 pkts = self.create_stream_in(self.pg0, self.pg1)
1170 self.pg0.add_stream(pkts)
1171 self.pg_enable_capture(self.pg_interfaces)
1173 capture = self.pg1.get_capture(len(pkts))
1174 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1176 # from NAT44 inside to interface without NAT44 feature (no translation)
1177 pkts = self.create_stream_in(self.pg0, self.pg2)
1178 self.pg0.add_stream(pkts)
1179 self.pg_enable_capture(self.pg_interfaces)
1181 capture = self.pg2.get_capture(len(pkts))
1182 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1184 # in2out 1st interface
1185 pkts = self.create_stream_in(self.pg0, self.pg3)
1186 self.pg0.add_stream(pkts)
1187 self.pg_enable_capture(self.pg_interfaces)
1189 capture = self.pg3.get_capture(len(pkts))
1190 self.verify_capture_out(capture)
1192 # out2in 1st interface
1193 pkts = self.create_stream_out(self.pg3)
1194 self.pg3.add_stream(pkts)
1195 self.pg_enable_capture(self.pg_interfaces)
1197 capture = self.pg0.get_capture(len(pkts))
1198 self.verify_capture_in(capture, self.pg0)
1200 # in2out 2nd interface
1201 pkts = self.create_stream_in(self.pg1, self.pg3)
1202 self.pg1.add_stream(pkts)
1203 self.pg_enable_capture(self.pg_interfaces)
1205 capture = self.pg3.get_capture(len(pkts))
1206 self.verify_capture_out(capture)
1208 # out2in 2nd interface
1209 pkts = self.create_stream_out(self.pg3)
1210 self.pg3.add_stream(pkts)
1211 self.pg_enable_capture(self.pg_interfaces)
1213 capture = self.pg1.get_capture(len(pkts))
1214 self.verify_capture_in(capture, self.pg1)
1216 def test_inside_overlapping_interfaces(self):
1217 """ NAT44 multiple inside interfaces with overlapping address space """
1219 static_nat_ip = "10.0.0.10"
1220 self.nat44_add_address(self.nat_addr)
1221 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1223 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1224 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1225 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1226 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1229 # between NAT44 inside interfaces with same VRF (no translation)
1230 pkts = self.create_stream_in(self.pg4, self.pg5)
1231 self.pg4.add_stream(pkts)
1232 self.pg_enable_capture(self.pg_interfaces)
1234 capture = self.pg5.get_capture(len(pkts))
1235 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1237 # between NAT44 inside interfaces with different VRF (hairpinning)
1238 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1239 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1240 TCP(sport=1234, dport=5678))
1241 self.pg4.add_stream(p)
1242 self.pg_enable_capture(self.pg_interfaces)
1244 capture = self.pg6.get_capture(1)
1249 self.assertEqual(ip.src, self.nat_addr)
1250 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1251 self.assertNotEqual(tcp.sport, 1234)
1252 self.assertEqual(tcp.dport, 5678)
1254 self.logger.error(ppp("Unexpected or invalid packet:", p))
1257 # in2out 1st interface
1258 pkts = self.create_stream_in(self.pg4, self.pg3)
1259 self.pg4.add_stream(pkts)
1260 self.pg_enable_capture(self.pg_interfaces)
1262 capture = self.pg3.get_capture(len(pkts))
1263 self.verify_capture_out(capture)
1265 # out2in 1st interface
1266 pkts = self.create_stream_out(self.pg3)
1267 self.pg3.add_stream(pkts)
1268 self.pg_enable_capture(self.pg_interfaces)
1270 capture = self.pg4.get_capture(len(pkts))
1271 self.verify_capture_in(capture, self.pg4)
1273 # in2out 2nd interface
1274 pkts = self.create_stream_in(self.pg5, self.pg3)
1275 self.pg5.add_stream(pkts)
1276 self.pg_enable_capture(self.pg_interfaces)
1278 capture = self.pg3.get_capture(len(pkts))
1279 self.verify_capture_out(capture)
1281 # out2in 2nd interface
1282 pkts = self.create_stream_out(self.pg3)
1283 self.pg3.add_stream(pkts)
1284 self.pg_enable_capture(self.pg_interfaces)
1286 capture = self.pg5.get_capture(len(pkts))
1287 self.verify_capture_in(capture, self.pg5)
1290 addresses = self.vapi.nat44_address_dump()
1291 self.assertEqual(len(addresses), 1)
1292 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1293 self.assertEqual(len(sessions), 3)
1294 for session in sessions:
1295 self.assertFalse(session.is_static)
1296 self.assertEqual(session.inside_ip_address[0:4],
1297 self.pg5.remote_ip4n)
1298 self.assertEqual(session.outside_ip_address,
1299 addresses[0].ip_address)
1300 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1301 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1302 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1303 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1304 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1305 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1306 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1307 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1308 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1310 # in2out 3rd interface
1311 pkts = self.create_stream_in(self.pg6, self.pg3)
1312 self.pg6.add_stream(pkts)
1313 self.pg_enable_capture(self.pg_interfaces)
1315 capture = self.pg3.get_capture(len(pkts))
1316 self.verify_capture_out(capture, static_nat_ip, True)
1318 # out2in 3rd interface
1319 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1320 self.pg3.add_stream(pkts)
1321 self.pg_enable_capture(self.pg_interfaces)
1323 capture = self.pg6.get_capture(len(pkts))
1324 self.verify_capture_in(capture, self.pg6)
1326 # general user and session dump verifications
1327 users = self.vapi.nat44_user_dump()
1328 self.assertTrue(len(users) >= 3)
1329 addresses = self.vapi.nat44_address_dump()
1330 self.assertEqual(len(addresses), 1)
1332 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1334 for session in sessions:
1335 self.assertEqual(user.ip_address, session.inside_ip_address)
1336 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1337 self.assertTrue(session.protocol in
1338 [IP_PROTOS.tcp, IP_PROTOS.udp,
1342 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1343 self.assertTrue(len(sessions) >= 4)
1344 for session in sessions:
1345 self.assertFalse(session.is_static)
1346 self.assertEqual(session.inside_ip_address[0:4],
1347 self.pg4.remote_ip4n)
1348 self.assertEqual(session.outside_ip_address,
1349 addresses[0].ip_address)
1352 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1353 self.assertTrue(len(sessions) >= 3)
1354 for session in sessions:
1355 self.assertTrue(session.is_static)
1356 self.assertEqual(session.inside_ip_address[0:4],
1357 self.pg6.remote_ip4n)
1358 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1359 map(int, static_nat_ip.split('.')))
1360 self.assertTrue(session.inside_port in
1361 [self.tcp_port_in, self.udp_port_in,
1364 def test_hairpinning(self):
1365 """ NAT44 hairpinning - 1:1 NAPT """
1367 host = self.pg0.remote_hosts[0]
1368 server = self.pg0.remote_hosts[1]
1371 server_in_port = 5678
1372 server_out_port = 8765
1374 self.nat44_add_address(self.nat_addr)
1375 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1376 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1378 # add static mapping for server
1379 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1380 server_in_port, server_out_port,
1381 proto=IP_PROTOS.tcp)
1383 # send packet from host to server
1384 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1385 IP(src=host.ip4, dst=self.nat_addr) /
1386 TCP(sport=host_in_port, dport=server_out_port))
1387 self.pg0.add_stream(p)
1388 self.pg_enable_capture(self.pg_interfaces)
1390 capture = self.pg0.get_capture(1)
1395 self.assertEqual(ip.src, self.nat_addr)
1396 self.assertEqual(ip.dst, server.ip4)
1397 self.assertNotEqual(tcp.sport, host_in_port)
1398 self.assertEqual(tcp.dport, server_in_port)
1399 self.check_tcp_checksum(p)
1400 host_out_port = tcp.sport
1402 self.logger.error(ppp("Unexpected or invalid packet:", p))
1405 # send reply from server to host
1406 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1407 IP(src=server.ip4, dst=self.nat_addr) /
1408 TCP(sport=server_in_port, dport=host_out_port))
1409 self.pg0.add_stream(p)
1410 self.pg_enable_capture(self.pg_interfaces)
1412 capture = self.pg0.get_capture(1)
1417 self.assertEqual(ip.src, self.nat_addr)
1418 self.assertEqual(ip.dst, host.ip4)
1419 self.assertEqual(tcp.sport, server_out_port)
1420 self.assertEqual(tcp.dport, host_in_port)
1421 self.check_tcp_checksum(p)
1423 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1426 def test_hairpinning2(self):
1427 """ NAT44 hairpinning - 1:1 NAT"""
1429 server1_nat_ip = "10.0.0.10"
1430 server2_nat_ip = "10.0.0.11"
1431 host = self.pg0.remote_hosts[0]
1432 server1 = self.pg0.remote_hosts[1]
1433 server2 = self.pg0.remote_hosts[2]
1434 server_tcp_port = 22
1435 server_udp_port = 20
1437 self.nat44_add_address(self.nat_addr)
1438 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1439 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1442 # add static mapping for servers
1443 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1444 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1448 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1449 IP(src=host.ip4, dst=server1_nat_ip) /
1450 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1452 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1453 IP(src=host.ip4, dst=server1_nat_ip) /
1454 UDP(sport=self.udp_port_in, dport=server_udp_port))
1456 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1457 IP(src=host.ip4, dst=server1_nat_ip) /
1458 ICMP(id=self.icmp_id_in, type='echo-request'))
1460 self.pg0.add_stream(pkts)
1461 self.pg_enable_capture(self.pg_interfaces)
1463 capture = self.pg0.get_capture(len(pkts))
1464 for packet in capture:
1466 self.assertEqual(packet[IP].src, self.nat_addr)
1467 self.assertEqual(packet[IP].dst, server1.ip4)
1468 if packet.haslayer(TCP):
1469 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1470 self.assertEqual(packet[TCP].dport, server_tcp_port)
1471 self.tcp_port_out = packet[TCP].sport
1472 self.check_tcp_checksum(packet)
1473 elif packet.haslayer(UDP):
1474 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1475 self.assertEqual(packet[UDP].dport, server_udp_port)
1476 self.udp_port_out = packet[UDP].sport
1478 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1479 self.icmp_id_out = packet[ICMP].id
1481 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1486 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1487 IP(src=server1.ip4, dst=self.nat_addr) /
1488 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1490 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1491 IP(src=server1.ip4, dst=self.nat_addr) /
1492 UDP(sport=server_udp_port, dport=self.udp_port_out))
1494 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1495 IP(src=server1.ip4, dst=self.nat_addr) /
1496 ICMP(id=self.icmp_id_out, type='echo-reply'))
1498 self.pg0.add_stream(pkts)
1499 self.pg_enable_capture(self.pg_interfaces)
1501 capture = self.pg0.get_capture(len(pkts))
1502 for packet in capture:
1504 self.assertEqual(packet[IP].src, server1_nat_ip)
1505 self.assertEqual(packet[IP].dst, host.ip4)
1506 if packet.haslayer(TCP):
1507 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1508 self.assertEqual(packet[TCP].sport, server_tcp_port)
1509 self.check_tcp_checksum(packet)
1510 elif packet.haslayer(UDP):
1511 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1512 self.assertEqual(packet[UDP].sport, server_udp_port)
1514 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1516 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1519 # server2 to server1
1521 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1522 IP(src=server2.ip4, dst=server1_nat_ip) /
1523 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1525 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1526 IP(src=server2.ip4, dst=server1_nat_ip) /
1527 UDP(sport=self.udp_port_in, dport=server_udp_port))
1529 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1530 IP(src=server2.ip4, dst=server1_nat_ip) /
1531 ICMP(id=self.icmp_id_in, type='echo-request'))
1533 self.pg0.add_stream(pkts)
1534 self.pg_enable_capture(self.pg_interfaces)
1536 capture = self.pg0.get_capture(len(pkts))
1537 for packet in capture:
1539 self.assertEqual(packet[IP].src, server2_nat_ip)
1540 self.assertEqual(packet[IP].dst, server1.ip4)
1541 if packet.haslayer(TCP):
1542 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1543 self.assertEqual(packet[TCP].dport, server_tcp_port)
1544 self.tcp_port_out = packet[TCP].sport
1545 self.check_tcp_checksum(packet)
1546 elif packet.haslayer(UDP):
1547 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1548 self.assertEqual(packet[UDP].dport, server_udp_port)
1549 self.udp_port_out = packet[UDP].sport
1551 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1552 self.icmp_id_out = packet[ICMP].id
1554 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1557 # server1 to server2
1559 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1560 IP(src=server1.ip4, dst=server2_nat_ip) /
1561 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1563 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1564 IP(src=server1.ip4, dst=server2_nat_ip) /
1565 UDP(sport=server_udp_port, dport=self.udp_port_out))
1567 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1568 IP(src=server1.ip4, dst=server2_nat_ip) /
1569 ICMP(id=self.icmp_id_out, type='echo-reply'))
1571 self.pg0.add_stream(pkts)
1572 self.pg_enable_capture(self.pg_interfaces)
1574 capture = self.pg0.get_capture(len(pkts))
1575 for packet in capture:
1577 self.assertEqual(packet[IP].src, server1_nat_ip)
1578 self.assertEqual(packet[IP].dst, server2.ip4)
1579 if packet.haslayer(TCP):
1580 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1581 self.assertEqual(packet[TCP].sport, server_tcp_port)
1582 self.check_tcp_checksum(packet)
1583 elif packet.haslayer(UDP):
1584 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1585 self.assertEqual(packet[UDP].sport, server_udp_port)
1587 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1589 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1592 def test_max_translations_per_user(self):
1593 """ MAX translations per user - recycle the least recently used """
1595 self.nat44_add_address(self.nat_addr)
1596 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1597 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1600 # get maximum number of translations per user
1601 nat44_config = self.vapi.nat_show_config()
1603 # send more than maximum number of translations per user packets
1604 pkts_num = nat44_config.max_translations_per_user + 5
1606 for port in range(0, pkts_num):
1607 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1608 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1609 TCP(sport=1025 + port))
1611 self.pg0.add_stream(pkts)
1612 self.pg_enable_capture(self.pg_interfaces)
1615 # verify number of translated packet
1616 self.pg1.get_capture(pkts_num)
1618 def test_interface_addr(self):
1619 """ Acquire NAT44 addresses from interface """
1620 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1622 # no address in NAT pool
1623 adresses = self.vapi.nat44_address_dump()
1624 self.assertEqual(0, len(adresses))
1626 # configure interface address and check NAT address pool
1627 self.pg7.config_ip4()
1628 adresses = self.vapi.nat44_address_dump()
1629 self.assertEqual(1, len(adresses))
1630 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1632 # remove interface address and check NAT address pool
1633 self.pg7.unconfig_ip4()
1634 adresses = self.vapi.nat44_address_dump()
1635 self.assertEqual(0, len(adresses))
1637 def test_interface_addr_static_mapping(self):
1638 """ Static mapping with addresses from interface """
1639 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1640 self.nat44_add_static_mapping(
1642 external_sw_if_index=self.pg7.sw_if_index)
1644 # static mappings with external interface
1645 static_mappings = self.vapi.nat44_static_mapping_dump()
1646 self.assertEqual(1, len(static_mappings))
1647 self.assertEqual(self.pg7.sw_if_index,
1648 static_mappings[0].external_sw_if_index)
1650 # configure interface address and check static mappings
1651 self.pg7.config_ip4()
1652 static_mappings = self.vapi.nat44_static_mapping_dump()
1653 self.assertEqual(1, len(static_mappings))
1654 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1655 self.pg7.local_ip4n)
1656 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1658 # remove interface address and check static mappings
1659 self.pg7.unconfig_ip4()
1660 static_mappings = self.vapi.nat44_static_mapping_dump()
1661 self.assertEqual(0, len(static_mappings))
1663 def test_ipfix_nat44_sess(self):
1664 """ IPFIX logging NAT44 session created/delted """
1665 self.ipfix_domain_id = 10
1666 self.ipfix_src_port = 20202
1667 colector_port = 30303
1668 bind_layers(UDP, IPFIX, dport=30303)
1669 self.nat44_add_address(self.nat_addr)
1670 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1671 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1673 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1674 src_address=self.pg3.local_ip4n,
1676 template_interval=10,
1677 collector_port=colector_port)
1678 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1679 src_port=self.ipfix_src_port)
1681 pkts = self.create_stream_in(self.pg0, self.pg1)
1682 self.pg0.add_stream(pkts)
1683 self.pg_enable_capture(self.pg_interfaces)
1685 capture = self.pg1.get_capture(len(pkts))
1686 self.verify_capture_out(capture)
1687 self.nat44_add_address(self.nat_addr, is_add=0)
1688 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1689 capture = self.pg3.get_capture(3)
1690 ipfix = IPFIXDecoder()
1691 # first load template
1693 self.assertTrue(p.haslayer(IPFIX))
1694 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1695 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1696 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1697 self.assertEqual(p[UDP].dport, colector_port)
1698 self.assertEqual(p[IPFIX].observationDomainID,
1699 self.ipfix_domain_id)
1700 if p.haslayer(Template):
1701 ipfix.add_template(p.getlayer(Template))
1702 # verify events in data set
1704 if p.haslayer(Data):
1705 data = ipfix.decode_data_set(p.getlayer(Set))
1706 self.verify_ipfix_nat44_ses(data)
1708 def test_ipfix_addr_exhausted(self):
1709 """ IPFIX logging NAT addresses exhausted """
1710 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1711 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1713 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1714 src_address=self.pg3.local_ip4n,
1716 template_interval=10)
1717 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1718 src_port=self.ipfix_src_port)
1720 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1721 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1723 self.pg0.add_stream(p)
1724 self.pg_enable_capture(self.pg_interfaces)
1726 capture = self.pg1.get_capture(0)
1727 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1728 capture = self.pg3.get_capture(3)
1729 ipfix = IPFIXDecoder()
1730 # first load template
1732 self.assertTrue(p.haslayer(IPFIX))
1733 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1734 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1735 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1736 self.assertEqual(p[UDP].dport, 4739)
1737 self.assertEqual(p[IPFIX].observationDomainID,
1738 self.ipfix_domain_id)
1739 if p.haslayer(Template):
1740 ipfix.add_template(p.getlayer(Template))
1741 # verify events in data set
1743 if p.haslayer(Data):
1744 data = ipfix.decode_data_set(p.getlayer(Set))
1745 self.verify_ipfix_addr_exhausted(data)
1747 def test_pool_addr_fib(self):
1748 """ NAT44 add pool addresses to FIB """
1749 static_addr = '10.0.0.10'
1750 self.nat44_add_address(self.nat_addr)
1751 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1752 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1754 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1757 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1758 ARP(op=ARP.who_has, pdst=self.nat_addr,
1759 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1760 self.pg1.add_stream(p)
1761 self.pg_enable_capture(self.pg_interfaces)
1763 capture = self.pg1.get_capture(1)
1764 self.assertTrue(capture[0].haslayer(ARP))
1765 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1768 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1769 ARP(op=ARP.who_has, pdst=static_addr,
1770 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1771 self.pg1.add_stream(p)
1772 self.pg_enable_capture(self.pg_interfaces)
1774 capture = self.pg1.get_capture(1)
1775 self.assertTrue(capture[0].haslayer(ARP))
1776 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1778 # send ARP to non-NAT44 interface
1779 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1780 ARP(op=ARP.who_has, pdst=self.nat_addr,
1781 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1782 self.pg2.add_stream(p)
1783 self.pg_enable_capture(self.pg_interfaces)
1785 capture = self.pg1.get_capture(0)
1787 # remove addresses and verify
1788 self.nat44_add_address(self.nat_addr, is_add=0)
1789 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1792 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1793 ARP(op=ARP.who_has, pdst=self.nat_addr,
1794 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1795 self.pg1.add_stream(p)
1796 self.pg_enable_capture(self.pg_interfaces)
1798 capture = self.pg1.get_capture(0)
1800 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1801 ARP(op=ARP.who_has, pdst=static_addr,
1802 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1803 self.pg1.add_stream(p)
1804 self.pg_enable_capture(self.pg_interfaces)
1806 capture = self.pg1.get_capture(0)
1808 def test_vrf_mode(self):
1809 """ NAT44 tenant VRF aware address pool mode """
1813 nat_ip1 = "10.0.0.10"
1814 nat_ip2 = "10.0.0.11"
1816 self.pg0.unconfig_ip4()
1817 self.pg1.unconfig_ip4()
1818 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1819 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1820 self.pg0.set_table_ip4(vrf_id1)
1821 self.pg1.set_table_ip4(vrf_id2)
1822 self.pg0.config_ip4()
1823 self.pg1.config_ip4()
1825 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1826 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1827 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1828 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1829 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1833 pkts = self.create_stream_in(self.pg0, self.pg2)
1834 self.pg0.add_stream(pkts)
1835 self.pg_enable_capture(self.pg_interfaces)
1837 capture = self.pg2.get_capture(len(pkts))
1838 self.verify_capture_out(capture, nat_ip1)
1841 pkts = self.create_stream_in(self.pg1, self.pg2)
1842 self.pg1.add_stream(pkts)
1843 self.pg_enable_capture(self.pg_interfaces)
1845 capture = self.pg2.get_capture(len(pkts))
1846 self.verify_capture_out(capture, nat_ip2)
1848 self.pg0.unconfig_ip4()
1849 self.pg1.unconfig_ip4()
1850 self.pg0.set_table_ip4(0)
1851 self.pg1.set_table_ip4(0)
1852 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1853 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1855 def test_vrf_feature_independent(self):
1856 """ NAT44 tenant VRF independent address pool mode """
1858 nat_ip1 = "10.0.0.10"
1859 nat_ip2 = "10.0.0.11"
1861 self.nat44_add_address(nat_ip1)
1862 self.nat44_add_address(nat_ip2)
1863 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1864 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1865 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1869 pkts = self.create_stream_in(self.pg0, self.pg2)
1870 self.pg0.add_stream(pkts)
1871 self.pg_enable_capture(self.pg_interfaces)
1873 capture = self.pg2.get_capture(len(pkts))
1874 self.verify_capture_out(capture, nat_ip1)
1877 pkts = self.create_stream_in(self.pg1, self.pg2)
1878 self.pg1.add_stream(pkts)
1879 self.pg_enable_capture(self.pg_interfaces)
1881 capture = self.pg2.get_capture(len(pkts))
1882 self.verify_capture_out(capture, nat_ip1)
1884 def test_dynamic_ipless_interfaces(self):
1885 """ NAT44 interfaces without configured IP address """
1887 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1888 self.pg7.remote_mac,
1889 self.pg7.remote_ip4n,
1891 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1892 self.pg8.remote_mac,
1893 self.pg8.remote_ip4n,
1896 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1897 dst_address_length=32,
1898 next_hop_address=self.pg7.remote_ip4n,
1899 next_hop_sw_if_index=self.pg7.sw_if_index)
1900 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1901 dst_address_length=32,
1902 next_hop_address=self.pg8.remote_ip4n,
1903 next_hop_sw_if_index=self.pg8.sw_if_index)
1905 self.nat44_add_address(self.nat_addr)
1906 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1907 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1911 pkts = self.create_stream_in(self.pg7, self.pg8)
1912 self.pg7.add_stream(pkts)
1913 self.pg_enable_capture(self.pg_interfaces)
1915 capture = self.pg8.get_capture(len(pkts))
1916 self.verify_capture_out(capture)
1919 pkts = self.create_stream_out(self.pg8, self.nat_addr)
1920 self.pg8.add_stream(pkts)
1921 self.pg_enable_capture(self.pg_interfaces)
1923 capture = self.pg7.get_capture(len(pkts))
1924 self.verify_capture_in(capture, self.pg7)
1926 def test_static_ipless_interfaces(self):
1927 """ NAT44 interfaces without configured IP address - 1:1 NAT """
1929 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1930 self.pg7.remote_mac,
1931 self.pg7.remote_ip4n,
1933 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1934 self.pg8.remote_mac,
1935 self.pg8.remote_ip4n,
1938 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1939 dst_address_length=32,
1940 next_hop_address=self.pg7.remote_ip4n,
1941 next_hop_sw_if_index=self.pg7.sw_if_index)
1942 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1943 dst_address_length=32,
1944 next_hop_address=self.pg8.remote_ip4n,
1945 next_hop_sw_if_index=self.pg8.sw_if_index)
1947 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1948 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1949 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1953 pkts = self.create_stream_out(self.pg8)
1954 self.pg8.add_stream(pkts)
1955 self.pg_enable_capture(self.pg_interfaces)
1957 capture = self.pg7.get_capture(len(pkts))
1958 self.verify_capture_in(capture, self.pg7)
1961 pkts = self.create_stream_in(self.pg7, self.pg8)
1962 self.pg7.add_stream(pkts)
1963 self.pg_enable_capture(self.pg_interfaces)
1965 capture = self.pg8.get_capture(len(pkts))
1966 self.verify_capture_out(capture, self.nat_addr, True)
1968 def test_static_with_port_ipless_interfaces(self):
1969 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1971 self.tcp_port_out = 30606
1972 self.udp_port_out = 30607
1973 self.icmp_id_out = 30608
1975 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1976 self.pg7.remote_mac,
1977 self.pg7.remote_ip4n,
1979 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1980 self.pg8.remote_mac,
1981 self.pg8.remote_ip4n,
1984 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1985 dst_address_length=32,
1986 next_hop_address=self.pg7.remote_ip4n,
1987 next_hop_sw_if_index=self.pg7.sw_if_index)
1988 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1989 dst_address_length=32,
1990 next_hop_address=self.pg8.remote_ip4n,
1991 next_hop_sw_if_index=self.pg8.sw_if_index)
1993 self.nat44_add_address(self.nat_addr)
1994 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1995 self.tcp_port_in, self.tcp_port_out,
1996 proto=IP_PROTOS.tcp)
1997 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1998 self.udp_port_in, self.udp_port_out,
1999 proto=IP_PROTOS.udp)
2000 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2001 self.icmp_id_in, self.icmp_id_out,
2002 proto=IP_PROTOS.icmp)
2003 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2004 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2008 pkts = self.create_stream_out(self.pg8)
2009 self.pg8.add_stream(pkts)
2010 self.pg_enable_capture(self.pg_interfaces)
2012 capture = self.pg7.get_capture(len(pkts))
2013 self.verify_capture_in(capture, self.pg7)
2016 pkts = self.create_stream_in(self.pg7, self.pg8)
2017 self.pg7.add_stream(pkts)
2018 self.pg_enable_capture(self.pg_interfaces)
2020 capture = self.pg8.get_capture(len(pkts))
2021 self.verify_capture_out(capture)
2023 def test_static_unknown_proto(self):
2024 """ 1:1 NAT translate packet with unknown protocol """
2025 nat_ip = "10.0.0.10"
2026 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2027 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2028 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2032 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2033 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2035 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2036 TCP(sport=1234, dport=1234))
2037 self.pg0.add_stream(p)
2038 self.pg_enable_capture(self.pg_interfaces)
2040 p = self.pg1.get_capture(1)
2043 self.assertEqual(packet[IP].src, nat_ip)
2044 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2045 self.assertTrue(packet.haslayer(GRE))
2046 self.check_ip_checksum(packet)
2048 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2052 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2053 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2055 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2056 TCP(sport=1234, dport=1234))
2057 self.pg1.add_stream(p)
2058 self.pg_enable_capture(self.pg_interfaces)
2060 p = self.pg0.get_capture(1)
2063 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2064 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2065 self.assertTrue(packet.haslayer(GRE))
2066 self.check_ip_checksum(packet)
2068 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2071 def test_hairpinning_static_unknown_proto(self):
2072 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2074 host = self.pg0.remote_hosts[0]
2075 server = self.pg0.remote_hosts[1]
2077 host_nat_ip = "10.0.0.10"
2078 server_nat_ip = "10.0.0.11"
2080 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2081 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2082 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2083 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2087 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2088 IP(src=host.ip4, dst=server_nat_ip) /
2090 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2091 TCP(sport=1234, dport=1234))
2092 self.pg0.add_stream(p)
2093 self.pg_enable_capture(self.pg_interfaces)
2095 p = self.pg0.get_capture(1)
2098 self.assertEqual(packet[IP].src, host_nat_ip)
2099 self.assertEqual(packet[IP].dst, server.ip4)
2100 self.assertTrue(packet.haslayer(GRE))
2101 self.check_ip_checksum(packet)
2103 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2107 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2108 IP(src=server.ip4, dst=host_nat_ip) /
2110 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2111 TCP(sport=1234, dport=1234))
2112 self.pg0.add_stream(p)
2113 self.pg_enable_capture(self.pg_interfaces)
2115 p = self.pg0.get_capture(1)
2118 self.assertEqual(packet[IP].src, server_nat_ip)
2119 self.assertEqual(packet[IP].dst, host.ip4)
2120 self.assertTrue(packet.haslayer(GRE))
2121 self.check_ip_checksum(packet)
2123 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2126 def test_unknown_proto(self):
2127 """ NAT44 translate packet with unknown protocol """
2128 self.nat44_add_address(self.nat_addr)
2129 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2130 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2134 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2135 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2136 TCP(sport=self.tcp_port_in, dport=20))
2137 self.pg0.add_stream(p)
2138 self.pg_enable_capture(self.pg_interfaces)
2140 p = self.pg1.get_capture(1)
2142 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2143 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2145 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2146 TCP(sport=1234, dport=1234))
2147 self.pg0.add_stream(p)
2148 self.pg_enable_capture(self.pg_interfaces)
2150 p = self.pg1.get_capture(1)
2153 self.assertEqual(packet[IP].src, self.nat_addr)
2154 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2155 self.assertTrue(packet.haslayer(GRE))
2156 self.check_ip_checksum(packet)
2158 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2162 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2163 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2165 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2166 TCP(sport=1234, dport=1234))
2167 self.pg1.add_stream(p)
2168 self.pg_enable_capture(self.pg_interfaces)
2170 p = self.pg0.get_capture(1)
2173 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2174 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2175 self.assertTrue(packet.haslayer(GRE))
2176 self.check_ip_checksum(packet)
2178 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2181 def test_hairpinning_unknown_proto(self):
2182 """ NAT44 translate packet with unknown protocol - hairpinning """
2183 host = self.pg0.remote_hosts[0]
2184 server = self.pg0.remote_hosts[1]
2187 server_in_port = 5678
2188 server_out_port = 8765
2189 server_nat_ip = "10.0.0.11"
2191 self.nat44_add_address(self.nat_addr)
2192 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2193 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2196 # add static mapping for server
2197 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2200 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2201 IP(src=host.ip4, dst=server_nat_ip) /
2202 TCP(sport=host_in_port, dport=server_out_port))
2203 self.pg0.add_stream(p)
2204 self.pg_enable_capture(self.pg_interfaces)
2206 capture = self.pg0.get_capture(1)
2208 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2209 IP(src=host.ip4, dst=server_nat_ip) /
2211 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2212 TCP(sport=1234, dport=1234))
2213 self.pg0.add_stream(p)
2214 self.pg_enable_capture(self.pg_interfaces)
2216 p = self.pg0.get_capture(1)
2219 self.assertEqual(packet[IP].src, self.nat_addr)
2220 self.assertEqual(packet[IP].dst, server.ip4)
2221 self.assertTrue(packet.haslayer(GRE))
2222 self.check_ip_checksum(packet)
2224 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2228 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2229 IP(src=server.ip4, dst=self.nat_addr) /
2231 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2232 TCP(sport=1234, dport=1234))
2233 self.pg0.add_stream(p)
2234 self.pg_enable_capture(self.pg_interfaces)
2236 p = self.pg0.get_capture(1)
2239 self.assertEqual(packet[IP].src, server_nat_ip)
2240 self.assertEqual(packet[IP].dst, host.ip4)
2241 self.assertTrue(packet.haslayer(GRE))
2242 self.check_ip_checksum(packet)
2244 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2247 def test_output_feature(self):
2248 """ NAT44 interface output feature (in2out postrouting) """
2249 self.nat44_add_address(self.nat_addr)
2250 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2251 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2252 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2256 pkts = self.create_stream_in(self.pg0, self.pg3)
2257 self.pg0.add_stream(pkts)
2258 self.pg_enable_capture(self.pg_interfaces)
2260 capture = self.pg3.get_capture(len(pkts))
2261 self.verify_capture_out(capture)
2264 pkts = self.create_stream_out(self.pg3)
2265 self.pg3.add_stream(pkts)
2266 self.pg_enable_capture(self.pg_interfaces)
2268 capture = self.pg0.get_capture(len(pkts))
2269 self.verify_capture_in(capture, self.pg0)
2271 # from non-NAT interface to NAT inside interface
2272 pkts = self.create_stream_in(self.pg2, self.pg0)
2273 self.pg2.add_stream(pkts)
2274 self.pg_enable_capture(self.pg_interfaces)
2276 capture = self.pg0.get_capture(len(pkts))
2277 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2279 def test_output_feature_vrf_aware(self):
2280 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2281 nat_ip_vrf10 = "10.0.0.10"
2282 nat_ip_vrf20 = "10.0.0.20"
2284 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2285 dst_address_length=32,
2286 next_hop_address=self.pg3.remote_ip4n,
2287 next_hop_sw_if_index=self.pg3.sw_if_index,
2289 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2290 dst_address_length=32,
2291 next_hop_address=self.pg3.remote_ip4n,
2292 next_hop_sw_if_index=self.pg3.sw_if_index,
2295 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2296 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2297 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2298 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2299 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2303 pkts = self.create_stream_in(self.pg4, self.pg3)
2304 self.pg4.add_stream(pkts)
2305 self.pg_enable_capture(self.pg_interfaces)
2307 capture = self.pg3.get_capture(len(pkts))
2308 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2311 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2312 self.pg3.add_stream(pkts)
2313 self.pg_enable_capture(self.pg_interfaces)
2315 capture = self.pg4.get_capture(len(pkts))
2316 self.verify_capture_in(capture, self.pg4)
2319 pkts = self.create_stream_in(self.pg6, self.pg3)
2320 self.pg6.add_stream(pkts)
2321 self.pg_enable_capture(self.pg_interfaces)
2323 capture = self.pg3.get_capture(len(pkts))
2324 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2327 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2328 self.pg3.add_stream(pkts)
2329 self.pg_enable_capture(self.pg_interfaces)
2331 capture = self.pg6.get_capture(len(pkts))
2332 self.verify_capture_in(capture, self.pg6)
2334 def test_output_feature_hairpinning(self):
2335 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2336 host = self.pg0.remote_hosts[0]
2337 server = self.pg0.remote_hosts[1]
2340 server_in_port = 5678
2341 server_out_port = 8765
2343 self.nat44_add_address(self.nat_addr)
2344 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2345 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2348 # add static mapping for server
2349 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2350 server_in_port, server_out_port,
2351 proto=IP_PROTOS.tcp)
2353 # send packet from host to server
2354 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2355 IP(src=host.ip4, dst=self.nat_addr) /
2356 TCP(sport=host_in_port, dport=server_out_port))
2357 self.pg0.add_stream(p)
2358 self.pg_enable_capture(self.pg_interfaces)
2360 capture = self.pg0.get_capture(1)
2365 self.assertEqual(ip.src, self.nat_addr)
2366 self.assertEqual(ip.dst, server.ip4)
2367 self.assertNotEqual(tcp.sport, host_in_port)
2368 self.assertEqual(tcp.dport, server_in_port)
2369 self.check_tcp_checksum(p)
2370 host_out_port = tcp.sport
2372 self.logger.error(ppp("Unexpected or invalid packet:", p))
2375 # send reply from server to host
2376 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2377 IP(src=server.ip4, dst=self.nat_addr) /
2378 TCP(sport=server_in_port, dport=host_out_port))
2379 self.pg0.add_stream(p)
2380 self.pg_enable_capture(self.pg_interfaces)
2382 capture = self.pg0.get_capture(1)
2387 self.assertEqual(ip.src, self.nat_addr)
2388 self.assertEqual(ip.dst, host.ip4)
2389 self.assertEqual(tcp.sport, server_out_port)
2390 self.assertEqual(tcp.dport, host_in_port)
2391 self.check_tcp_checksum(p)
2393 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2396 def test_one_armed_nat44(self):
2397 """ One armed NAT44 """
2398 remote_host = self.pg9.remote_hosts[0]
2399 local_host = self.pg9.remote_hosts[1]
2402 self.nat44_add_address(self.nat_addr)
2403 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2404 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2408 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2409 IP(src=local_host.ip4, dst=remote_host.ip4) /
2410 TCP(sport=12345, dport=80))
2411 self.pg9.add_stream(p)
2412 self.pg_enable_capture(self.pg_interfaces)
2414 capture = self.pg9.get_capture(1)
2419 self.assertEqual(ip.src, self.nat_addr)
2420 self.assertEqual(ip.dst, remote_host.ip4)
2421 self.assertNotEqual(tcp.sport, 12345)
2422 external_port = tcp.sport
2423 self.assertEqual(tcp.dport, 80)
2424 self.check_tcp_checksum(p)
2425 self.check_ip_checksum(p)
2427 self.logger.error(ppp("Unexpected or invalid packet:", p))
2431 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2432 IP(src=remote_host.ip4, dst=self.nat_addr) /
2433 TCP(sport=80, dport=external_port))
2434 self.pg9.add_stream(p)
2435 self.pg_enable_capture(self.pg_interfaces)
2437 capture = self.pg9.get_capture(1)
2442 self.assertEqual(ip.src, remote_host.ip4)
2443 self.assertEqual(ip.dst, local_host.ip4)
2444 self.assertEqual(tcp.sport, 80)
2445 self.assertEqual(tcp.dport, 12345)
2446 self.check_tcp_checksum(p)
2447 self.check_ip_checksum(p)
2449 self.logger.error(ppp("Unexpected or invalid packet:", p))
2453 super(TestNAT44, self).tearDown()
2454 if not self.vpp_dead:
2455 self.logger.info(self.vapi.cli("show nat44 verbose"))
2459 class TestDeterministicNAT(MethodHolder):
2460 """ Deterministic NAT Test Cases """
2463 def setUpConstants(cls):
2464 super(TestDeterministicNAT, cls).setUpConstants()
2465 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2468 def setUpClass(cls):
2469 super(TestDeterministicNAT, cls).setUpClass()
2472 cls.tcp_port_in = 6303
2473 cls.tcp_external_port = 6303
2474 cls.udp_port_in = 6304
2475 cls.udp_external_port = 6304
2476 cls.icmp_id_in = 6305
2477 cls.nat_addr = '10.0.0.3'
2479 cls.create_pg_interfaces(range(3))
2480 cls.interfaces = list(cls.pg_interfaces)
2482 for i in cls.interfaces:
2487 cls.pg0.generate_remote_hosts(2)
2488 cls.pg0.configure_ipv4_neighbors()
2491 super(TestDeterministicNAT, cls).tearDownClass()
2494 def create_stream_in(self, in_if, out_if, ttl=64):
2496 Create packet stream for inside network
2498 :param in_if: Inside interface
2499 :param out_if: Outside interface
2500 :param ttl: TTL of generated packets
2504 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2505 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2506 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2510 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2511 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2512 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2516 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2517 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2518 ICMP(id=self.icmp_id_in, type='echo-request'))
2523 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2525 Create packet stream for outside network
2527 :param out_if: Outside interface
2528 :param dst_ip: Destination IP address (Default use global NAT address)
2529 :param ttl: TTL of generated packets
2532 dst_ip = self.nat_addr
2535 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2536 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2537 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2541 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2542 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2543 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2547 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2548 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2549 ICMP(id=self.icmp_external_id, type='echo-reply'))
2554 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2556 Verify captured packets on outside network
2558 :param capture: Captured packets
2559 :param nat_ip: Translated IP address (Default use global NAT address)
2560 :param same_port: Sorce port number is not translated (Default False)
2561 :param packet_num: Expected number of packets (Default 3)
2564 nat_ip = self.nat_addr
2565 self.assertEqual(packet_num, len(capture))
2566 for packet in capture:
2568 self.assertEqual(packet[IP].src, nat_ip)
2569 if packet.haslayer(TCP):
2570 self.tcp_port_out = packet[TCP].sport
2571 elif packet.haslayer(UDP):
2572 self.udp_port_out = packet[UDP].sport
2574 self.icmp_external_id = packet[ICMP].id
2576 self.logger.error(ppp("Unexpected or invalid packet "
2577 "(outside network):", packet))
2580 def initiate_tcp_session(self, in_if, out_if):
2582 Initiates TCP session
2584 :param in_if: Inside interface
2585 :param out_if: Outside interface
2588 # SYN packet in->out
2589 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2590 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2591 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2594 self.pg_enable_capture(self.pg_interfaces)
2596 capture = out_if.get_capture(1)
2598 self.tcp_port_out = p[TCP].sport
2600 # SYN + ACK packet out->in
2601 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2602 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2603 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2605 out_if.add_stream(p)
2606 self.pg_enable_capture(self.pg_interfaces)
2608 in_if.get_capture(1)
2610 # ACK packet in->out
2611 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2612 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2613 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2616 self.pg_enable_capture(self.pg_interfaces)
2618 out_if.get_capture(1)
2621 self.logger.error("TCP 3 way handshake failed")
2624 def verify_ipfix_max_entries_per_user(self, data):
2626 Verify IPFIX maximum entries per user exceeded event
2628 :param data: Decoded IPFIX data records
2630 self.assertEqual(1, len(data))
2633 self.assertEqual(ord(record[230]), 13)
2634 # natQuotaExceededEvent
2635 self.assertEqual('\x03\x00\x00\x00', record[466])
2637 self.assertEqual(self.pg0.remote_ip4n, record[8])
2639 def test_deterministic_mode(self):
2640 """ NAT plugin run deterministic mode """
2641 in_addr = '172.16.255.0'
2642 out_addr = '172.17.255.50'
2643 in_addr_t = '172.16.255.20'
2644 in_addr_n = socket.inet_aton(in_addr)
2645 out_addr_n = socket.inet_aton(out_addr)
2646 in_addr_t_n = socket.inet_aton(in_addr_t)
2650 nat_config = self.vapi.nat_show_config()
2651 self.assertEqual(1, nat_config.deterministic)
2653 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2655 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2656 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2657 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2658 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2660 deterministic_mappings = self.vapi.nat_det_map_dump()
2661 self.assertEqual(len(deterministic_mappings), 1)
2662 dsm = deterministic_mappings[0]
2663 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2664 self.assertEqual(in_plen, dsm.in_plen)
2665 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2666 self.assertEqual(out_plen, dsm.out_plen)
2668 self.clear_nat_det()
2669 deterministic_mappings = self.vapi.nat_det_map_dump()
2670 self.assertEqual(len(deterministic_mappings), 0)
2672 def test_set_timeouts(self):
2673 """ Set deterministic NAT timeouts """
2674 timeouts_before = self.vapi.nat_det_get_timeouts()
2676 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2677 timeouts_before.tcp_established + 10,
2678 timeouts_before.tcp_transitory + 10,
2679 timeouts_before.icmp + 10)
2681 timeouts_after = self.vapi.nat_det_get_timeouts()
2683 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2684 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2685 self.assertNotEqual(timeouts_before.tcp_established,
2686 timeouts_after.tcp_established)
2687 self.assertNotEqual(timeouts_before.tcp_transitory,
2688 timeouts_after.tcp_transitory)
2690 def test_det_in(self):
2691 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2693 nat_ip = "10.0.0.10"
2695 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2697 socket.inet_aton(nat_ip),
2699 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2700 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2704 pkts = self.create_stream_in(self.pg0, self.pg1)
2705 self.pg0.add_stream(pkts)
2706 self.pg_enable_capture(self.pg_interfaces)
2708 capture = self.pg1.get_capture(len(pkts))
2709 self.verify_capture_out(capture, nat_ip)
2712 pkts = self.create_stream_out(self.pg1, nat_ip)
2713 self.pg1.add_stream(pkts)
2714 self.pg_enable_capture(self.pg_interfaces)
2716 capture = self.pg0.get_capture(len(pkts))
2717 self.verify_capture_in(capture, self.pg0)
2720 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2721 self.assertEqual(len(sessions), 3)
2725 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2726 self.assertEqual(s.in_port, self.tcp_port_in)
2727 self.assertEqual(s.out_port, self.tcp_port_out)
2728 self.assertEqual(s.ext_port, self.tcp_external_port)
2732 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2733 self.assertEqual(s.in_port, self.udp_port_in)
2734 self.assertEqual(s.out_port, self.udp_port_out)
2735 self.assertEqual(s.ext_port, self.udp_external_port)
2739 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2740 self.assertEqual(s.in_port, self.icmp_id_in)
2741 self.assertEqual(s.out_port, self.icmp_external_id)
2743 def test_multiple_users(self):
2744 """ Deterministic NAT multiple users """
2746 nat_ip = "10.0.0.10"
2748 external_port = 6303
2750 host0 = self.pg0.remote_hosts[0]
2751 host1 = self.pg0.remote_hosts[1]
2753 self.vapi.nat_det_add_del_map(host0.ip4n,
2755 socket.inet_aton(nat_ip),
2757 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2758 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2762 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2763 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2764 TCP(sport=port_in, dport=external_port))
2765 self.pg0.add_stream(p)
2766 self.pg_enable_capture(self.pg_interfaces)
2768 capture = self.pg1.get_capture(1)
2773 self.assertEqual(ip.src, nat_ip)
2774 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2775 self.assertEqual(tcp.dport, external_port)
2776 port_out0 = tcp.sport
2778 self.logger.error(ppp("Unexpected or invalid packet:", p))
2782 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2783 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2784 TCP(sport=port_in, dport=external_port))
2785 self.pg0.add_stream(p)
2786 self.pg_enable_capture(self.pg_interfaces)
2788 capture = self.pg1.get_capture(1)
2793 self.assertEqual(ip.src, nat_ip)
2794 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2795 self.assertEqual(tcp.dport, external_port)
2796 port_out1 = tcp.sport
2798 self.logger.error(ppp("Unexpected or invalid packet:", p))
2801 dms = self.vapi.nat_det_map_dump()
2802 self.assertEqual(1, len(dms))
2803 self.assertEqual(2, dms[0].ses_num)
2806 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2807 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2808 TCP(sport=external_port, dport=port_out0))
2809 self.pg1.add_stream(p)
2810 self.pg_enable_capture(self.pg_interfaces)
2812 capture = self.pg0.get_capture(1)
2817 self.assertEqual(ip.src, self.pg1.remote_ip4)
2818 self.assertEqual(ip.dst, host0.ip4)
2819 self.assertEqual(tcp.dport, port_in)
2820 self.assertEqual(tcp.sport, external_port)
2822 self.logger.error(ppp("Unexpected or invalid packet:", p))
2826 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2827 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2828 TCP(sport=external_port, dport=port_out1))
2829 self.pg1.add_stream(p)
2830 self.pg_enable_capture(self.pg_interfaces)
2832 capture = self.pg0.get_capture(1)
2837 self.assertEqual(ip.src, self.pg1.remote_ip4)
2838 self.assertEqual(ip.dst, host1.ip4)
2839 self.assertEqual(tcp.dport, port_in)
2840 self.assertEqual(tcp.sport, external_port)
2842 self.logger.error(ppp("Unexpected or invalid packet", p))
2845 # session close api test
2846 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2848 self.pg1.remote_ip4n,
2850 dms = self.vapi.nat_det_map_dump()
2851 self.assertEqual(dms[0].ses_num, 1)
2853 self.vapi.nat_det_close_session_in(host0.ip4n,
2855 self.pg1.remote_ip4n,
2857 dms = self.vapi.nat_det_map_dump()
2858 self.assertEqual(dms[0].ses_num, 0)
2860 def test_tcp_session_close_detection_in(self):
2861 """ Deterministic NAT TCP session close from inside network """
2862 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2864 socket.inet_aton(self.nat_addr),
2866 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2867 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2870 self.initiate_tcp_session(self.pg0, self.pg1)
2872 # close the session from inside
2874 # FIN packet in -> out
2875 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2876 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2877 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2879 self.pg0.add_stream(p)
2880 self.pg_enable_capture(self.pg_interfaces)
2882 self.pg1.get_capture(1)
2886 # ACK packet out -> in
2887 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2888 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2889 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2893 # FIN packet out -> in
2894 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2895 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2896 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2900 self.pg1.add_stream(pkts)
2901 self.pg_enable_capture(self.pg_interfaces)
2903 self.pg0.get_capture(2)
2905 # ACK packet in -> out
2906 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2907 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2908 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2910 self.pg0.add_stream(p)
2911 self.pg_enable_capture(self.pg_interfaces)
2913 self.pg1.get_capture(1)
2915 # Check if deterministic NAT44 closed the session
2916 dms = self.vapi.nat_det_map_dump()
2917 self.assertEqual(0, dms[0].ses_num)
2919 self.logger.error("TCP session termination failed")
2922 def test_tcp_session_close_detection_out(self):
2923 """ Deterministic NAT TCP session close from outside network """
2924 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2926 socket.inet_aton(self.nat_addr),
2928 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2929 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2932 self.initiate_tcp_session(self.pg0, self.pg1)
2934 # close the session from outside
2936 # FIN packet out -> in
2937 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2938 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2939 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2941 self.pg1.add_stream(p)
2942 self.pg_enable_capture(self.pg_interfaces)
2944 self.pg0.get_capture(1)
2948 # ACK packet in -> out
2949 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2950 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2951 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2955 # ACK packet in -> out
2956 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2957 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2958 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2962 self.pg0.add_stream(pkts)
2963 self.pg_enable_capture(self.pg_interfaces)
2965 self.pg1.get_capture(2)
2967 # ACK packet out -> in
2968 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2969 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2970 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2972 self.pg1.add_stream(p)
2973 self.pg_enable_capture(self.pg_interfaces)
2975 self.pg0.get_capture(1)
2977 # Check if deterministic NAT44 closed the session
2978 dms = self.vapi.nat_det_map_dump()
2979 self.assertEqual(0, dms[0].ses_num)
2981 self.logger.error("TCP session termination failed")
2984 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2985 def test_session_timeout(self):
2986 """ Deterministic NAT session timeouts """
2987 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2989 socket.inet_aton(self.nat_addr),
2991 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2992 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2995 self.initiate_tcp_session(self.pg0, self.pg1)
2996 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
2997 pkts = self.create_stream_in(self.pg0, self.pg1)
2998 self.pg0.add_stream(pkts)
2999 self.pg_enable_capture(self.pg_interfaces)
3001 capture = self.pg1.get_capture(len(pkts))
3004 dms = self.vapi.nat_det_map_dump()
3005 self.assertEqual(0, dms[0].ses_num)
3007 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3008 def test_session_limit_per_user(self):
3009 """ Deterministic NAT maximum sessions per user limit """
3010 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3012 socket.inet_aton(self.nat_addr),
3014 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3015 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3017 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3018 src_address=self.pg2.local_ip4n,
3020 template_interval=10)
3021 self.vapi.nat_ipfix()
3024 for port in range(1025, 2025):
3025 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3026 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3027 UDP(sport=port, dport=port))
3030 self.pg0.add_stream(pkts)
3031 self.pg_enable_capture(self.pg_interfaces)
3033 capture = self.pg1.get_capture(len(pkts))
3035 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3036 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3037 UDP(sport=3001, dport=3002))
3038 self.pg0.add_stream(p)
3039 self.pg_enable_capture(self.pg_interfaces)
3041 capture = self.pg1.assert_nothing_captured()
3043 # verify ICMP error packet
3044 capture = self.pg0.get_capture(1)
3046 self.assertTrue(p.haslayer(ICMP))
3048 self.assertEqual(icmp.type, 3)
3049 self.assertEqual(icmp.code, 1)
3050 self.assertTrue(icmp.haslayer(IPerror))
3051 inner_ip = icmp[IPerror]
3052 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3053 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3055 dms = self.vapi.nat_det_map_dump()
3057 self.assertEqual(1000, dms[0].ses_num)
3059 # verify IPFIX logging
3060 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3062 capture = self.pg2.get_capture(2)
3063 ipfix = IPFIXDecoder()
3064 # first load template
3066 self.assertTrue(p.haslayer(IPFIX))
3067 if p.haslayer(Template):
3068 ipfix.add_template(p.getlayer(Template))
3069 # verify events in data set
3071 if p.haslayer(Data):
3072 data = ipfix.decode_data_set(p.getlayer(Set))
3073 self.verify_ipfix_max_entries_per_user(data)
3075 def clear_nat_det(self):
3077 Clear deterministic NAT configuration.
3079 self.vapi.nat_ipfix(enable=0)
3080 self.vapi.nat_det_set_timeouts()
3081 deterministic_mappings = self.vapi.nat_det_map_dump()
3082 for dsm in deterministic_mappings:
3083 self.vapi.nat_det_add_del_map(dsm.in_addr,
3089 interfaces = self.vapi.nat44_interface_dump()
3090 for intf in interfaces:
3091 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3096 super(TestDeterministicNAT, self).tearDown()
3097 if not self.vpp_dead:
3098 self.logger.info(self.vapi.cli("show nat44 detail"))
3099 self.clear_nat_det()
3102 class TestNAT64(MethodHolder):
3103 """ NAT64 Test Cases """
3106 def setUpClass(cls):
3107 super(TestNAT64, cls).setUpClass()
3110 cls.tcp_port_in = 6303
3111 cls.tcp_port_out = 6303
3112 cls.udp_port_in = 6304
3113 cls.udp_port_out = 6304
3114 cls.icmp_id_in = 6305
3115 cls.icmp_id_out = 6305
3116 cls.nat_addr = '10.0.0.3'
3117 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3119 cls.vrf1_nat_addr = '10.0.10.3'
3120 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3123 cls.create_pg_interfaces(range(4))
3124 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3125 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3126 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3128 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3130 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3132 cls.pg0.generate_remote_hosts(2)
3134 for i in cls.ip6_interfaces:
3137 i.configure_ipv6_neighbors()
3139 for i in cls.ip4_interfaces:
3145 cls.pg3.config_ip4()
3146 cls.pg3.resolve_arp()
3147 cls.pg3.config_ip6()
3148 cls.pg3.configure_ipv6_neighbors()
3151 super(TestNAT64, cls).tearDownClass()
3154 def test_pool(self):
3155 """ Add/delete address to NAT64 pool """
3156 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3158 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3160 addresses = self.vapi.nat64_pool_addr_dump()
3161 self.assertEqual(len(addresses), 1)
3162 self.assertEqual(addresses[0].address, nat_addr)
3164 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3166 addresses = self.vapi.nat64_pool_addr_dump()
3167 self.assertEqual(len(addresses), 0)
3169 def test_interface(self):
3170 """ Enable/disable NAT64 feature on the interface """
3171 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3172 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3174 interfaces = self.vapi.nat64_interface_dump()
3175 self.assertEqual(len(interfaces), 2)
3178 for intf in interfaces:
3179 if intf.sw_if_index == self.pg0.sw_if_index:
3180 self.assertEqual(intf.is_inside, 1)
3182 elif intf.sw_if_index == self.pg1.sw_if_index:
3183 self.assertEqual(intf.is_inside, 0)
3185 self.assertTrue(pg0_found)
3186 self.assertTrue(pg1_found)
3188 features = self.vapi.cli("show interface features pg0")
3189 self.assertNotEqual(features.find('nat64-in2out'), -1)
3190 features = self.vapi.cli("show interface features pg1")
3191 self.assertNotEqual(features.find('nat64-out2in'), -1)
3193 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3194 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3196 interfaces = self.vapi.nat64_interface_dump()
3197 self.assertEqual(len(interfaces), 0)
3199 def test_static_bib(self):
3200 """ Add/delete static BIB entry """
3201 in_addr = socket.inet_pton(socket.AF_INET6,
3202 '2001:db8:85a3::8a2e:370:7334')
3203 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3206 proto = IP_PROTOS.tcp
3208 self.vapi.nat64_add_del_static_bib(in_addr,
3213 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3218 self.assertEqual(bibe.i_addr, in_addr)
3219 self.assertEqual(bibe.o_addr, out_addr)
3220 self.assertEqual(bibe.i_port, in_port)
3221 self.assertEqual(bibe.o_port, out_port)
3222 self.assertEqual(static_bib_num, 1)
3224 self.vapi.nat64_add_del_static_bib(in_addr,
3230 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3235 self.assertEqual(static_bib_num, 0)
3237 def test_set_timeouts(self):
3238 """ Set NAT64 timeouts """
3239 # verify default values
3240 timeouts = self.vapi.nat64_get_timeouts()
3241 self.assertEqual(timeouts.udp, 300)
3242 self.assertEqual(timeouts.icmp, 60)
3243 self.assertEqual(timeouts.tcp_trans, 240)
3244 self.assertEqual(timeouts.tcp_est, 7440)
3245 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3247 # set and verify custom values
3248 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3249 tcp_est=7450, tcp_incoming_syn=10)
3250 timeouts = self.vapi.nat64_get_timeouts()
3251 self.assertEqual(timeouts.udp, 200)
3252 self.assertEqual(timeouts.icmp, 30)
3253 self.assertEqual(timeouts.tcp_trans, 250)
3254 self.assertEqual(timeouts.tcp_est, 7450)
3255 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3257 def test_dynamic(self):
3258 """ NAT64 dynamic translation test """
3259 self.tcp_port_in = 6303
3260 self.udp_port_in = 6304
3261 self.icmp_id_in = 6305
3263 ses_num_start = self.nat64_get_ses_num()
3265 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3267 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3268 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3271 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3272 self.pg0.add_stream(pkts)
3273 self.pg_enable_capture(self.pg_interfaces)
3275 capture = self.pg1.get_capture(len(pkts))
3276 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3277 dst_ip=self.pg1.remote_ip4)
3280 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3281 self.pg1.add_stream(pkts)
3282 self.pg_enable_capture(self.pg_interfaces)
3284 capture = self.pg0.get_capture(len(pkts))
3285 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3286 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3289 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3290 self.pg0.add_stream(pkts)
3291 self.pg_enable_capture(self.pg_interfaces)
3293 capture = self.pg1.get_capture(len(pkts))
3294 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3295 dst_ip=self.pg1.remote_ip4)
3298 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3299 self.pg1.add_stream(pkts)
3300 self.pg_enable_capture(self.pg_interfaces)
3302 capture = self.pg0.get_capture(len(pkts))
3303 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3305 ses_num_end = self.nat64_get_ses_num()
3307 self.assertEqual(ses_num_end - ses_num_start, 3)
3309 # tenant with specific VRF
3310 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3311 self.vrf1_nat_addr_n,
3312 vrf_id=self.vrf1_id)
3313 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3315 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3316 self.pg2.add_stream(pkts)
3317 self.pg_enable_capture(self.pg_interfaces)
3319 capture = self.pg1.get_capture(len(pkts))
3320 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3321 dst_ip=self.pg1.remote_ip4)
3323 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3324 self.pg1.add_stream(pkts)
3325 self.pg_enable_capture(self.pg_interfaces)
3327 capture = self.pg2.get_capture(len(pkts))
3328 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3330 def test_static(self):
3331 """ NAT64 static translation test """
3332 self.tcp_port_in = 60303
3333 self.udp_port_in = 60304
3334 self.icmp_id_in = 60305
3335 self.tcp_port_out = 60303
3336 self.udp_port_out = 60304
3337 self.icmp_id_out = 60305
3339 ses_num_start = self.nat64_get_ses_num()
3341 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3343 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3344 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3346 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3351 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3356 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3363 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3364 self.pg0.add_stream(pkts)
3365 self.pg_enable_capture(self.pg_interfaces)
3367 capture = self.pg1.get_capture(len(pkts))
3368 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3369 dst_ip=self.pg1.remote_ip4, same_port=True)
3372 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3373 self.pg1.add_stream(pkts)
3374 self.pg_enable_capture(self.pg_interfaces)
3376 capture = self.pg0.get_capture(len(pkts))
3377 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3378 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3380 ses_num_end = self.nat64_get_ses_num()
3382 self.assertEqual(ses_num_end - ses_num_start, 3)
3384 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3385 def test_session_timeout(self):
3386 """ NAT64 session timeout """
3387 self.icmp_id_in = 1234
3388 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3390 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3391 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3392 self.vapi.nat64_set_timeouts(icmp=5)
3394 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3395 self.pg0.add_stream(pkts)
3396 self.pg_enable_capture(self.pg_interfaces)
3398 capture = self.pg1.get_capture(len(pkts))
3400 ses_num_before_timeout = self.nat64_get_ses_num()
3404 # ICMP session after timeout
3405 ses_num_after_timeout = self.nat64_get_ses_num()
3406 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3408 def test_icmp_error(self):
3409 """ NAT64 ICMP Error message translation """
3410 self.tcp_port_in = 6303
3411 self.udp_port_in = 6304
3412 self.icmp_id_in = 6305
3414 ses_num_start = self.nat64_get_ses_num()
3416 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3418 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3419 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3421 # send some packets to create sessions
3422 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3423 self.pg0.add_stream(pkts)
3424 self.pg_enable_capture(self.pg_interfaces)
3426 capture_ip4 = self.pg1.get_capture(len(pkts))
3427 self.verify_capture_out(capture_ip4,
3428 nat_ip=self.nat_addr,
3429 dst_ip=self.pg1.remote_ip4)
3431 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3432 self.pg1.add_stream(pkts)
3433 self.pg_enable_capture(self.pg_interfaces)
3435 capture_ip6 = self.pg0.get_capture(len(pkts))
3436 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3437 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3438 self.pg0.remote_ip6)
3441 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3442 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3443 ICMPv6DestUnreach(code=1) /
3444 packet[IPv6] for packet in capture_ip6]
3445 self.pg0.add_stream(pkts)
3446 self.pg_enable_capture(self.pg_interfaces)
3448 capture = self.pg1.get_capture(len(pkts))
3449 for packet in capture:
3451 self.assertEqual(packet[IP].src, self.nat_addr)
3452 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3453 self.assertEqual(packet[ICMP].type, 3)
3454 self.assertEqual(packet[ICMP].code, 13)
3455 inner = packet[IPerror]
3456 self.assertEqual(inner.src, self.pg1.remote_ip4)
3457 self.assertEqual(inner.dst, self.nat_addr)
3458 self.check_icmp_checksum(packet)
3459 if inner.haslayer(TCPerror):
3460 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3461 elif inner.haslayer(UDPerror):
3462 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3464 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3466 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3470 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3471 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3472 ICMP(type=3, code=13) /
3473 packet[IP] for packet in capture_ip4]
3474 self.pg1.add_stream(pkts)
3475 self.pg_enable_capture(self.pg_interfaces)
3477 capture = self.pg0.get_capture(len(pkts))
3478 for packet in capture:
3480 self.assertEqual(packet[IPv6].src, ip.src)
3481 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3482 icmp = packet[ICMPv6DestUnreach]
3483 self.assertEqual(icmp.code, 1)
3484 inner = icmp[IPerror6]
3485 self.assertEqual(inner.src, self.pg0.remote_ip6)
3486 self.assertEqual(inner.dst, ip.src)
3487 self.check_icmpv6_checksum(packet)
3488 if inner.haslayer(TCPerror):
3489 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3490 elif inner.haslayer(UDPerror):
3491 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3493 self.assertEqual(inner[ICMPv6EchoRequest].id,
3496 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3499 def test_hairpinning(self):
3500 """ NAT64 hairpinning """
3502 client = self.pg0.remote_hosts[0]
3503 server = self.pg0.remote_hosts[1]
3504 server_tcp_in_port = 22
3505 server_tcp_out_port = 4022
3506 server_udp_in_port = 23
3507 server_udp_out_port = 4023
3508 client_tcp_in_port = 1234
3509 client_udp_in_port = 1235
3510 client_tcp_out_port = 0
3511 client_udp_out_port = 0
3512 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3513 nat_addr_ip6 = ip.src
3515 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3517 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3518 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3520 self.vapi.nat64_add_del_static_bib(server.ip6n,
3523 server_tcp_out_port,
3525 self.vapi.nat64_add_del_static_bib(server.ip6n,
3528 server_udp_out_port,
3533 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3534 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3535 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3537 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3538 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3539 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3541 self.pg0.add_stream(pkts)
3542 self.pg_enable_capture(self.pg_interfaces)
3544 capture = self.pg0.get_capture(len(pkts))
3545 for packet in capture:
3547 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3548 self.assertEqual(packet[IPv6].dst, server.ip6)
3549 if packet.haslayer(TCP):
3550 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3551 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3552 self.check_tcp_checksum(packet)
3553 client_tcp_out_port = packet[TCP].sport
3555 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3556 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3557 self.check_udp_checksum(packet)
3558 client_udp_out_port = packet[UDP].sport
3560 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3565 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3566 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3567 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3569 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3570 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3571 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3573 self.pg0.add_stream(pkts)
3574 self.pg_enable_capture(self.pg_interfaces)
3576 capture = self.pg0.get_capture(len(pkts))
3577 for packet in capture:
3579 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3580 self.assertEqual(packet[IPv6].dst, client.ip6)
3581 if packet.haslayer(TCP):
3582 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3583 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3584 self.check_tcp_checksum(packet)
3586 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3587 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3588 self.check_udp_checksum(packet)
3590 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3595 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3596 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3597 ICMPv6DestUnreach(code=1) /
3598 packet[IPv6] for packet in capture]
3599 self.pg0.add_stream(pkts)
3600 self.pg_enable_capture(self.pg_interfaces)
3602 capture = self.pg0.get_capture(len(pkts))
3603 for packet in capture:
3605 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3606 self.assertEqual(packet[IPv6].dst, server.ip6)
3607 icmp = packet[ICMPv6DestUnreach]
3608 self.assertEqual(icmp.code, 1)
3609 inner = icmp[IPerror6]
3610 self.assertEqual(inner.src, server.ip6)
3611 self.assertEqual(inner.dst, nat_addr_ip6)
3612 self.check_icmpv6_checksum(packet)
3613 if inner.haslayer(TCPerror):
3614 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3615 self.assertEqual(inner[TCPerror].dport,
3616 client_tcp_out_port)
3618 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3619 self.assertEqual(inner[UDPerror].dport,
3620 client_udp_out_port)
3622 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3625 def test_prefix(self):
3626 """ NAT64 Network-Specific Prefix """
3628 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3630 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3631 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3632 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3633 self.vrf1_nat_addr_n,
3634 vrf_id=self.vrf1_id)
3635 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3638 global_pref64 = "2001:db8::"
3639 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3640 global_pref64_len = 32
3641 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3643 prefix = self.vapi.nat64_prefix_dump()
3644 self.assertEqual(len(prefix), 1)
3645 self.assertEqual(prefix[0].prefix, global_pref64_n)
3646 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3647 self.assertEqual(prefix[0].vrf_id, 0)
3649 # Add tenant specific prefix
3650 vrf1_pref64 = "2001:db8:122:300::"
3651 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3652 vrf1_pref64_len = 56
3653 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3655 vrf_id=self.vrf1_id)
3656 prefix = self.vapi.nat64_prefix_dump()
3657 self.assertEqual(len(prefix), 2)
3660 pkts = self.create_stream_in_ip6(self.pg0,
3663 plen=global_pref64_len)
3664 self.pg0.add_stream(pkts)
3665 self.pg_enable_capture(self.pg_interfaces)
3667 capture = self.pg1.get_capture(len(pkts))
3668 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3669 dst_ip=self.pg1.remote_ip4)
3671 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3672 self.pg1.add_stream(pkts)
3673 self.pg_enable_capture(self.pg_interfaces)
3675 capture = self.pg0.get_capture(len(pkts))
3676 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3679 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3681 # Tenant specific prefix
3682 pkts = self.create_stream_in_ip6(self.pg2,
3685 plen=vrf1_pref64_len)
3686 self.pg2.add_stream(pkts)
3687 self.pg_enable_capture(self.pg_interfaces)
3689 capture = self.pg1.get_capture(len(pkts))
3690 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3691 dst_ip=self.pg1.remote_ip4)
3693 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3694 self.pg1.add_stream(pkts)
3695 self.pg_enable_capture(self.pg_interfaces)
3697 capture = self.pg2.get_capture(len(pkts))
3698 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3701 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3703 def test_unknown_proto(self):
3704 """ NAT64 translate packet with unknown protocol """
3706 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3708 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3709 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3710 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3713 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3714 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3715 TCP(sport=self.tcp_port_in, dport=20))
3716 self.pg0.add_stream(p)
3717 self.pg_enable_capture(self.pg_interfaces)
3719 p = self.pg1.get_capture(1)
3721 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3722 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3724 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3725 TCP(sport=1234, dport=1234))
3726 self.pg0.add_stream(p)
3727 self.pg_enable_capture(self.pg_interfaces)
3729 p = self.pg1.get_capture(1)
3732 self.assertEqual(packet[IP].src, self.nat_addr)
3733 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3734 self.assertTrue(packet.haslayer(GRE))
3735 self.check_ip_checksum(packet)
3737 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3741 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3742 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3744 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3745 TCP(sport=1234, dport=1234))
3746 self.pg1.add_stream(p)
3747 self.pg_enable_capture(self.pg_interfaces)
3749 p = self.pg0.get_capture(1)
3752 self.assertEqual(packet[IPv6].src, remote_ip6)
3753 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3754 self.assertEqual(packet[IPv6].nh, 47)
3756 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3759 def test_hairpinning_unknown_proto(self):
3760 """ NAT64 translate packet with unknown protocol - hairpinning """
3762 client = self.pg0.remote_hosts[0]
3763 server = self.pg0.remote_hosts[1]
3764 server_tcp_in_port = 22
3765 server_tcp_out_port = 4022
3766 client_tcp_in_port = 1234
3767 client_tcp_out_port = 1235
3768 server_nat_ip = "10.0.0.100"
3769 client_nat_ip = "10.0.0.110"
3770 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3771 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3772 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3773 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3775 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3777 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3778 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3780 self.vapi.nat64_add_del_static_bib(server.ip6n,
3783 server_tcp_out_port,
3786 self.vapi.nat64_add_del_static_bib(server.ip6n,
3792 self.vapi.nat64_add_del_static_bib(client.ip6n,
3795 client_tcp_out_port,
3799 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3800 IPv6(src=client.ip6, dst=server_nat_ip6) /
3801 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3802 self.pg0.add_stream(p)
3803 self.pg_enable_capture(self.pg_interfaces)
3805 p = self.pg0.get_capture(1)
3807 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3808 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3810 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3811 TCP(sport=1234, dport=1234))
3812 self.pg0.add_stream(p)
3813 self.pg_enable_capture(self.pg_interfaces)
3815 p = self.pg0.get_capture(1)
3818 self.assertEqual(packet[IPv6].src, client_nat_ip6)
3819 self.assertEqual(packet[IPv6].dst, server.ip6)
3820 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3822 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3826 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3827 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3829 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3830 TCP(sport=1234, dport=1234))
3831 self.pg0.add_stream(p)
3832 self.pg_enable_capture(self.pg_interfaces)
3834 p = self.pg0.get_capture(1)
3837 self.assertEqual(packet[IPv6].src, server_nat_ip6)
3838 self.assertEqual(packet[IPv6].dst, client.ip6)
3839 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3841 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3844 def test_one_armed_nat64(self):
3845 """ One armed NAT64 """
3847 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
3851 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3853 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
3854 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
3857 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3858 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
3859 TCP(sport=12345, dport=80))
3860 self.pg3.add_stream(p)
3861 self.pg_enable_capture(self.pg_interfaces)
3863 capture = self.pg3.get_capture(1)
3868 self.assertEqual(ip.src, self.nat_addr)
3869 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3870 self.assertNotEqual(tcp.sport, 12345)
3871 external_port = tcp.sport
3872 self.assertEqual(tcp.dport, 80)
3873 self.check_tcp_checksum(p)
3874 self.check_ip_checksum(p)
3876 self.logger.error(ppp("Unexpected or invalid packet:", p))
3880 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3881 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
3882 TCP(sport=80, dport=external_port))
3883 self.pg3.add_stream(p)
3884 self.pg_enable_capture(self.pg_interfaces)
3886 capture = self.pg3.get_capture(1)
3891 self.assertEqual(ip.src, remote_host_ip6)
3892 self.assertEqual(ip.dst, self.pg3.remote_ip6)
3893 self.assertEqual(tcp.sport, 80)
3894 self.assertEqual(tcp.dport, 12345)
3895 self.check_tcp_checksum(p)
3897 self.logger.error(ppp("Unexpected or invalid packet:", p))
3900 def nat64_get_ses_num(self):
3902 Return number of active NAT64 sessions.
3904 st = self.vapi.nat64_st_dump()
3907 def clear_nat64(self):
3909 Clear NAT64 configuration.
3911 self.vapi.nat64_set_timeouts()
3913 interfaces = self.vapi.nat64_interface_dump()
3914 for intf in interfaces:
3915 if intf.is_inside > 1:
3916 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3919 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3923 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3926 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3934 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3937 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3945 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3948 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3956 adresses = self.vapi.nat64_pool_addr_dump()
3957 for addr in adresses:
3958 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3963 prefixes = self.vapi.nat64_prefix_dump()
3964 for prefix in prefixes:
3965 self.vapi.nat64_add_del_prefix(prefix.prefix,
3967 vrf_id=prefix.vrf_id,
3971 super(TestNAT64, self).tearDown()
3972 if not self.vpp_dead:
3973 self.logger.info(self.vapi.cli("show nat64 pool"))
3974 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3975 self.logger.info(self.vapi.cli("show nat64 prefix"))
3976 self.logger.info(self.vapi.cli("show nat64 bib all"))
3977 self.logger.info(self.vapi.cli("show nat64 session table all"))
3980 if __name__ == '__main__':
3981 unittest.main(testRunner=VppTestRunner)