7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
20 class MethodHolder(VppTestCase):
21 """ NAT create capture and verify method holder """
25 super(MethodHolder, cls).setUpClass()
28 super(MethodHolder, self).tearDown()
30 def check_ip_checksum(self, pkt):
32 Check IP checksum of the packet
34 :param pkt: Packet to check IP checksum
36 new = pkt.__class__(str(pkt))
38 new = new.__class__(str(new))
39 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
41 def check_tcp_checksum(self, pkt):
43 Check TCP checksum in IP packet
45 :param pkt: Packet to check TCP checksum
47 new = pkt.__class__(str(pkt))
49 new = new.__class__(str(new))
50 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
52 def check_udp_checksum(self, pkt):
54 Check UDP checksum in IP packet
56 :param pkt: Packet to check UDP checksum
58 new = pkt.__class__(str(pkt))
60 new = new.__class__(str(new))
61 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
63 def check_icmp_errror_embedded(self, pkt):
65 Check ICMP error embeded packet checksum
67 :param pkt: Packet to check ICMP error embeded packet checksum
69 if pkt.haslayer(IPerror):
70 new = pkt.__class__(str(pkt))
71 del new['IPerror'].chksum
72 new = new.__class__(str(new))
73 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
75 if pkt.haslayer(TCPerror):
76 new = pkt.__class__(str(pkt))
77 del new['TCPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
81 if pkt.haslayer(UDPerror):
82 if pkt['UDPerror'].chksum != 0:
83 new = pkt.__class__(str(pkt))
84 del new['UDPerror'].chksum
85 new = new.__class__(str(new))
86 self.assertEqual(new['UDPerror'].chksum,
87 pkt['UDPerror'].chksum)
89 if pkt.haslayer(ICMPerror):
90 del new['ICMPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
94 def check_icmp_checksum(self, pkt):
96 Check ICMP checksum in IPv4 packet
98 :param pkt: Packet to check ICMP checksum
100 new = pkt.__class__(str(pkt))
101 del new['ICMP'].chksum
102 new = new.__class__(str(new))
103 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104 if pkt.haslayer(IPerror):
105 self.check_icmp_errror_embedded(pkt)
107 def check_icmpv6_checksum(self, pkt):
109 Check ICMPv6 checksum in IPv4 packet
111 :param pkt: Packet to check ICMPv6 checksum
113 new = pkt.__class__(str(pkt))
114 if pkt.haslayer(ICMPv6DestUnreach):
115 del new['ICMPv6DestUnreach'].cksum
116 new = new.__class__(str(new))
117 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118 pkt['ICMPv6DestUnreach'].cksum)
119 self.check_icmp_errror_embedded(pkt)
120 if pkt.haslayer(ICMPv6EchoRequest):
121 del new['ICMPv6EchoRequest'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124 pkt['ICMPv6EchoRequest'].cksum)
125 if pkt.haslayer(ICMPv6EchoReply):
126 del new['ICMPv6EchoReply'].cksum
127 new = new.__class__(str(new))
128 self.assertEqual(new['ICMPv6EchoReply'].cksum,
129 pkt['ICMPv6EchoReply'].cksum)
131 def create_stream_in(self, in_if, out_if, ttl=64):
133 Create packet stream for inside network
135 :param in_if: Inside interface
136 :param out_if: Outside interface
137 :param ttl: TTL of generated packets
141 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143 TCP(sport=self.tcp_port_in, dport=20))
147 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149 UDP(sport=self.udp_port_in, dport=20))
153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155 ICMP(id=self.icmp_id_in, type='echo-request'))
160 def compose_ip6(self, ip4, pref, plen):
162 Compose IPv4-embedded IPv6 addresses
164 :param ip4: IPv4 address
165 :param pref: IPv6 prefix
166 :param plen: IPv6 prefix length
167 :returns: IPv4-embedded IPv6 addresses
169 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
170 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
185 pref_n[10] = ip4_n[3]
189 pref_n[10] = ip4_n[2]
190 pref_n[11] = ip4_n[3]
193 pref_n[10] = ip4_n[1]
194 pref_n[11] = ip4_n[2]
195 pref_n[12] = ip4_n[3]
197 pref_n[12] = ip4_n[0]
198 pref_n[13] = ip4_n[1]
199 pref_n[14] = ip4_n[2]
200 pref_n[15] = ip4_n[3]
201 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
203 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
205 Create IPv6 packet stream for inside network
207 :param in_if: Inside interface
208 :param out_if: Outside interface
209 :param ttl: Hop Limit of generated packets
210 :param pref: NAT64 prefix
211 :param plen: NAT64 prefix length
215 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
217 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
220 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
222 TCP(sport=self.tcp_port_in, dport=20))
226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228 UDP(sport=self.udp_port_in, dport=20))
232 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234 ICMPv6EchoRequest(id=self.icmp_id_in))
239 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
241 Create packet stream for outside network
243 :param out_if: Outside interface
244 :param dst_ip: Destination IP address (Default use global NAT address)
245 :param ttl: TTL of generated packets
248 dst_ip = self.nat_addr
251 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253 TCP(dport=self.tcp_port_out, sport=20))
257 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259 UDP(dport=self.udp_port_out, sport=20))
263 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265 ICMP(id=self.icmp_id_out, type='echo-reply'))
270 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
271 packet_num=3, dst_ip=None):
273 Verify captured packets on outside network
275 :param capture: Captured packets
276 :param nat_ip: Translated IP address (Default use global NAT address)
277 :param same_port: Sorce port number is not translated (Default False)
278 :param packet_num: Expected number of packets (Default 3)
279 :param dst_ip: Destination IP address (Default do not verify)
282 nat_ip = self.nat_addr
283 self.assertEqual(packet_num, len(capture))
284 for packet in capture:
286 self.check_ip_checksum(packet)
287 self.assertEqual(packet[IP].src, nat_ip)
288 if dst_ip is not None:
289 self.assertEqual(packet[IP].dst, dst_ip)
290 if packet.haslayer(TCP):
292 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
295 packet[TCP].sport, self.tcp_port_in)
296 self.tcp_port_out = packet[TCP].sport
297 self.check_tcp_checksum(packet)
298 elif packet.haslayer(UDP):
300 self.assertEqual(packet[UDP].sport, self.udp_port_in)
303 packet[UDP].sport, self.udp_port_in)
304 self.udp_port_out = packet[UDP].sport
307 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
309 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
310 self.icmp_id_out = packet[ICMP].id
311 self.check_icmp_checksum(packet)
313 self.logger.error(ppp("Unexpected or invalid packet "
314 "(outside network):", packet))
317 def verify_capture_in(self, capture, in_if, packet_num=3):
319 Verify captured packets on inside network
321 :param capture: Captured packets
322 :param in_if: Inside interface
323 :param packet_num: Expected number of packets (Default 3)
325 self.assertEqual(packet_num, len(capture))
326 for packet in capture:
328 self.check_ip_checksum(packet)
329 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
330 if packet.haslayer(TCP):
331 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
332 self.check_tcp_checksum(packet)
333 elif packet.haslayer(UDP):
334 self.assertEqual(packet[UDP].dport, self.udp_port_in)
336 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
337 self.check_icmp_checksum(packet)
339 self.logger.error(ppp("Unexpected or invalid packet "
340 "(inside network):", packet))
343 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
345 Verify captured IPv6 packets on inside network
347 :param capture: Captured packets
348 :param src_ip: Source IP
349 :param dst_ip: Destination IP address
350 :param packet_num: Expected number of packets (Default 3)
352 self.assertEqual(packet_num, len(capture))
353 for packet in capture:
355 self.assertEqual(packet[IPv6].src, src_ip)
356 self.assertEqual(packet[IPv6].dst, dst_ip)
357 if packet.haslayer(TCP):
358 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
359 self.check_tcp_checksum(packet)
360 elif packet.haslayer(UDP):
361 self.assertEqual(packet[UDP].dport, self.udp_port_in)
362 self.check_udp_checksum(packet)
364 self.assertEqual(packet[ICMPv6EchoReply].id,
366 self.check_icmpv6_checksum(packet)
368 self.logger.error(ppp("Unexpected or invalid packet "
369 "(inside network):", packet))
372 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
374 Verify captured packet that don't have to be translated
376 :param capture: Captured packets
377 :param ingress_if: Ingress interface
378 :param egress_if: Egress interface
380 for packet in capture:
382 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
383 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
384 if packet.haslayer(TCP):
385 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
386 elif packet.haslayer(UDP):
387 self.assertEqual(packet[UDP].sport, self.udp_port_in)
389 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391 self.logger.error(ppp("Unexpected or invalid packet "
392 "(inside network):", packet))
395 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
396 packet_num=3, icmp_type=11):
398 Verify captured packets with ICMP errors on outside network
400 :param capture: Captured packets
401 :param src_ip: Translated IP address or IP address of VPP
402 (Default use global NAT address)
403 :param packet_num: Expected number of packets (Default 3)
404 :param icmp_type: Type of error ICMP packet
405 we are expecting (Default 11)
408 src_ip = self.nat_addr
409 self.assertEqual(packet_num, len(capture))
410 for packet in capture:
412 self.assertEqual(packet[IP].src, src_ip)
413 self.assertTrue(packet.haslayer(ICMP))
415 self.assertEqual(icmp.type, icmp_type)
416 self.assertTrue(icmp.haslayer(IPerror))
417 inner_ip = icmp[IPerror]
418 if inner_ip.haslayer(TCPerror):
419 self.assertEqual(inner_ip[TCPerror].dport,
421 elif inner_ip.haslayer(UDPerror):
422 self.assertEqual(inner_ip[UDPerror].dport,
425 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
427 self.logger.error(ppp("Unexpected or invalid packet "
428 "(outside network):", packet))
431 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
434 Verify captured packets with ICMP errors on inside network
436 :param capture: Captured packets
437 :param in_if: Inside interface
438 :param packet_num: Expected number of packets (Default 3)
439 :param icmp_type: Type of error ICMP packet
440 we are expecting (Default 11)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
446 self.assertTrue(packet.haslayer(ICMP))
448 self.assertEqual(icmp.type, icmp_type)
449 self.assertTrue(icmp.haslayer(IPerror))
450 inner_ip = icmp[IPerror]
451 if inner_ip.haslayer(TCPerror):
452 self.assertEqual(inner_ip[TCPerror].sport,
454 elif inner_ip.haslayer(UDPerror):
455 self.assertEqual(inner_ip[UDPerror].sport,
458 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
460 self.logger.error(ppp("Unexpected or invalid packet "
461 "(inside network):", packet))
464 def verify_ipfix_nat44_ses(self, data):
466 Verify IPFIX NAT44 session create/delete event
468 :param data: Decoded IPFIX data records
470 nat44_ses_create_num = 0
471 nat44_ses_delete_num = 0
472 self.assertEqual(6, len(data))
475 self.assertIn(ord(record[230]), [4, 5])
476 if ord(record[230]) == 4:
477 nat44_ses_create_num += 1
479 nat44_ses_delete_num += 1
481 self.assertEqual(self.pg0.remote_ip4n, record[8])
482 # postNATSourceIPv4Address
483 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
486 self.assertEqual(struct.pack("!I", 0), record[234])
487 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
488 if IP_PROTOS.icmp == ord(record[4]):
489 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
490 self.assertEqual(struct.pack("!H", self.icmp_id_out),
492 elif IP_PROTOS.tcp == ord(record[4]):
493 self.assertEqual(struct.pack("!H", self.tcp_port_in),
495 self.assertEqual(struct.pack("!H", self.tcp_port_out),
497 elif IP_PROTOS.udp == ord(record[4]):
498 self.assertEqual(struct.pack("!H", self.udp_port_in),
500 self.assertEqual(struct.pack("!H", self.udp_port_out),
503 self.fail("Invalid protocol")
504 self.assertEqual(3, nat44_ses_create_num)
505 self.assertEqual(3, nat44_ses_delete_num)
507 def verify_ipfix_addr_exhausted(self, data):
509 Verify IPFIX NAT addresses event
511 :param data: Decoded IPFIX data records
513 self.assertEqual(1, len(data))
516 self.assertEqual(ord(record[230]), 3)
518 self.assertEqual(struct.pack("!I", 0), record[283])
521 class TestNAT44(MethodHolder):
522 """ NAT44 Test Cases """
526 super(TestNAT44, cls).setUpClass()
529 cls.tcp_port_in = 6303
530 cls.tcp_port_out = 6303
531 cls.udp_port_in = 6304
532 cls.udp_port_out = 6304
533 cls.icmp_id_in = 6305
534 cls.icmp_id_out = 6305
535 cls.nat_addr = '10.0.0.3'
536 cls.ipfix_src_port = 4739
537 cls.ipfix_domain_id = 1
539 cls.create_pg_interfaces(range(9))
540 cls.interfaces = list(cls.pg_interfaces[0:4])
542 for i in cls.interfaces:
547 cls.pg0.generate_remote_hosts(3)
548 cls.pg0.configure_ipv4_neighbors()
550 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552 cls.pg4._local_ip4 = "172.16.255.1"
553 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555 cls.pg4.set_table_ip4(10)
556 cls.pg5._local_ip4 = "172.17.255.3"
557 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
559 cls.pg5.set_table_ip4(10)
560 cls.pg6._local_ip4 = "172.16.255.1"
561 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563 cls.pg6.set_table_ip4(20)
564 for i in cls.overlapping_interfaces:
573 super(TestNAT44, cls).tearDownClass()
576 def clear_nat44(self):
578 Clear NAT44 configuration.
580 # I found no elegant way to do this
581 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582 dst_address_length=32,
583 next_hop_address=self.pg7.remote_ip4n,
584 next_hop_sw_if_index=self.pg7.sw_if_index,
586 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587 dst_address_length=32,
588 next_hop_address=self.pg8.remote_ip4n,
589 next_hop_sw_if_index=self.pg8.sw_if_index,
592 for intf in [self.pg7, self.pg8]:
593 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
595 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
600 if self.pg7.has_ip4_config:
601 self.pg7.unconfig_ip4()
603 interfaces = self.vapi.nat44_interface_addr_dump()
604 for intf in interfaces:
605 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
607 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
608 domain_id=self.ipfix_domain_id)
609 self.ipfix_src_port = 4739
610 self.ipfix_domain_id = 1
612 interfaces = self.vapi.nat44_interface_dump()
613 for intf in interfaces:
614 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
618 interfaces = self.vapi.nat44_interface_output_feature_dump()
619 for intf in interfaces:
620 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
624 static_mappings = self.vapi.nat44_static_mapping_dump()
625 for sm in static_mappings:
626 self.vapi.nat44_add_del_static_mapping(
628 sm.external_ip_address,
629 local_port=sm.local_port,
630 external_port=sm.external_port,
631 addr_only=sm.addr_only,
633 protocol=sm.protocol,
636 adresses = self.vapi.nat44_address_dump()
637 for addr in adresses:
638 self.vapi.nat44_add_del_address_range(addr.ip_address,
642 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
643 local_port=0, external_port=0, vrf_id=0,
644 is_add=1, external_sw_if_index=0xFFFFFFFF,
647 Add/delete NAT44 static mapping
649 :param local_ip: Local IP address
650 :param external_ip: External IP address
651 :param local_port: Local port number (Optional)
652 :param external_port: External port number (Optional)
653 :param vrf_id: VRF ID (Default 0)
654 :param is_add: 1 if add, 0 if delete (Default add)
655 :param external_sw_if_index: External interface instead of IP address
656 :param proto: IP protocol (Mandatory if port specified)
659 if local_port and external_port:
661 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
662 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
663 self.vapi.nat44_add_del_static_mapping(
666 external_sw_if_index,
674 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
676 Add/delete NAT44 address
678 :param ip: IP address
679 :param is_add: 1 if add, 0 if delete (Default add)
681 nat_addr = socket.inet_pton(socket.AF_INET, ip)
682 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
685 def test_dynamic(self):
686 """ NAT44 dynamic translation test """
688 self.nat44_add_address(self.nat_addr)
689 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
690 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
694 pkts = self.create_stream_in(self.pg0, self.pg1)
695 self.pg0.add_stream(pkts)
696 self.pg_enable_capture(self.pg_interfaces)
698 capture = self.pg1.get_capture(len(pkts))
699 self.verify_capture_out(capture)
702 pkts = self.create_stream_out(self.pg1)
703 self.pg1.add_stream(pkts)
704 self.pg_enable_capture(self.pg_interfaces)
706 capture = self.pg0.get_capture(len(pkts))
707 self.verify_capture_in(capture, self.pg0)
709 def test_dynamic_icmp_errors_in2out_ttl_1(self):
710 """ NAT44 handling of client packets with TTL=1 """
712 self.nat44_add_address(self.nat_addr)
713 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
714 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
717 # Client side - generate traffic
718 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
719 self.pg0.add_stream(pkts)
720 self.pg_enable_capture(self.pg_interfaces)
723 # Client side - verify ICMP type 11 packets
724 capture = self.pg0.get_capture(len(pkts))
725 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
727 def test_dynamic_icmp_errors_out2in_ttl_1(self):
728 """ NAT44 handling of server packets with TTL=1 """
730 self.nat44_add_address(self.nat_addr)
731 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
732 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
735 # Client side - create sessions
736 pkts = self.create_stream_in(self.pg0, self.pg1)
737 self.pg0.add_stream(pkts)
738 self.pg_enable_capture(self.pg_interfaces)
741 # Server side - generate traffic
742 capture = self.pg1.get_capture(len(pkts))
743 self.verify_capture_out(capture)
744 pkts = self.create_stream_out(self.pg1, ttl=1)
745 self.pg1.add_stream(pkts)
746 self.pg_enable_capture(self.pg_interfaces)
749 # Server side - verify ICMP type 11 packets
750 capture = self.pg1.get_capture(len(pkts))
751 self.verify_capture_out_with_icmp_errors(capture,
752 src_ip=self.pg1.local_ip4)
754 def test_dynamic_icmp_errors_in2out_ttl_2(self):
755 """ NAT44 handling of error responses to client packets with TTL=2 """
757 self.nat44_add_address(self.nat_addr)
758 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
759 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
762 # Client side - generate traffic
763 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
764 self.pg0.add_stream(pkts)
765 self.pg_enable_capture(self.pg_interfaces)
768 # Server side - simulate ICMP type 11 response
769 capture = self.pg1.get_capture(len(pkts))
770 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
771 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
772 ICMP(type=11) / packet[IP] for packet in capture]
773 self.pg1.add_stream(pkts)
774 self.pg_enable_capture(self.pg_interfaces)
777 # Client side - verify ICMP type 11 packets
778 capture = self.pg0.get_capture(len(pkts))
779 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
781 def test_dynamic_icmp_errors_out2in_ttl_2(self):
782 """ NAT44 handling of error responses to server packets with TTL=2 """
784 self.nat44_add_address(self.nat_addr)
785 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
786 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
789 # Client side - create sessions
790 pkts = self.create_stream_in(self.pg0, self.pg1)
791 self.pg0.add_stream(pkts)
792 self.pg_enable_capture(self.pg_interfaces)
795 # Server side - generate traffic
796 capture = self.pg1.get_capture(len(pkts))
797 self.verify_capture_out(capture)
798 pkts = self.create_stream_out(self.pg1, ttl=2)
799 self.pg1.add_stream(pkts)
800 self.pg_enable_capture(self.pg_interfaces)
803 # Client side - simulate ICMP type 11 response
804 capture = self.pg0.get_capture(len(pkts))
805 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
806 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
807 ICMP(type=11) / packet[IP] for packet in capture]
808 self.pg0.add_stream(pkts)
809 self.pg_enable_capture(self.pg_interfaces)
812 # Server side - verify ICMP type 11 packets
813 capture = self.pg1.get_capture(len(pkts))
814 self.verify_capture_out_with_icmp_errors(capture)
816 def test_ping_out_interface_from_outside(self):
817 """ Ping NAT44 out interface from outside network """
819 self.nat44_add_address(self.nat_addr)
820 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
821 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
824 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
825 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
826 ICMP(id=self.icmp_id_out, type='echo-request'))
828 self.pg1.add_stream(pkts)
829 self.pg_enable_capture(self.pg_interfaces)
831 capture = self.pg1.get_capture(len(pkts))
832 self.assertEqual(1, len(capture))
835 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
836 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
837 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
838 self.assertEqual(packet[ICMP].type, 0) # echo reply
840 self.logger.error(ppp("Unexpected or invalid packet "
841 "(outside network):", packet))
844 def test_ping_internal_host_from_outside(self):
845 """ Ping internal host from outside network """
847 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
848 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
849 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
853 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
854 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
855 ICMP(id=self.icmp_id_out, type='echo-request'))
856 self.pg1.add_stream(pkt)
857 self.pg_enable_capture(self.pg_interfaces)
859 capture = self.pg0.get_capture(1)
860 self.verify_capture_in(capture, self.pg0, packet_num=1)
861 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
864 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
865 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
866 ICMP(id=self.icmp_id_in, type='echo-reply'))
867 self.pg0.add_stream(pkt)
868 self.pg_enable_capture(self.pg_interfaces)
870 capture = self.pg1.get_capture(1)
871 self.verify_capture_out(capture, same_port=True, packet_num=1)
872 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
874 def test_static_in(self):
875 """ 1:1 NAT initialized from inside network """
878 self.tcp_port_out = 6303
879 self.udp_port_out = 6304
880 self.icmp_id_out = 6305
882 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
883 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
884 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
888 pkts = self.create_stream_in(self.pg0, self.pg1)
889 self.pg0.add_stream(pkts)
890 self.pg_enable_capture(self.pg_interfaces)
892 capture = self.pg1.get_capture(len(pkts))
893 self.verify_capture_out(capture, nat_ip, True)
896 pkts = self.create_stream_out(self.pg1, nat_ip)
897 self.pg1.add_stream(pkts)
898 self.pg_enable_capture(self.pg_interfaces)
900 capture = self.pg0.get_capture(len(pkts))
901 self.verify_capture_in(capture, self.pg0)
903 def test_static_out(self):
904 """ 1:1 NAT initialized from outside network """
907 self.tcp_port_out = 6303
908 self.udp_port_out = 6304
909 self.icmp_id_out = 6305
911 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
912 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
913 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
917 pkts = self.create_stream_out(self.pg1, nat_ip)
918 self.pg1.add_stream(pkts)
919 self.pg_enable_capture(self.pg_interfaces)
921 capture = self.pg0.get_capture(len(pkts))
922 self.verify_capture_in(capture, self.pg0)
925 pkts = self.create_stream_in(self.pg0, self.pg1)
926 self.pg0.add_stream(pkts)
927 self.pg_enable_capture(self.pg_interfaces)
929 capture = self.pg1.get_capture(len(pkts))
930 self.verify_capture_out(capture, nat_ip, True)
932 def test_static_with_port_in(self):
933 """ 1:1 NAPT initialized from inside network """
935 self.tcp_port_out = 3606
936 self.udp_port_out = 3607
937 self.icmp_id_out = 3608
939 self.nat44_add_address(self.nat_addr)
940 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
941 self.tcp_port_in, self.tcp_port_out,
943 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
944 self.udp_port_in, self.udp_port_out,
946 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
947 self.icmp_id_in, self.icmp_id_out,
948 proto=IP_PROTOS.icmp)
949 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
950 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
954 pkts = self.create_stream_in(self.pg0, self.pg1)
955 self.pg0.add_stream(pkts)
956 self.pg_enable_capture(self.pg_interfaces)
958 capture = self.pg1.get_capture(len(pkts))
959 self.verify_capture_out(capture)
962 pkts = self.create_stream_out(self.pg1)
963 self.pg1.add_stream(pkts)
964 self.pg_enable_capture(self.pg_interfaces)
966 capture = self.pg0.get_capture(len(pkts))
967 self.verify_capture_in(capture, self.pg0)
969 def test_static_with_port_out(self):
970 """ 1:1 NAPT initialized from outside network """
972 self.tcp_port_out = 30606
973 self.udp_port_out = 30607
974 self.icmp_id_out = 30608
976 self.nat44_add_address(self.nat_addr)
977 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
978 self.tcp_port_in, self.tcp_port_out,
980 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
981 self.udp_port_in, self.udp_port_out,
983 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
984 self.icmp_id_in, self.icmp_id_out,
985 proto=IP_PROTOS.icmp)
986 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
987 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
991 pkts = self.create_stream_out(self.pg1)
992 self.pg1.add_stream(pkts)
993 self.pg_enable_capture(self.pg_interfaces)
995 capture = self.pg0.get_capture(len(pkts))
996 self.verify_capture_in(capture, self.pg0)
999 pkts = self.create_stream_in(self.pg0, self.pg1)
1000 self.pg0.add_stream(pkts)
1001 self.pg_enable_capture(self.pg_interfaces)
1003 capture = self.pg1.get_capture(len(pkts))
1004 self.verify_capture_out(capture)
1006 def test_static_vrf_aware(self):
1007 """ 1:1 NAT VRF awareness """
1009 nat_ip1 = "10.0.0.30"
1010 nat_ip2 = "10.0.0.40"
1011 self.tcp_port_out = 6303
1012 self.udp_port_out = 6304
1013 self.icmp_id_out = 6305
1015 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1017 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1019 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1021 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1022 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1024 # inside interface VRF match NAT44 static mapping VRF
1025 pkts = self.create_stream_in(self.pg4, self.pg3)
1026 self.pg4.add_stream(pkts)
1027 self.pg_enable_capture(self.pg_interfaces)
1029 capture = self.pg3.get_capture(len(pkts))
1030 self.verify_capture_out(capture, nat_ip1, True)
1032 # inside interface VRF don't match NAT44 static mapping VRF (packets
1034 pkts = self.create_stream_in(self.pg0, self.pg3)
1035 self.pg0.add_stream(pkts)
1036 self.pg_enable_capture(self.pg_interfaces)
1038 self.pg3.assert_nothing_captured()
1040 def test_multiple_inside_interfaces(self):
1041 """ NAT44 multiple non-overlapping address space inside interfaces """
1043 self.nat44_add_address(self.nat_addr)
1044 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1045 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1046 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1049 # between two NAT44 inside interfaces (no translation)
1050 pkts = self.create_stream_in(self.pg0, self.pg1)
1051 self.pg0.add_stream(pkts)
1052 self.pg_enable_capture(self.pg_interfaces)
1054 capture = self.pg1.get_capture(len(pkts))
1055 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1057 # from NAT44 inside to interface without NAT44 feature (no translation)
1058 pkts = self.create_stream_in(self.pg0, self.pg2)
1059 self.pg0.add_stream(pkts)
1060 self.pg_enable_capture(self.pg_interfaces)
1062 capture = self.pg2.get_capture(len(pkts))
1063 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1065 # in2out 1st interface
1066 pkts = self.create_stream_in(self.pg0, self.pg3)
1067 self.pg0.add_stream(pkts)
1068 self.pg_enable_capture(self.pg_interfaces)
1070 capture = self.pg3.get_capture(len(pkts))
1071 self.verify_capture_out(capture)
1073 # out2in 1st interface
1074 pkts = self.create_stream_out(self.pg3)
1075 self.pg3.add_stream(pkts)
1076 self.pg_enable_capture(self.pg_interfaces)
1078 capture = self.pg0.get_capture(len(pkts))
1079 self.verify_capture_in(capture, self.pg0)
1081 # in2out 2nd interface
1082 pkts = self.create_stream_in(self.pg1, self.pg3)
1083 self.pg1.add_stream(pkts)
1084 self.pg_enable_capture(self.pg_interfaces)
1086 capture = self.pg3.get_capture(len(pkts))
1087 self.verify_capture_out(capture)
1089 # out2in 2nd interface
1090 pkts = self.create_stream_out(self.pg3)
1091 self.pg3.add_stream(pkts)
1092 self.pg_enable_capture(self.pg_interfaces)
1094 capture = self.pg1.get_capture(len(pkts))
1095 self.verify_capture_in(capture, self.pg1)
1097 def test_inside_overlapping_interfaces(self):
1098 """ NAT44 multiple inside interfaces with overlapping address space """
1100 static_nat_ip = "10.0.0.10"
1101 self.nat44_add_address(self.nat_addr)
1102 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1104 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1105 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1106 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1107 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1110 # between NAT44 inside interfaces with same VRF (no translation)
1111 pkts = self.create_stream_in(self.pg4, self.pg5)
1112 self.pg4.add_stream(pkts)
1113 self.pg_enable_capture(self.pg_interfaces)
1115 capture = self.pg5.get_capture(len(pkts))
1116 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1118 # between NAT44 inside interfaces with different VRF (hairpinning)
1119 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1120 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1121 TCP(sport=1234, dport=5678))
1122 self.pg4.add_stream(p)
1123 self.pg_enable_capture(self.pg_interfaces)
1125 capture = self.pg6.get_capture(1)
1130 self.assertEqual(ip.src, self.nat_addr)
1131 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1132 self.assertNotEqual(tcp.sport, 1234)
1133 self.assertEqual(tcp.dport, 5678)
1135 self.logger.error(ppp("Unexpected or invalid packet:", p))
1138 # in2out 1st interface
1139 pkts = self.create_stream_in(self.pg4, self.pg3)
1140 self.pg4.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg3.get_capture(len(pkts))
1144 self.verify_capture_out(capture)
1146 # out2in 1st interface
1147 pkts = self.create_stream_out(self.pg3)
1148 self.pg3.add_stream(pkts)
1149 self.pg_enable_capture(self.pg_interfaces)
1151 capture = self.pg4.get_capture(len(pkts))
1152 self.verify_capture_in(capture, self.pg4)
1154 # in2out 2nd interface
1155 pkts = self.create_stream_in(self.pg5, self.pg3)
1156 self.pg5.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1159 capture = self.pg3.get_capture(len(pkts))
1160 self.verify_capture_out(capture)
1162 # out2in 2nd interface
1163 pkts = self.create_stream_out(self.pg3)
1164 self.pg3.add_stream(pkts)
1165 self.pg_enable_capture(self.pg_interfaces)
1167 capture = self.pg5.get_capture(len(pkts))
1168 self.verify_capture_in(capture, self.pg5)
1171 addresses = self.vapi.nat44_address_dump()
1172 self.assertEqual(len(addresses), 1)
1173 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1174 self.assertEqual(len(sessions), 3)
1175 for session in sessions:
1176 self.assertFalse(session.is_static)
1177 self.assertEqual(session.inside_ip_address[0:4],
1178 self.pg5.remote_ip4n)
1179 self.assertEqual(session.outside_ip_address,
1180 addresses[0].ip_address)
1181 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1182 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1183 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1184 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1185 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1186 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1187 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1188 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1189 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1191 # in2out 3rd interface
1192 pkts = self.create_stream_in(self.pg6, self.pg3)
1193 self.pg6.add_stream(pkts)
1194 self.pg_enable_capture(self.pg_interfaces)
1196 capture = self.pg3.get_capture(len(pkts))
1197 self.verify_capture_out(capture, static_nat_ip, True)
1199 # out2in 3rd interface
1200 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1201 self.pg3.add_stream(pkts)
1202 self.pg_enable_capture(self.pg_interfaces)
1204 capture = self.pg6.get_capture(len(pkts))
1205 self.verify_capture_in(capture, self.pg6)
1207 # general user and session dump verifications
1208 users = self.vapi.nat44_user_dump()
1209 self.assertTrue(len(users) >= 3)
1210 addresses = self.vapi.nat44_address_dump()
1211 self.assertEqual(len(addresses), 1)
1213 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1215 for session in sessions:
1216 self.assertEqual(user.ip_address, session.inside_ip_address)
1217 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1218 self.assertTrue(session.protocol in
1219 [IP_PROTOS.tcp, IP_PROTOS.udp,
1223 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1224 self.assertTrue(len(sessions) >= 4)
1225 for session in sessions:
1226 self.assertFalse(session.is_static)
1227 self.assertEqual(session.inside_ip_address[0:4],
1228 self.pg4.remote_ip4n)
1229 self.assertEqual(session.outside_ip_address,
1230 addresses[0].ip_address)
1233 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1234 self.assertTrue(len(sessions) >= 3)
1235 for session in sessions:
1236 self.assertTrue(session.is_static)
1237 self.assertEqual(session.inside_ip_address[0:4],
1238 self.pg6.remote_ip4n)
1239 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1240 map(int, static_nat_ip.split('.')))
1241 self.assertTrue(session.inside_port in
1242 [self.tcp_port_in, self.udp_port_in,
1245 def test_hairpinning(self):
1246 """ NAT44 hairpinning - 1:1 NAPT """
1248 host = self.pg0.remote_hosts[0]
1249 server = self.pg0.remote_hosts[1]
1252 server_in_port = 5678
1253 server_out_port = 8765
1255 self.nat44_add_address(self.nat_addr)
1256 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1257 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1259 # add static mapping for server
1260 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1261 server_in_port, server_out_port,
1262 proto=IP_PROTOS.tcp)
1264 # send packet from host to server
1265 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1266 IP(src=host.ip4, dst=self.nat_addr) /
1267 TCP(sport=host_in_port, dport=server_out_port))
1268 self.pg0.add_stream(p)
1269 self.pg_enable_capture(self.pg_interfaces)
1271 capture = self.pg0.get_capture(1)
1276 self.assertEqual(ip.src, self.nat_addr)
1277 self.assertEqual(ip.dst, server.ip4)
1278 self.assertNotEqual(tcp.sport, host_in_port)
1279 self.assertEqual(tcp.dport, server_in_port)
1280 self.check_tcp_checksum(p)
1281 host_out_port = tcp.sport
1283 self.logger.error(ppp("Unexpected or invalid packet:", p))
1286 # send reply from server to host
1287 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1288 IP(src=server.ip4, dst=self.nat_addr) /
1289 TCP(sport=server_in_port, dport=host_out_port))
1290 self.pg0.add_stream(p)
1291 self.pg_enable_capture(self.pg_interfaces)
1293 capture = self.pg0.get_capture(1)
1298 self.assertEqual(ip.src, self.nat_addr)
1299 self.assertEqual(ip.dst, host.ip4)
1300 self.assertEqual(tcp.sport, server_out_port)
1301 self.assertEqual(tcp.dport, host_in_port)
1302 self.check_tcp_checksum(p)
1304 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1307 def test_hairpinning2(self):
1308 """ NAT44 hairpinning - 1:1 NAT"""
1310 server1_nat_ip = "10.0.0.10"
1311 server2_nat_ip = "10.0.0.11"
1312 host = self.pg0.remote_hosts[0]
1313 server1 = self.pg0.remote_hosts[1]
1314 server2 = self.pg0.remote_hosts[2]
1315 server_tcp_port = 22
1316 server_udp_port = 20
1318 self.nat44_add_address(self.nat_addr)
1319 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1320 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1323 # add static mapping for servers
1324 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1325 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1329 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1330 IP(src=host.ip4, dst=server1_nat_ip) /
1331 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1333 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1334 IP(src=host.ip4, dst=server1_nat_ip) /
1335 UDP(sport=self.udp_port_in, dport=server_udp_port))
1337 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1338 IP(src=host.ip4, dst=server1_nat_ip) /
1339 ICMP(id=self.icmp_id_in, type='echo-request'))
1341 self.pg0.add_stream(pkts)
1342 self.pg_enable_capture(self.pg_interfaces)
1344 capture = self.pg0.get_capture(len(pkts))
1345 for packet in capture:
1347 self.assertEqual(packet[IP].src, self.nat_addr)
1348 self.assertEqual(packet[IP].dst, server1.ip4)
1349 if packet.haslayer(TCP):
1350 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1351 self.assertEqual(packet[TCP].dport, server_tcp_port)
1352 self.tcp_port_out = packet[TCP].sport
1353 self.check_tcp_checksum(packet)
1354 elif packet.haslayer(UDP):
1355 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1356 self.assertEqual(packet[UDP].dport, server_udp_port)
1357 self.udp_port_out = packet[UDP].sport
1359 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1360 self.icmp_id_out = packet[ICMP].id
1362 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1367 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1368 IP(src=server1.ip4, dst=self.nat_addr) /
1369 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1371 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1372 IP(src=server1.ip4, dst=self.nat_addr) /
1373 UDP(sport=server_udp_port, dport=self.udp_port_out))
1375 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1376 IP(src=server1.ip4, dst=self.nat_addr) /
1377 ICMP(id=self.icmp_id_out, type='echo-reply'))
1379 self.pg0.add_stream(pkts)
1380 self.pg_enable_capture(self.pg_interfaces)
1382 capture = self.pg0.get_capture(len(pkts))
1383 for packet in capture:
1385 self.assertEqual(packet[IP].src, server1_nat_ip)
1386 self.assertEqual(packet[IP].dst, host.ip4)
1387 if packet.haslayer(TCP):
1388 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1389 self.assertEqual(packet[TCP].sport, server_tcp_port)
1390 self.check_tcp_checksum(packet)
1391 elif packet.haslayer(UDP):
1392 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1393 self.assertEqual(packet[UDP].sport, server_udp_port)
1395 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1397 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1400 # server2 to server1
1402 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1403 IP(src=server2.ip4, dst=server1_nat_ip) /
1404 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1406 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1407 IP(src=server2.ip4, dst=server1_nat_ip) /
1408 UDP(sport=self.udp_port_in, dport=server_udp_port))
1410 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1411 IP(src=server2.ip4, dst=server1_nat_ip) /
1412 ICMP(id=self.icmp_id_in, type='echo-request'))
1414 self.pg0.add_stream(pkts)
1415 self.pg_enable_capture(self.pg_interfaces)
1417 capture = self.pg0.get_capture(len(pkts))
1418 for packet in capture:
1420 self.assertEqual(packet[IP].src, server2_nat_ip)
1421 self.assertEqual(packet[IP].dst, server1.ip4)
1422 if packet.haslayer(TCP):
1423 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1424 self.assertEqual(packet[TCP].dport, server_tcp_port)
1425 self.tcp_port_out = packet[TCP].sport
1426 self.check_tcp_checksum(packet)
1427 elif packet.haslayer(UDP):
1428 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1429 self.assertEqual(packet[UDP].dport, server_udp_port)
1430 self.udp_port_out = packet[UDP].sport
1432 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1433 self.icmp_id_out = packet[ICMP].id
1435 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1438 # server1 to server2
1440 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1441 IP(src=server1.ip4, dst=server2_nat_ip) /
1442 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1444 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1445 IP(src=server1.ip4, dst=server2_nat_ip) /
1446 UDP(sport=server_udp_port, dport=self.udp_port_out))
1448 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1449 IP(src=server1.ip4, dst=server2_nat_ip) /
1450 ICMP(id=self.icmp_id_out, type='echo-reply'))
1452 self.pg0.add_stream(pkts)
1453 self.pg_enable_capture(self.pg_interfaces)
1455 capture = self.pg0.get_capture(len(pkts))
1456 for packet in capture:
1458 self.assertEqual(packet[IP].src, server1_nat_ip)
1459 self.assertEqual(packet[IP].dst, server2.ip4)
1460 if packet.haslayer(TCP):
1461 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1462 self.assertEqual(packet[TCP].sport, server_tcp_port)
1463 self.check_tcp_checksum(packet)
1464 elif packet.haslayer(UDP):
1465 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1466 self.assertEqual(packet[UDP].sport, server_udp_port)
1468 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1470 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1473 def test_max_translations_per_user(self):
1474 """ MAX translations per user - recycle the least recently used """
1476 self.nat44_add_address(self.nat_addr)
1477 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1478 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1481 # get maximum number of translations per user
1482 nat44_config = self.vapi.nat_show_config()
1484 # send more than maximum number of translations per user packets
1485 pkts_num = nat44_config.max_translations_per_user + 5
1487 for port in range(0, pkts_num):
1488 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1489 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1490 TCP(sport=1025 + port))
1492 self.pg0.add_stream(pkts)
1493 self.pg_enable_capture(self.pg_interfaces)
1496 # verify number of translated packet
1497 self.pg1.get_capture(pkts_num)
1499 def test_interface_addr(self):
1500 """ Acquire NAT44 addresses from interface """
1501 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1503 # no address in NAT pool
1504 adresses = self.vapi.nat44_address_dump()
1505 self.assertEqual(0, len(adresses))
1507 # configure interface address and check NAT address pool
1508 self.pg7.config_ip4()
1509 adresses = self.vapi.nat44_address_dump()
1510 self.assertEqual(1, len(adresses))
1511 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1513 # remove interface address and check NAT address pool
1514 self.pg7.unconfig_ip4()
1515 adresses = self.vapi.nat44_address_dump()
1516 self.assertEqual(0, len(adresses))
1518 def test_interface_addr_static_mapping(self):
1519 """ Static mapping with addresses from interface """
1520 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1521 self.nat44_add_static_mapping(
1523 external_sw_if_index=self.pg7.sw_if_index)
1525 # static mappings with external interface
1526 static_mappings = self.vapi.nat44_static_mapping_dump()
1527 self.assertEqual(1, len(static_mappings))
1528 self.assertEqual(self.pg7.sw_if_index,
1529 static_mappings[0].external_sw_if_index)
1531 # configure interface address and check static mappings
1532 self.pg7.config_ip4()
1533 static_mappings = self.vapi.nat44_static_mapping_dump()
1534 self.assertEqual(1, len(static_mappings))
1535 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1536 self.pg7.local_ip4n)
1537 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1539 # remove interface address and check static mappings
1540 self.pg7.unconfig_ip4()
1541 static_mappings = self.vapi.nat44_static_mapping_dump()
1542 self.assertEqual(0, len(static_mappings))
1544 def test_ipfix_nat44_sess(self):
1545 """ IPFIX logging NAT44 session created/delted """
1546 self.ipfix_domain_id = 10
1547 self.ipfix_src_port = 20202
1548 colector_port = 30303
1549 bind_layers(UDP, IPFIX, dport=30303)
1550 self.nat44_add_address(self.nat_addr)
1551 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1552 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1554 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1555 src_address=self.pg3.local_ip4n,
1557 template_interval=10,
1558 collector_port=colector_port)
1559 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1560 src_port=self.ipfix_src_port)
1562 pkts = self.create_stream_in(self.pg0, self.pg1)
1563 self.pg0.add_stream(pkts)
1564 self.pg_enable_capture(self.pg_interfaces)
1566 capture = self.pg1.get_capture(len(pkts))
1567 self.verify_capture_out(capture)
1568 self.nat44_add_address(self.nat_addr, is_add=0)
1569 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1570 capture = self.pg3.get_capture(3)
1571 ipfix = IPFIXDecoder()
1572 # first load template
1574 self.assertTrue(p.haslayer(IPFIX))
1575 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1576 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1577 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1578 self.assertEqual(p[UDP].dport, colector_port)
1579 self.assertEqual(p[IPFIX].observationDomainID,
1580 self.ipfix_domain_id)
1581 if p.haslayer(Template):
1582 ipfix.add_template(p.getlayer(Template))
1583 # verify events in data set
1585 if p.haslayer(Data):
1586 data = ipfix.decode_data_set(p.getlayer(Set))
1587 self.verify_ipfix_nat44_ses(data)
1589 def test_ipfix_addr_exhausted(self):
1590 """ IPFIX logging NAT addresses exhausted """
1591 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1592 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1594 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1595 src_address=self.pg3.local_ip4n,
1597 template_interval=10)
1598 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1599 src_port=self.ipfix_src_port)
1601 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1602 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1604 self.pg0.add_stream(p)
1605 self.pg_enable_capture(self.pg_interfaces)
1607 capture = self.pg1.get_capture(0)
1608 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1609 capture = self.pg3.get_capture(3)
1610 ipfix = IPFIXDecoder()
1611 # first load template
1613 self.assertTrue(p.haslayer(IPFIX))
1614 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1615 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1616 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1617 self.assertEqual(p[UDP].dport, 4739)
1618 self.assertEqual(p[IPFIX].observationDomainID,
1619 self.ipfix_domain_id)
1620 if p.haslayer(Template):
1621 ipfix.add_template(p.getlayer(Template))
1622 # verify events in data set
1624 if p.haslayer(Data):
1625 data = ipfix.decode_data_set(p.getlayer(Set))
1626 self.verify_ipfix_addr_exhausted(data)
1628 def test_pool_addr_fib(self):
1629 """ NAT44 add pool addresses to FIB """
1630 static_addr = '10.0.0.10'
1631 self.nat44_add_address(self.nat_addr)
1632 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1633 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1635 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1638 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1639 ARP(op=ARP.who_has, pdst=self.nat_addr,
1640 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1641 self.pg1.add_stream(p)
1642 self.pg_enable_capture(self.pg_interfaces)
1644 capture = self.pg1.get_capture(1)
1645 self.assertTrue(capture[0].haslayer(ARP))
1646 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1649 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1650 ARP(op=ARP.who_has, pdst=static_addr,
1651 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1652 self.pg1.add_stream(p)
1653 self.pg_enable_capture(self.pg_interfaces)
1655 capture = self.pg1.get_capture(1)
1656 self.assertTrue(capture[0].haslayer(ARP))
1657 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1659 # send ARP to non-NAT44 interface
1660 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1661 ARP(op=ARP.who_has, pdst=self.nat_addr,
1662 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1663 self.pg2.add_stream(p)
1664 self.pg_enable_capture(self.pg_interfaces)
1666 capture = self.pg1.get_capture(0)
1668 # remove addresses and verify
1669 self.nat44_add_address(self.nat_addr, is_add=0)
1670 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1673 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1674 ARP(op=ARP.who_has, pdst=self.nat_addr,
1675 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1676 self.pg1.add_stream(p)
1677 self.pg_enable_capture(self.pg_interfaces)
1679 capture = self.pg1.get_capture(0)
1681 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1682 ARP(op=ARP.who_has, pdst=static_addr,
1683 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1684 self.pg1.add_stream(p)
1685 self.pg_enable_capture(self.pg_interfaces)
1687 capture = self.pg1.get_capture(0)
1689 def test_vrf_mode(self):
1690 """ NAT44 tenant VRF aware address pool mode """
1694 nat_ip1 = "10.0.0.10"
1695 nat_ip2 = "10.0.0.11"
1697 self.pg0.unconfig_ip4()
1698 self.pg1.unconfig_ip4()
1699 self.pg0.set_table_ip4(vrf_id1)
1700 self.pg1.set_table_ip4(vrf_id2)
1701 self.pg0.config_ip4()
1702 self.pg1.config_ip4()
1704 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1705 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1706 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1707 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1708 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1712 pkts = self.create_stream_in(self.pg0, self.pg2)
1713 self.pg0.add_stream(pkts)
1714 self.pg_enable_capture(self.pg_interfaces)
1716 capture = self.pg2.get_capture(len(pkts))
1717 self.verify_capture_out(capture, nat_ip1)
1720 pkts = self.create_stream_in(self.pg1, self.pg2)
1721 self.pg1.add_stream(pkts)
1722 self.pg_enable_capture(self.pg_interfaces)
1724 capture = self.pg2.get_capture(len(pkts))
1725 self.verify_capture_out(capture, nat_ip2)
1727 def test_vrf_feature_independent(self):
1728 """ NAT44 tenant VRF independent address pool mode """
1730 nat_ip1 = "10.0.0.10"
1731 nat_ip2 = "10.0.0.11"
1733 self.nat44_add_address(nat_ip1)
1734 self.nat44_add_address(nat_ip2)
1735 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1736 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1737 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1741 pkts = self.create_stream_in(self.pg0, self.pg2)
1742 self.pg0.add_stream(pkts)
1743 self.pg_enable_capture(self.pg_interfaces)
1745 capture = self.pg2.get_capture(len(pkts))
1746 self.verify_capture_out(capture, nat_ip1)
1749 pkts = self.create_stream_in(self.pg1, self.pg2)
1750 self.pg1.add_stream(pkts)
1751 self.pg_enable_capture(self.pg_interfaces)
1753 capture = self.pg2.get_capture(len(pkts))
1754 self.verify_capture_out(capture, nat_ip1)
1756 def test_dynamic_ipless_interfaces(self):
1757 """ NAT44 interfaces without configured IP address """
1759 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1760 self.pg7.remote_mac,
1761 self.pg7.remote_ip4n,
1763 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1764 self.pg8.remote_mac,
1765 self.pg8.remote_ip4n,
1768 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1769 dst_address_length=32,
1770 next_hop_address=self.pg7.remote_ip4n,
1771 next_hop_sw_if_index=self.pg7.sw_if_index)
1772 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1773 dst_address_length=32,
1774 next_hop_address=self.pg8.remote_ip4n,
1775 next_hop_sw_if_index=self.pg8.sw_if_index)
1777 self.nat44_add_address(self.nat_addr)
1778 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1779 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1783 pkts = self.create_stream_in(self.pg7, self.pg8)
1784 self.pg7.add_stream(pkts)
1785 self.pg_enable_capture(self.pg_interfaces)
1787 capture = self.pg8.get_capture(len(pkts))
1788 self.verify_capture_out(capture)
1791 pkts = self.create_stream_out(self.pg8, self.nat_addr)
1792 self.pg8.add_stream(pkts)
1793 self.pg_enable_capture(self.pg_interfaces)
1795 capture = self.pg7.get_capture(len(pkts))
1796 self.verify_capture_in(capture, self.pg7)
1798 def test_static_ipless_interfaces(self):
1799 """ NAT44 interfaces without configured IP address - 1:1 NAT """
1801 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1802 self.pg7.remote_mac,
1803 self.pg7.remote_ip4n,
1805 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1806 self.pg8.remote_mac,
1807 self.pg8.remote_ip4n,
1810 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1811 dst_address_length=32,
1812 next_hop_address=self.pg7.remote_ip4n,
1813 next_hop_sw_if_index=self.pg7.sw_if_index)
1814 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1815 dst_address_length=32,
1816 next_hop_address=self.pg8.remote_ip4n,
1817 next_hop_sw_if_index=self.pg8.sw_if_index)
1819 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1820 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1821 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1825 pkts = self.create_stream_out(self.pg8)
1826 self.pg8.add_stream(pkts)
1827 self.pg_enable_capture(self.pg_interfaces)
1829 capture = self.pg7.get_capture(len(pkts))
1830 self.verify_capture_in(capture, self.pg7)
1833 pkts = self.create_stream_in(self.pg7, self.pg8)
1834 self.pg7.add_stream(pkts)
1835 self.pg_enable_capture(self.pg_interfaces)
1837 capture = self.pg8.get_capture(len(pkts))
1838 self.verify_capture_out(capture, self.nat_addr, True)
1840 def test_static_with_port_ipless_interfaces(self):
1841 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1843 self.tcp_port_out = 30606
1844 self.udp_port_out = 30607
1845 self.icmp_id_out = 30608
1847 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1848 self.pg7.remote_mac,
1849 self.pg7.remote_ip4n,
1851 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1852 self.pg8.remote_mac,
1853 self.pg8.remote_ip4n,
1856 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1857 dst_address_length=32,
1858 next_hop_address=self.pg7.remote_ip4n,
1859 next_hop_sw_if_index=self.pg7.sw_if_index)
1860 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1861 dst_address_length=32,
1862 next_hop_address=self.pg8.remote_ip4n,
1863 next_hop_sw_if_index=self.pg8.sw_if_index)
1865 self.nat44_add_address(self.nat_addr)
1866 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1867 self.tcp_port_in, self.tcp_port_out,
1868 proto=IP_PROTOS.tcp)
1869 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1870 self.udp_port_in, self.udp_port_out,
1871 proto=IP_PROTOS.udp)
1872 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1873 self.icmp_id_in, self.icmp_id_out,
1874 proto=IP_PROTOS.icmp)
1875 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1876 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1880 pkts = self.create_stream_out(self.pg8)
1881 self.pg8.add_stream(pkts)
1882 self.pg_enable_capture(self.pg_interfaces)
1884 capture = self.pg7.get_capture(len(pkts))
1885 self.verify_capture_in(capture, self.pg7)
1888 pkts = self.create_stream_in(self.pg7, self.pg8)
1889 self.pg7.add_stream(pkts)
1890 self.pg_enable_capture(self.pg_interfaces)
1892 capture = self.pg8.get_capture(len(pkts))
1893 self.verify_capture_out(capture)
1895 def test_static_unknown_proto(self):
1896 """ 1:1 NAT translate packet with unknown protocol """
1897 nat_ip = "10.0.0.10"
1898 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1899 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1900 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1904 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1905 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1907 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1908 TCP(sport=1234, dport=1234))
1909 self.pg0.add_stream(p)
1910 self.pg_enable_capture(self.pg_interfaces)
1912 p = self.pg1.get_capture(1)
1915 self.assertEqual(packet[IP].src, nat_ip)
1916 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1917 self.assertTrue(packet.haslayer(GRE))
1918 self.check_ip_checksum(packet)
1920 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1924 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1925 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1927 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1928 TCP(sport=1234, dport=1234))
1929 self.pg1.add_stream(p)
1930 self.pg_enable_capture(self.pg_interfaces)
1932 p = self.pg0.get_capture(1)
1935 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1936 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1937 self.assertTrue(packet.haslayer(GRE))
1938 self.check_ip_checksum(packet)
1940 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1943 def test_hairpinning_static_unknown_proto(self):
1944 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
1946 host = self.pg0.remote_hosts[0]
1947 server = self.pg0.remote_hosts[1]
1949 host_nat_ip = "10.0.0.10"
1950 server_nat_ip = "10.0.0.11"
1952 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
1953 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
1954 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1955 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1959 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
1960 IP(src=host.ip4, dst=server_nat_ip) /
1962 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1963 TCP(sport=1234, dport=1234))
1964 self.pg0.add_stream(p)
1965 self.pg_enable_capture(self.pg_interfaces)
1967 p = self.pg0.get_capture(1)
1970 self.assertEqual(packet[IP].src, host_nat_ip)
1971 self.assertEqual(packet[IP].dst, server.ip4)
1972 self.assertTrue(packet.haslayer(GRE))
1973 self.check_ip_checksum(packet)
1975 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1979 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
1980 IP(src=server.ip4, dst=host_nat_ip) /
1982 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1983 TCP(sport=1234, dport=1234))
1984 self.pg0.add_stream(p)
1985 self.pg_enable_capture(self.pg_interfaces)
1987 p = self.pg0.get_capture(1)
1990 self.assertEqual(packet[IP].src, server_nat_ip)
1991 self.assertEqual(packet[IP].dst, host.ip4)
1992 self.assertTrue(packet.haslayer(GRE))
1993 self.check_ip_checksum(packet)
1995 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1998 def test_unknown_proto(self):
1999 """ NAT44 translate packet with unknown protocol """
2000 self.nat44_add_address(self.nat_addr)
2001 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2002 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2006 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2007 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2008 TCP(sport=self.tcp_port_in, dport=20))
2009 self.pg0.add_stream(p)
2010 self.pg_enable_capture(self.pg_interfaces)
2012 p = self.pg1.get_capture(1)
2014 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2015 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2017 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2018 TCP(sport=1234, dport=1234))
2019 self.pg0.add_stream(p)
2020 self.pg_enable_capture(self.pg_interfaces)
2022 p = self.pg1.get_capture(1)
2025 self.assertEqual(packet[IP].src, self.nat_addr)
2026 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2027 self.assertTrue(packet.haslayer(GRE))
2028 self.check_ip_checksum(packet)
2030 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2034 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2035 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2037 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2038 TCP(sport=1234, dport=1234))
2039 self.pg1.add_stream(p)
2040 self.pg_enable_capture(self.pg_interfaces)
2042 p = self.pg0.get_capture(1)
2045 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2046 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2047 self.assertTrue(packet.haslayer(GRE))
2048 self.check_ip_checksum(packet)
2050 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2053 def test_hairpinning_unknown_proto(self):
2054 """ NAT44 translate packet with unknown protocol - hairpinning """
2055 host = self.pg0.remote_hosts[0]
2056 server = self.pg0.remote_hosts[1]
2059 server_in_port = 5678
2060 server_out_port = 8765
2061 server_nat_ip = "10.0.0.11"
2063 self.nat44_add_address(self.nat_addr)
2064 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2065 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2068 # add static mapping for server
2069 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2072 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2073 IP(src=host.ip4, dst=server_nat_ip) /
2074 TCP(sport=host_in_port, dport=server_out_port))
2075 self.pg0.add_stream(p)
2076 self.pg_enable_capture(self.pg_interfaces)
2078 capture = self.pg0.get_capture(1)
2080 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2081 IP(src=host.ip4, dst=server_nat_ip) /
2083 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2084 TCP(sport=1234, dport=1234))
2085 self.pg0.add_stream(p)
2086 self.pg_enable_capture(self.pg_interfaces)
2088 p = self.pg0.get_capture(1)
2091 self.assertEqual(packet[IP].src, self.nat_addr)
2092 self.assertEqual(packet[IP].dst, server.ip4)
2093 self.assertTrue(packet.haslayer(GRE))
2094 self.check_ip_checksum(packet)
2096 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2100 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2101 IP(src=server.ip4, dst=self.nat_addr) /
2103 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2104 TCP(sport=1234, dport=1234))
2105 self.pg0.add_stream(p)
2106 self.pg_enable_capture(self.pg_interfaces)
2108 p = self.pg0.get_capture(1)
2111 self.assertEqual(packet[IP].src, server_nat_ip)
2112 self.assertEqual(packet[IP].dst, host.ip4)
2113 self.assertTrue(packet.haslayer(GRE))
2114 self.check_ip_checksum(packet)
2116 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2119 def test_output_feature(self):
2120 """ NAT44 interface output feature (in2out postrouting) """
2121 self.nat44_add_address(self.nat_addr)
2122 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2123 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2127 pkts = self.create_stream_in(self.pg0, self.pg1)
2128 self.pg0.add_stream(pkts)
2129 self.pg_enable_capture(self.pg_interfaces)
2131 capture = self.pg1.get_capture(len(pkts))
2132 self.verify_capture_out(capture)
2135 pkts = self.create_stream_out(self.pg1)
2136 self.pg1.add_stream(pkts)
2137 self.pg_enable_capture(self.pg_interfaces)
2139 capture = self.pg0.get_capture(len(pkts))
2140 self.verify_capture_in(capture, self.pg0)
2142 def test_output_feature_vrf_aware(self):
2143 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2144 nat_ip_vrf10 = "10.0.0.10"
2145 nat_ip_vrf20 = "10.0.0.20"
2147 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2148 dst_address_length=32,
2149 next_hop_address=self.pg3.remote_ip4n,
2150 next_hop_sw_if_index=self.pg3.sw_if_index,
2152 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2153 dst_address_length=32,
2154 next_hop_address=self.pg3.remote_ip4n,
2155 next_hop_sw_if_index=self.pg3.sw_if_index,
2158 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2159 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2160 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2161 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2162 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2166 pkts = self.create_stream_in(self.pg4, self.pg3)
2167 self.pg4.add_stream(pkts)
2168 self.pg_enable_capture(self.pg_interfaces)
2170 capture = self.pg3.get_capture(len(pkts))
2171 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2174 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2175 self.pg3.add_stream(pkts)
2176 self.pg_enable_capture(self.pg_interfaces)
2178 capture = self.pg4.get_capture(len(pkts))
2179 self.verify_capture_in(capture, self.pg4)
2182 pkts = self.create_stream_in(self.pg6, self.pg3)
2183 self.pg6.add_stream(pkts)
2184 self.pg_enable_capture(self.pg_interfaces)
2186 capture = self.pg3.get_capture(len(pkts))
2187 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2190 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2191 self.pg3.add_stream(pkts)
2192 self.pg_enable_capture(self.pg_interfaces)
2194 capture = self.pg6.get_capture(len(pkts))
2195 self.verify_capture_in(capture, self.pg6)
2197 def test_output_feature_hairpinning(self):
2198 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2199 host = self.pg0.remote_hosts[0]
2200 server = self.pg0.remote_hosts[1]
2203 server_in_port = 5678
2204 server_out_port = 8765
2206 self.nat44_add_address(self.nat_addr)
2207 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2208 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2211 # add static mapping for server
2212 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2213 server_in_port, server_out_port,
2214 proto=IP_PROTOS.tcp)
2216 # send packet from host to server
2217 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2218 IP(src=host.ip4, dst=self.nat_addr) /
2219 TCP(sport=host_in_port, dport=server_out_port))
2220 self.pg0.add_stream(p)
2221 self.pg_enable_capture(self.pg_interfaces)
2223 capture = self.pg0.get_capture(1)
2228 self.assertEqual(ip.src, self.nat_addr)
2229 self.assertEqual(ip.dst, server.ip4)
2230 self.assertNotEqual(tcp.sport, host_in_port)
2231 self.assertEqual(tcp.dport, server_in_port)
2232 self.check_tcp_checksum(p)
2233 host_out_port = tcp.sport
2235 self.logger.error(ppp("Unexpected or invalid packet:", p))
2238 # send reply from server to host
2239 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2240 IP(src=server.ip4, dst=self.nat_addr) /
2241 TCP(sport=server_in_port, dport=host_out_port))
2242 self.pg0.add_stream(p)
2243 self.pg_enable_capture(self.pg_interfaces)
2245 capture = self.pg0.get_capture(1)
2250 self.assertEqual(ip.src, self.nat_addr)
2251 self.assertEqual(ip.dst, host.ip4)
2252 self.assertEqual(tcp.sport, server_out_port)
2253 self.assertEqual(tcp.dport, host_in_port)
2254 self.check_tcp_checksum(p)
2256 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2260 super(TestNAT44, self).tearDown()
2261 if not self.vpp_dead:
2262 self.logger.info(self.vapi.cli("show nat44 verbose"))
2266 class TestDeterministicNAT(MethodHolder):
2267 """ Deterministic NAT Test Cases """
2270 def setUpConstants(cls):
2271 super(TestDeterministicNAT, cls).setUpConstants()
2272 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2275 def setUpClass(cls):
2276 super(TestDeterministicNAT, cls).setUpClass()
2279 cls.tcp_port_in = 6303
2280 cls.tcp_external_port = 6303
2281 cls.udp_port_in = 6304
2282 cls.udp_external_port = 6304
2283 cls.icmp_id_in = 6305
2284 cls.nat_addr = '10.0.0.3'
2286 cls.create_pg_interfaces(range(3))
2287 cls.interfaces = list(cls.pg_interfaces)
2289 for i in cls.interfaces:
2294 cls.pg0.generate_remote_hosts(2)
2295 cls.pg0.configure_ipv4_neighbors()
2298 super(TestDeterministicNAT, cls).tearDownClass()
2301 def create_stream_in(self, in_if, out_if, ttl=64):
2303 Create packet stream for inside network
2305 :param in_if: Inside interface
2306 :param out_if: Outside interface
2307 :param ttl: TTL of generated packets
2311 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2312 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2313 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2317 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2318 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2319 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2323 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2324 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2325 ICMP(id=self.icmp_id_in, type='echo-request'))
2330 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2332 Create packet stream for outside network
2334 :param out_if: Outside interface
2335 :param dst_ip: Destination IP address (Default use global NAT address)
2336 :param ttl: TTL of generated packets
2339 dst_ip = self.nat_addr
2342 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2343 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2344 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2348 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2349 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2350 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2354 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2355 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2356 ICMP(id=self.icmp_external_id, type='echo-reply'))
2361 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2363 Verify captured packets on outside network
2365 :param capture: Captured packets
2366 :param nat_ip: Translated IP address (Default use global NAT address)
2367 :param same_port: Sorce port number is not translated (Default False)
2368 :param packet_num: Expected number of packets (Default 3)
2371 nat_ip = self.nat_addr
2372 self.assertEqual(packet_num, len(capture))
2373 for packet in capture:
2375 self.assertEqual(packet[IP].src, nat_ip)
2376 if packet.haslayer(TCP):
2377 self.tcp_port_out = packet[TCP].sport
2378 elif packet.haslayer(UDP):
2379 self.udp_port_out = packet[UDP].sport
2381 self.icmp_external_id = packet[ICMP].id
2383 self.logger.error(ppp("Unexpected or invalid packet "
2384 "(outside network):", packet))
2387 def initiate_tcp_session(self, in_if, out_if):
2389 Initiates TCP session
2391 :param in_if: Inside interface
2392 :param out_if: Outside interface
2395 # SYN packet in->out
2396 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2397 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2398 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2401 self.pg_enable_capture(self.pg_interfaces)
2403 capture = out_if.get_capture(1)
2405 self.tcp_port_out = p[TCP].sport
2407 # SYN + ACK packet out->in
2408 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2409 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2410 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2412 out_if.add_stream(p)
2413 self.pg_enable_capture(self.pg_interfaces)
2415 in_if.get_capture(1)
2417 # ACK packet in->out
2418 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2419 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2420 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2423 self.pg_enable_capture(self.pg_interfaces)
2425 out_if.get_capture(1)
2428 self.logger.error("TCP 3 way handshake failed")
2431 def verify_ipfix_max_entries_per_user(self, data):
2433 Verify IPFIX maximum entries per user exceeded event
2435 :param data: Decoded IPFIX data records
2437 self.assertEqual(1, len(data))
2440 self.assertEqual(ord(record[230]), 13)
2441 # natQuotaExceededEvent
2442 self.assertEqual('\x03\x00\x00\x00', record[466])
2444 self.assertEqual(self.pg0.remote_ip4n, record[8])
2446 def test_deterministic_mode(self):
2447 """ NAT plugin run deterministic mode """
2448 in_addr = '172.16.255.0'
2449 out_addr = '172.17.255.50'
2450 in_addr_t = '172.16.255.20'
2451 in_addr_n = socket.inet_aton(in_addr)
2452 out_addr_n = socket.inet_aton(out_addr)
2453 in_addr_t_n = socket.inet_aton(in_addr_t)
2457 nat_config = self.vapi.nat_show_config()
2458 self.assertEqual(1, nat_config.deterministic)
2460 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2462 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2463 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2464 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2465 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2467 deterministic_mappings = self.vapi.nat_det_map_dump()
2468 self.assertEqual(len(deterministic_mappings), 1)
2469 dsm = deterministic_mappings[0]
2470 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2471 self.assertEqual(in_plen, dsm.in_plen)
2472 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2473 self.assertEqual(out_plen, dsm.out_plen)
2475 self.clear_nat_det()
2476 deterministic_mappings = self.vapi.nat_det_map_dump()
2477 self.assertEqual(len(deterministic_mappings), 0)
2479 def test_set_timeouts(self):
2480 """ Set deterministic NAT timeouts """
2481 timeouts_before = self.vapi.nat_det_get_timeouts()
2483 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2484 timeouts_before.tcp_established + 10,
2485 timeouts_before.tcp_transitory + 10,
2486 timeouts_before.icmp + 10)
2488 timeouts_after = self.vapi.nat_det_get_timeouts()
2490 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2491 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2492 self.assertNotEqual(timeouts_before.tcp_established,
2493 timeouts_after.tcp_established)
2494 self.assertNotEqual(timeouts_before.tcp_transitory,
2495 timeouts_after.tcp_transitory)
2497 def test_det_in(self):
2498 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2500 nat_ip = "10.0.0.10"
2502 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2504 socket.inet_aton(nat_ip),
2506 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2507 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2511 pkts = self.create_stream_in(self.pg0, self.pg1)
2512 self.pg0.add_stream(pkts)
2513 self.pg_enable_capture(self.pg_interfaces)
2515 capture = self.pg1.get_capture(len(pkts))
2516 self.verify_capture_out(capture, nat_ip)
2519 pkts = self.create_stream_out(self.pg1, nat_ip)
2520 self.pg1.add_stream(pkts)
2521 self.pg_enable_capture(self.pg_interfaces)
2523 capture = self.pg0.get_capture(len(pkts))
2524 self.verify_capture_in(capture, self.pg0)
2527 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2528 self.assertEqual(len(sessions), 3)
2532 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2533 self.assertEqual(s.in_port, self.tcp_port_in)
2534 self.assertEqual(s.out_port, self.tcp_port_out)
2535 self.assertEqual(s.ext_port, self.tcp_external_port)
2539 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2540 self.assertEqual(s.in_port, self.udp_port_in)
2541 self.assertEqual(s.out_port, self.udp_port_out)
2542 self.assertEqual(s.ext_port, self.udp_external_port)
2546 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2547 self.assertEqual(s.in_port, self.icmp_id_in)
2548 self.assertEqual(s.out_port, self.icmp_external_id)
2550 def test_multiple_users(self):
2551 """ Deterministic NAT multiple users """
2553 nat_ip = "10.0.0.10"
2555 external_port = 6303
2557 host0 = self.pg0.remote_hosts[0]
2558 host1 = self.pg0.remote_hosts[1]
2560 self.vapi.nat_det_add_del_map(host0.ip4n,
2562 socket.inet_aton(nat_ip),
2564 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2565 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2569 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2570 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2571 TCP(sport=port_in, dport=external_port))
2572 self.pg0.add_stream(p)
2573 self.pg_enable_capture(self.pg_interfaces)
2575 capture = self.pg1.get_capture(1)
2580 self.assertEqual(ip.src, nat_ip)
2581 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2582 self.assertEqual(tcp.dport, external_port)
2583 port_out0 = tcp.sport
2585 self.logger.error(ppp("Unexpected or invalid packet:", p))
2589 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2590 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2591 TCP(sport=port_in, dport=external_port))
2592 self.pg0.add_stream(p)
2593 self.pg_enable_capture(self.pg_interfaces)
2595 capture = self.pg1.get_capture(1)
2600 self.assertEqual(ip.src, nat_ip)
2601 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2602 self.assertEqual(tcp.dport, external_port)
2603 port_out1 = tcp.sport
2605 self.logger.error(ppp("Unexpected or invalid packet:", p))
2608 dms = self.vapi.nat_det_map_dump()
2609 self.assertEqual(1, len(dms))
2610 self.assertEqual(2, dms[0].ses_num)
2613 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2614 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2615 TCP(sport=external_port, dport=port_out0))
2616 self.pg1.add_stream(p)
2617 self.pg_enable_capture(self.pg_interfaces)
2619 capture = self.pg0.get_capture(1)
2624 self.assertEqual(ip.src, self.pg1.remote_ip4)
2625 self.assertEqual(ip.dst, host0.ip4)
2626 self.assertEqual(tcp.dport, port_in)
2627 self.assertEqual(tcp.sport, external_port)
2629 self.logger.error(ppp("Unexpected or invalid packet:", p))
2633 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2634 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2635 TCP(sport=external_port, dport=port_out1))
2636 self.pg1.add_stream(p)
2637 self.pg_enable_capture(self.pg_interfaces)
2639 capture = self.pg0.get_capture(1)
2644 self.assertEqual(ip.src, self.pg1.remote_ip4)
2645 self.assertEqual(ip.dst, host1.ip4)
2646 self.assertEqual(tcp.dport, port_in)
2647 self.assertEqual(tcp.sport, external_port)
2649 self.logger.error(ppp("Unexpected or invalid packet", p))
2652 # session close api test
2653 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2655 self.pg1.remote_ip4n,
2657 dms = self.vapi.nat_det_map_dump()
2658 self.assertEqual(dms[0].ses_num, 1)
2660 self.vapi.nat_det_close_session_in(host0.ip4n,
2662 self.pg1.remote_ip4n,
2664 dms = self.vapi.nat_det_map_dump()
2665 self.assertEqual(dms[0].ses_num, 0)
2667 def test_tcp_session_close_detection_in(self):
2668 """ Deterministic NAT TCP session close from inside network """
2669 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2671 socket.inet_aton(self.nat_addr),
2673 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2674 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2677 self.initiate_tcp_session(self.pg0, self.pg1)
2679 # close the session from inside
2681 # FIN packet in -> out
2682 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2683 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2684 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2686 self.pg0.add_stream(p)
2687 self.pg_enable_capture(self.pg_interfaces)
2689 self.pg1.get_capture(1)
2693 # ACK packet out -> in
2694 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2695 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2696 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2700 # FIN packet out -> in
2701 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2702 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2703 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2707 self.pg1.add_stream(pkts)
2708 self.pg_enable_capture(self.pg_interfaces)
2710 self.pg0.get_capture(2)
2712 # ACK packet in -> out
2713 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2714 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2715 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2717 self.pg0.add_stream(p)
2718 self.pg_enable_capture(self.pg_interfaces)
2720 self.pg1.get_capture(1)
2722 # Check if deterministic NAT44 closed the session
2723 dms = self.vapi.nat_det_map_dump()
2724 self.assertEqual(0, dms[0].ses_num)
2726 self.logger.error("TCP session termination failed")
2729 def test_tcp_session_close_detection_out(self):
2730 """ Deterministic NAT TCP session close from outside network """
2731 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2733 socket.inet_aton(self.nat_addr),
2735 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2736 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2739 self.initiate_tcp_session(self.pg0, self.pg1)
2741 # close the session from outside
2743 # FIN packet out -> in
2744 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2745 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2746 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2748 self.pg1.add_stream(p)
2749 self.pg_enable_capture(self.pg_interfaces)
2751 self.pg0.get_capture(1)
2755 # ACK packet in -> out
2756 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2757 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2758 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2762 # ACK packet in -> out
2763 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2764 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2765 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2769 self.pg0.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 self.pg1.get_capture(2)
2774 # ACK packet out -> in
2775 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2776 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2777 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2779 self.pg1.add_stream(p)
2780 self.pg_enable_capture(self.pg_interfaces)
2782 self.pg0.get_capture(1)
2784 # Check if deterministic NAT44 closed the session
2785 dms = self.vapi.nat_det_map_dump()
2786 self.assertEqual(0, dms[0].ses_num)
2788 self.logger.error("TCP session termination failed")
2791 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2792 def test_session_timeout(self):
2793 """ Deterministic NAT session timeouts """
2794 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2796 socket.inet_aton(self.nat_addr),
2798 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2799 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2802 self.initiate_tcp_session(self.pg0, self.pg1)
2803 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
2804 pkts = self.create_stream_in(self.pg0, self.pg1)
2805 self.pg0.add_stream(pkts)
2806 self.pg_enable_capture(self.pg_interfaces)
2808 capture = self.pg1.get_capture(len(pkts))
2811 dms = self.vapi.nat_det_map_dump()
2812 self.assertEqual(0, dms[0].ses_num)
2814 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2815 def test_session_limit_per_user(self):
2816 """ Deterministic NAT maximum sessions per user limit """
2817 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2819 socket.inet_aton(self.nat_addr),
2821 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2822 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2824 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2825 src_address=self.pg2.local_ip4n,
2827 template_interval=10)
2828 self.vapi.nat_ipfix()
2831 for port in range(1025, 2025):
2832 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2833 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2834 UDP(sport=port, dport=port))
2837 self.pg0.add_stream(pkts)
2838 self.pg_enable_capture(self.pg_interfaces)
2840 capture = self.pg1.get_capture(len(pkts))
2842 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2843 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2844 UDP(sport=3001, dport=3002))
2845 self.pg0.add_stream(p)
2846 self.pg_enable_capture(self.pg_interfaces)
2848 capture = self.pg1.assert_nothing_captured()
2850 # verify ICMP error packet
2851 capture = self.pg0.get_capture(1)
2853 self.assertTrue(p.haslayer(ICMP))
2855 self.assertEqual(icmp.type, 3)
2856 self.assertEqual(icmp.code, 1)
2857 self.assertTrue(icmp.haslayer(IPerror))
2858 inner_ip = icmp[IPerror]
2859 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2860 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2862 dms = self.vapi.nat_det_map_dump()
2864 self.assertEqual(1000, dms[0].ses_num)
2866 # verify IPFIX logging
2867 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2869 capture = self.pg2.get_capture(2)
2870 ipfix = IPFIXDecoder()
2871 # first load template
2873 self.assertTrue(p.haslayer(IPFIX))
2874 if p.haslayer(Template):
2875 ipfix.add_template(p.getlayer(Template))
2876 # verify events in data set
2878 if p.haslayer(Data):
2879 data = ipfix.decode_data_set(p.getlayer(Set))
2880 self.verify_ipfix_max_entries_per_user(data)
2882 def clear_nat_det(self):
2884 Clear deterministic NAT configuration.
2886 self.vapi.nat_ipfix(enable=0)
2887 self.vapi.nat_det_set_timeouts()
2888 deterministic_mappings = self.vapi.nat_det_map_dump()
2889 for dsm in deterministic_mappings:
2890 self.vapi.nat_det_add_del_map(dsm.in_addr,
2896 interfaces = self.vapi.nat44_interface_dump()
2897 for intf in interfaces:
2898 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
2903 super(TestDeterministicNAT, self).tearDown()
2904 if not self.vpp_dead:
2905 self.logger.info(self.vapi.cli("show nat44 detail"))
2906 self.clear_nat_det()
2909 class TestNAT64(MethodHolder):
2910 """ NAT64 Test Cases """
2913 def setUpClass(cls):
2914 super(TestNAT64, cls).setUpClass()
2917 cls.tcp_port_in = 6303
2918 cls.tcp_port_out = 6303
2919 cls.udp_port_in = 6304
2920 cls.udp_port_out = 6304
2921 cls.icmp_id_in = 6305
2922 cls.icmp_id_out = 6305
2923 cls.nat_addr = '10.0.0.3'
2924 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2926 cls.vrf1_nat_addr = '10.0.10.3'
2927 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2930 cls.create_pg_interfaces(range(3))
2931 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2932 cls.ip6_interfaces.append(cls.pg_interfaces[2])
2933 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2935 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2937 cls.pg0.generate_remote_hosts(2)
2939 for i in cls.ip6_interfaces:
2942 i.configure_ipv6_neighbors()
2944 for i in cls.ip4_interfaces:
2950 super(TestNAT64, cls).tearDownClass()
2953 def test_pool(self):
2954 """ Add/delete address to NAT64 pool """
2955 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2957 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2959 addresses = self.vapi.nat64_pool_addr_dump()
2960 self.assertEqual(len(addresses), 1)
2961 self.assertEqual(addresses[0].address, nat_addr)
2963 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2965 addresses = self.vapi.nat64_pool_addr_dump()
2966 self.assertEqual(len(addresses), 0)
2968 def test_interface(self):
2969 """ Enable/disable NAT64 feature on the interface """
2970 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2971 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2973 interfaces = self.vapi.nat64_interface_dump()
2974 self.assertEqual(len(interfaces), 2)
2977 for intf in interfaces:
2978 if intf.sw_if_index == self.pg0.sw_if_index:
2979 self.assertEqual(intf.is_inside, 1)
2981 elif intf.sw_if_index == self.pg1.sw_if_index:
2982 self.assertEqual(intf.is_inside, 0)
2984 self.assertTrue(pg0_found)
2985 self.assertTrue(pg1_found)
2987 features = self.vapi.cli("show interface features pg0")
2988 self.assertNotEqual(features.find('nat64-in2out'), -1)
2989 features = self.vapi.cli("show interface features pg1")
2990 self.assertNotEqual(features.find('nat64-out2in'), -1)
2992 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2993 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2995 interfaces = self.vapi.nat64_interface_dump()
2996 self.assertEqual(len(interfaces), 0)
2998 def test_static_bib(self):
2999 """ Add/delete static BIB entry """
3000 in_addr = socket.inet_pton(socket.AF_INET6,
3001 '2001:db8:85a3::8a2e:370:7334')
3002 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3005 proto = IP_PROTOS.tcp
3007 self.vapi.nat64_add_del_static_bib(in_addr,
3012 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3017 self.assertEqual(bibe.i_addr, in_addr)
3018 self.assertEqual(bibe.o_addr, out_addr)
3019 self.assertEqual(bibe.i_port, in_port)
3020 self.assertEqual(bibe.o_port, out_port)
3021 self.assertEqual(static_bib_num, 1)
3023 self.vapi.nat64_add_del_static_bib(in_addr,
3029 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3034 self.assertEqual(static_bib_num, 0)
3036 def test_set_timeouts(self):
3037 """ Set NAT64 timeouts """
3038 # verify default values
3039 timeouts = self.vapi.nat64_get_timeouts()
3040 self.assertEqual(timeouts.udp, 300)
3041 self.assertEqual(timeouts.icmp, 60)
3042 self.assertEqual(timeouts.tcp_trans, 240)
3043 self.assertEqual(timeouts.tcp_est, 7440)
3044 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3046 # set and verify custom values
3047 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3048 tcp_est=7450, tcp_incoming_syn=10)
3049 timeouts = self.vapi.nat64_get_timeouts()
3050 self.assertEqual(timeouts.udp, 200)
3051 self.assertEqual(timeouts.icmp, 30)
3052 self.assertEqual(timeouts.tcp_trans, 250)
3053 self.assertEqual(timeouts.tcp_est, 7450)
3054 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3056 def test_dynamic(self):
3057 """ NAT64 dynamic translation test """
3058 self.tcp_port_in = 6303
3059 self.udp_port_in = 6304
3060 self.icmp_id_in = 6305
3062 ses_num_start = self.nat64_get_ses_num()
3064 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3066 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3067 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3070 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3071 self.pg0.add_stream(pkts)
3072 self.pg_enable_capture(self.pg_interfaces)
3074 capture = self.pg1.get_capture(len(pkts))
3075 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3076 dst_ip=self.pg1.remote_ip4)
3079 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3080 self.pg1.add_stream(pkts)
3081 self.pg_enable_capture(self.pg_interfaces)
3083 capture = self.pg0.get_capture(len(pkts))
3084 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3085 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3088 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3089 self.pg0.add_stream(pkts)
3090 self.pg_enable_capture(self.pg_interfaces)
3092 capture = self.pg1.get_capture(len(pkts))
3093 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3094 dst_ip=self.pg1.remote_ip4)
3097 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3098 self.pg1.add_stream(pkts)
3099 self.pg_enable_capture(self.pg_interfaces)
3101 capture = self.pg0.get_capture(len(pkts))
3102 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3104 ses_num_end = self.nat64_get_ses_num()
3106 self.assertEqual(ses_num_end - ses_num_start, 3)
3108 # tenant with specific VRF
3109 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3110 self.vrf1_nat_addr_n,
3111 vrf_id=self.vrf1_id)
3112 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3114 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3115 self.pg2.add_stream(pkts)
3116 self.pg_enable_capture(self.pg_interfaces)
3118 capture = self.pg1.get_capture(len(pkts))
3119 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3120 dst_ip=self.pg1.remote_ip4)
3122 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3123 self.pg1.add_stream(pkts)
3124 self.pg_enable_capture(self.pg_interfaces)
3126 capture = self.pg2.get_capture(len(pkts))
3127 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3129 def test_static(self):
3130 """ NAT64 static translation test """
3131 self.tcp_port_in = 60303
3132 self.udp_port_in = 60304
3133 self.icmp_id_in = 60305
3134 self.tcp_port_out = 60303
3135 self.udp_port_out = 60304
3136 self.icmp_id_out = 60305
3138 ses_num_start = self.nat64_get_ses_num()
3140 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3142 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3143 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3145 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3150 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3155 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3162 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3163 self.pg0.add_stream(pkts)
3164 self.pg_enable_capture(self.pg_interfaces)
3166 capture = self.pg1.get_capture(len(pkts))
3167 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3168 dst_ip=self.pg1.remote_ip4, same_port=True)
3171 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3172 self.pg1.add_stream(pkts)
3173 self.pg_enable_capture(self.pg_interfaces)
3175 capture = self.pg0.get_capture(len(pkts))
3176 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3177 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3179 ses_num_end = self.nat64_get_ses_num()
3181 self.assertEqual(ses_num_end - ses_num_start, 3)
3183 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3184 def test_session_timeout(self):
3185 """ NAT64 session timeout """
3186 self.icmp_id_in = 1234
3187 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3189 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3190 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3191 self.vapi.nat64_set_timeouts(icmp=5)
3193 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3194 self.pg0.add_stream(pkts)
3195 self.pg_enable_capture(self.pg_interfaces)
3197 capture = self.pg1.get_capture(len(pkts))
3199 ses_num_before_timeout = self.nat64_get_ses_num()
3203 # ICMP session after timeout
3204 ses_num_after_timeout = self.nat64_get_ses_num()
3205 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3207 def test_icmp_error(self):
3208 """ NAT64 ICMP Error message translation """
3209 self.tcp_port_in = 6303
3210 self.udp_port_in = 6304
3211 self.icmp_id_in = 6305
3213 ses_num_start = self.nat64_get_ses_num()
3215 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3217 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3218 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3220 # send some packets to create sessions
3221 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3222 self.pg0.add_stream(pkts)
3223 self.pg_enable_capture(self.pg_interfaces)
3225 capture_ip4 = self.pg1.get_capture(len(pkts))
3226 self.verify_capture_out(capture_ip4,
3227 nat_ip=self.nat_addr,
3228 dst_ip=self.pg1.remote_ip4)
3230 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3231 self.pg1.add_stream(pkts)
3232 self.pg_enable_capture(self.pg_interfaces)
3234 capture_ip6 = self.pg0.get_capture(len(pkts))
3235 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3236 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3237 self.pg0.remote_ip6)
3240 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3241 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3242 ICMPv6DestUnreach(code=1) /
3243 packet[IPv6] for packet in capture_ip6]
3244 self.pg0.add_stream(pkts)
3245 self.pg_enable_capture(self.pg_interfaces)
3247 capture = self.pg1.get_capture(len(pkts))
3248 for packet in capture:
3250 self.assertEqual(packet[IP].src, self.nat_addr)
3251 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3252 self.assertEqual(packet[ICMP].type, 3)
3253 self.assertEqual(packet[ICMP].code, 13)
3254 inner = packet[IPerror]
3255 self.assertEqual(inner.src, self.pg1.remote_ip4)
3256 self.assertEqual(inner.dst, self.nat_addr)
3257 self.check_icmp_checksum(packet)
3258 if inner.haslayer(TCPerror):
3259 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3260 elif inner.haslayer(UDPerror):
3261 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3263 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3265 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3269 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3270 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3271 ICMP(type=3, code=13) /
3272 packet[IP] for packet in capture_ip4]
3273 self.pg1.add_stream(pkts)
3274 self.pg_enable_capture(self.pg_interfaces)
3276 capture = self.pg0.get_capture(len(pkts))
3277 for packet in capture:
3279 self.assertEqual(packet[IPv6].src, ip.src)
3280 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3281 icmp = packet[ICMPv6DestUnreach]
3282 self.assertEqual(icmp.code, 1)
3283 inner = icmp[IPerror6]
3284 self.assertEqual(inner.src, self.pg0.remote_ip6)
3285 self.assertEqual(inner.dst, ip.src)
3286 self.check_icmpv6_checksum(packet)
3287 if inner.haslayer(TCPerror):
3288 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3289 elif inner.haslayer(UDPerror):
3290 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3292 self.assertEqual(inner[ICMPv6EchoRequest].id,
3295 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3298 def test_hairpinning(self):
3299 """ NAT64 hairpinning """
3301 client = self.pg0.remote_hosts[0]
3302 server = self.pg0.remote_hosts[1]
3303 server_tcp_in_port = 22
3304 server_tcp_out_port = 4022
3305 server_udp_in_port = 23
3306 server_udp_out_port = 4023
3307 client_tcp_in_port = 1234
3308 client_udp_in_port = 1235
3309 client_tcp_out_port = 0
3310 client_udp_out_port = 0
3311 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3312 nat_addr_ip6 = ip.src
3314 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3316 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3317 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3319 self.vapi.nat64_add_del_static_bib(server.ip6n,
3322 server_tcp_out_port,
3324 self.vapi.nat64_add_del_static_bib(server.ip6n,
3327 server_udp_out_port,
3332 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3333 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3334 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3336 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3337 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3338 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3340 self.pg0.add_stream(pkts)
3341 self.pg_enable_capture(self.pg_interfaces)
3343 capture = self.pg0.get_capture(len(pkts))
3344 for packet in capture:
3346 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3347 self.assertEqual(packet[IPv6].dst, server.ip6)
3348 if packet.haslayer(TCP):
3349 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3350 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3351 self.check_tcp_checksum(packet)
3352 client_tcp_out_port = packet[TCP].sport
3354 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3355 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3356 self.check_udp_checksum(packet)
3357 client_udp_out_port = packet[UDP].sport
3359 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3364 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3365 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3366 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3368 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3369 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3370 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3372 self.pg0.add_stream(pkts)
3373 self.pg_enable_capture(self.pg_interfaces)
3375 capture = self.pg0.get_capture(len(pkts))
3376 for packet in capture:
3378 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3379 self.assertEqual(packet[IPv6].dst, client.ip6)
3380 if packet.haslayer(TCP):
3381 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3382 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3383 self.check_tcp_checksum(packet)
3385 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3386 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3387 self.check_udp_checksum(packet)
3389 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3394 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3395 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3396 ICMPv6DestUnreach(code=1) /
3397 packet[IPv6] for packet in capture]
3398 self.pg0.add_stream(pkts)
3399 self.pg_enable_capture(self.pg_interfaces)
3401 capture = self.pg0.get_capture(len(pkts))
3402 for packet in capture:
3404 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3405 self.assertEqual(packet[IPv6].dst, server.ip6)
3406 icmp = packet[ICMPv6DestUnreach]
3407 self.assertEqual(icmp.code, 1)
3408 inner = icmp[IPerror6]
3409 self.assertEqual(inner.src, server.ip6)
3410 self.assertEqual(inner.dst, nat_addr_ip6)
3411 self.check_icmpv6_checksum(packet)
3412 if inner.haslayer(TCPerror):
3413 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3414 self.assertEqual(inner[TCPerror].dport,
3415 client_tcp_out_port)
3417 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3418 self.assertEqual(inner[UDPerror].dport,
3419 client_udp_out_port)
3421 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3424 def test_prefix(self):
3425 """ NAT64 Network-Specific Prefix """
3427 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3429 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3430 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3431 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3432 self.vrf1_nat_addr_n,
3433 vrf_id=self.vrf1_id)
3434 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3437 global_pref64 = "2001:db8::"
3438 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3439 global_pref64_len = 32
3440 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3442 prefix = self.vapi.nat64_prefix_dump()
3443 self.assertEqual(len(prefix), 1)
3444 self.assertEqual(prefix[0].prefix, global_pref64_n)
3445 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3446 self.assertEqual(prefix[0].vrf_id, 0)
3448 # Add tenant specific prefix
3449 vrf1_pref64 = "2001:db8:122:300::"
3450 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3451 vrf1_pref64_len = 56
3452 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3454 vrf_id=self.vrf1_id)
3455 prefix = self.vapi.nat64_prefix_dump()
3456 self.assertEqual(len(prefix), 2)
3459 pkts = self.create_stream_in_ip6(self.pg0,
3462 plen=global_pref64_len)
3463 self.pg0.add_stream(pkts)
3464 self.pg_enable_capture(self.pg_interfaces)
3466 capture = self.pg1.get_capture(len(pkts))
3467 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3468 dst_ip=self.pg1.remote_ip4)
3470 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3471 self.pg1.add_stream(pkts)
3472 self.pg_enable_capture(self.pg_interfaces)
3474 capture = self.pg0.get_capture(len(pkts))
3475 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3478 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3480 # Tenant specific prefix
3481 pkts = self.create_stream_in_ip6(self.pg2,
3484 plen=vrf1_pref64_len)
3485 self.pg2.add_stream(pkts)
3486 self.pg_enable_capture(self.pg_interfaces)
3488 capture = self.pg1.get_capture(len(pkts))
3489 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3490 dst_ip=self.pg1.remote_ip4)
3492 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3493 self.pg1.add_stream(pkts)
3494 self.pg_enable_capture(self.pg_interfaces)
3496 capture = self.pg2.get_capture(len(pkts))
3497 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3500 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3502 def test_unknown_proto(self):
3503 """ NAT64 translate packet with unknown protocol """
3505 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3507 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3508 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3509 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3512 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3513 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3514 TCP(sport=self.tcp_port_in, dport=20))
3515 self.pg0.add_stream(p)
3516 self.pg_enable_capture(self.pg_interfaces)
3518 p = self.pg1.get_capture(1)
3520 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3521 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3523 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3524 TCP(sport=1234, dport=1234))
3525 self.pg0.add_stream(p)
3526 self.pg_enable_capture(self.pg_interfaces)
3528 p = self.pg1.get_capture(1)
3531 self.assertEqual(packet[IP].src, self.nat_addr)
3532 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3533 self.assertTrue(packet.haslayer(GRE))
3534 self.check_ip_checksum(packet)
3536 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3540 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3541 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3543 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3544 TCP(sport=1234, dport=1234))
3545 self.pg1.add_stream(p)
3546 self.pg_enable_capture(self.pg_interfaces)
3548 p = self.pg0.get_capture(1)
3551 self.assertEqual(packet[IPv6].src, remote_ip6)
3552 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3553 self.assertEqual(packet[IPv6].nh, 47)
3555 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3558 def test_hairpinning_unknown_proto(self):
3559 """ NAT64 translate packet with unknown protocol - hairpinning """
3561 client = self.pg0.remote_hosts[0]
3562 server = self.pg0.remote_hosts[1]
3563 server_tcp_in_port = 22
3564 server_tcp_out_port = 4022
3565 client_tcp_in_port = 1234
3566 client_tcp_out_port = 1235
3567 server_nat_ip = "10.0.0.100"
3568 client_nat_ip = "10.0.0.110"
3569 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3570 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3571 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3572 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3574 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3576 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3577 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3579 self.vapi.nat64_add_del_static_bib(server.ip6n,
3582 server_tcp_out_port,
3585 self.vapi.nat64_add_del_static_bib(server.ip6n,
3591 self.vapi.nat64_add_del_static_bib(client.ip6n,
3594 client_tcp_out_port,
3598 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3599 IPv6(src=client.ip6, dst=server_nat_ip6) /
3600 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3601 self.pg0.add_stream(p)
3602 self.pg_enable_capture(self.pg_interfaces)
3604 p = self.pg0.get_capture(1)
3606 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3607 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3609 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3610 TCP(sport=1234, dport=1234))
3611 self.pg0.add_stream(p)
3612 self.pg_enable_capture(self.pg_interfaces)
3614 p = self.pg0.get_capture(1)
3617 self.assertEqual(packet[IPv6].src, client_nat_ip6)
3618 self.assertEqual(packet[IPv6].dst, server.ip6)
3619 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3621 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3625 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3626 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3628 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3629 TCP(sport=1234, dport=1234))
3630 self.pg0.add_stream(p)
3631 self.pg_enable_capture(self.pg_interfaces)
3633 p = self.pg0.get_capture(1)
3636 self.assertEqual(packet[IPv6].src, server_nat_ip6)
3637 self.assertEqual(packet[IPv6].dst, client.ip6)
3638 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3640 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3643 def nat64_get_ses_num(self):
3645 Return number of active NAT64 sessions.
3647 st = self.vapi.nat64_st_dump()
3650 def clear_nat64(self):
3652 Clear NAT64 configuration.
3654 self.vapi.nat64_set_timeouts()
3656 interfaces = self.vapi.nat64_interface_dump()
3657 for intf in interfaces:
3658 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3662 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3665 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3673 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3676 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3684 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3687 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3695 adresses = self.vapi.nat64_pool_addr_dump()
3696 for addr in adresses:
3697 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3702 prefixes = self.vapi.nat64_prefix_dump()
3703 for prefix in prefixes:
3704 self.vapi.nat64_add_del_prefix(prefix.prefix,
3706 vrf_id=prefix.vrf_id,
3710 super(TestNAT64, self).tearDown()
3711 if not self.vpp_dead:
3712 self.logger.info(self.vapi.cli("show nat64 pool"))
3713 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3714 self.logger.info(self.vapi.cli("show nat64 prefix"))
3715 self.logger.info(self.vapi.cli("show nat64 bib all"))
3716 self.logger.info(self.vapi.cli("show nat64 session table all"))
3719 if __name__ == '__main__':
3720 unittest.main(testRunner=VppTestRunner)