7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
9 from scapy.layers.inet import IP, TCP, UDP, ICMP
10 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
12 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
13 from scapy.layers.l2 import Ether, ARP, GRE
14 from scapy.data import IP_PROTOS
15 from scapy.packet import bind_layers
17 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
18 from time import sleep
19 from util import ip4_range
20 from util import mactobinary
23 class MethodHolder(VppTestCase):
24 """ NAT create capture and verify method holder """
28 super(MethodHolder, cls).setUpClass()
31 super(MethodHolder, self).tearDown()
33 def check_ip_checksum(self, pkt):
35 Check IP checksum of the packet
37 :param pkt: Packet to check IP checksum
39 new = pkt.__class__(str(pkt))
41 new = new.__class__(str(new))
42 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
44 def check_tcp_checksum(self, pkt):
46 Check TCP checksum in IP packet
48 :param pkt: Packet to check TCP checksum
50 new = pkt.__class__(str(pkt))
52 new = new.__class__(str(new))
53 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
55 def check_udp_checksum(self, pkt):
57 Check UDP checksum in IP packet
59 :param pkt: Packet to check UDP checksum
61 new = pkt.__class__(str(pkt))
63 new = new.__class__(str(new))
64 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded layer present in the packet (IPerror, TCPerror,
    UDPerror, ICMPerror) a copy of the packet is rebuilt with that layer's
    checksum field deleted, and the checksum found in the rebuilt copy is
    compared with the one in the captured packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer_cls, layer_name in embedded_layers:
        if not pkt.haslayer(layer_cls):
            continue
        # A captured UDP checksum of zero is not verified.
        if layer_cls is UDPerror and pkt[layer_name].chksum == 0:
            continue
        # Always start from a fresh copy of the captured packet. The
        # ICMPerror check used to reuse whatever copy an earlier branch
        # had left in ``new`` and raised NameError when none had run.
        new = pkt.__class__(str(pkt))
        del new[layer_name].chksum
        # Re-parse the rebuilt bytes; presumably scapy fills in the deleted
        # checksum while building (behaviour not visible in this file).
        new = new.__class__(str(new))
        self.assertEqual(new[layer_name].chksum, pkt[layer_name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    The packet is copied, the ICMP checksum field is removed from the copy
    and the copy is rebuilt; the resulting checksum must equal the one in
    the captured packet. Packets carrying an IPerror layer additionally
    have their embedded checksums verified.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)

    # Nothing more to verify unless an embedded IP header is present.
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum of the packet

    Handles the ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply
    layers. For each one present, the checksum field is deleted from a
    working copy, the copy is rebuilt, and its checksum is compared with
    the captured one. Destination-unreachable packets also have their
    embedded packet checksums verified.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name, whether embedded checksums are verified too)
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    # One working copy is carried through all layers, exactly as before.
    rebuilt = pkt.__class__(str(pkt))
    for layer_cls, layer_name, verify_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        del rebuilt[layer_name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[layer_name].cksum,
                         pkt[layer_name].cksum)
        if verify_embedded:
            self.check_icmp_errror_embedded(pkt)
134 def create_stream_in(self, in_if, out_if, ttl=64):
136 Create packet stream for inside network
138 :param in_if: Inside interface
139 :param out_if: Outside interface
140 :param ttl: TTL of generated packets
144 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
145 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
146 TCP(sport=self.tcp_port_in, dport=20))
150 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
151 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
152 UDP(sport=self.udp_port_in, dport=20))
156 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
157 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
158 ICMP(id=self.icmp_id_in, type='echo-request'))
163 def compose_ip6(self, ip4, pref, plen):
165 Compose IPv4-embedded IPv6 addresses
167 :param ip4: IPv4 address
168 :param pref: IPv6 prefix
169 :param plen: IPv6 prefix length
170 :returns: IPv4-embedded IPv6 addresses
172 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
173 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
188 pref_n[10] = ip4_n[3]
192 pref_n[10] = ip4_n[2]
193 pref_n[11] = ip4_n[3]
196 pref_n[10] = ip4_n[1]
197 pref_n[11] = ip4_n[2]
198 pref_n[12] = ip4_n[3]
200 pref_n[12] = ip4_n[0]
201 pref_n[13] = ip4_n[1]
202 pref_n[14] = ip4_n[2]
203 pref_n[15] = ip4_n[3]
204 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
206 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
208 Create IPv6 packet stream for inside network
210 :param in_if: Inside interface
211 :param out_if: Outside interface
212 :param hlim: Hop Limit of generated packets
213 :param pref: NAT64 prefix
214 :param plen: NAT64 prefix length
218 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
220 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
223 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
224 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
225 TCP(sport=self.tcp_port_in, dport=20))
229 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
230 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
231 UDP(sport=self.udp_port_in, dport=20))
235 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
236 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
237 ICMPv6EchoRequest(id=self.icmp_id_in))
242 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
244 Create packet stream for outside network
246 :param out_if: Outside interface
247 :param dst_ip: Destination IP address (Default use global NAT address)
248 :param ttl: TTL of generated packets
251 dst_ip = self.nat_addr
254 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
255 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
256 TCP(dport=self.tcp_port_out, sport=20))
260 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
261 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
262 UDP(dport=self.udp_port_out, sport=20))
266 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
267 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
268 ICMP(id=self.icmp_id_out, type='echo-reply'))
273 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
274 packet_num=3, dst_ip=None):
276 Verify captured packets on outside network
278 :param capture: Captured packets
279 :param nat_ip: Translated IP address (Default use global NAT address)
280 :param same_port: Source port number is not translated (Default False)
281 :param packet_num: Expected number of packets (Default 3)
282 :param dst_ip: Destination IP address (Default do not verify)
285 nat_ip = self.nat_addr
286 self.assertEqual(packet_num, len(capture))
287 for packet in capture:
289 self.check_ip_checksum(packet)
290 self.assertEqual(packet[IP].src, nat_ip)
291 if dst_ip is not None:
292 self.assertEqual(packet[IP].dst, dst_ip)
293 if packet.haslayer(TCP):
295 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
298 packet[TCP].sport, self.tcp_port_in)
299 self.tcp_port_out = packet[TCP].sport
300 self.check_tcp_checksum(packet)
301 elif packet.haslayer(UDP):
303 self.assertEqual(packet[UDP].sport, self.udp_port_in)
306 packet[UDP].sport, self.udp_port_in)
307 self.udp_port_out = packet[UDP].sport
310 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
312 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
313 self.icmp_id_out = packet[ICMP].id
314 self.check_icmp_checksum(packet)
316 self.logger.error(ppp("Unexpected or invalid packet "
317 "(outside network):", packet))
320 def verify_capture_in(self, capture, in_if, packet_num=3):
322 Verify captured packets on inside network
324 :param capture: Captured packets
325 :param in_if: Inside interface
326 :param packet_num: Expected number of packets (Default 3)
328 self.assertEqual(packet_num, len(capture))
329 for packet in capture:
331 self.check_ip_checksum(packet)
332 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
333 if packet.haslayer(TCP):
334 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
335 self.check_tcp_checksum(packet)
336 elif packet.haslayer(UDP):
337 self.assertEqual(packet[UDP].dport, self.udp_port_in)
339 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
340 self.check_icmp_checksum(packet)
342 self.logger.error(ppp("Unexpected or invalid packet "
343 "(inside network):", packet))
346 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
348 Verify captured IPv6 packets on inside network
350 :param capture: Captured packets
351 :param src_ip: Source IP
352 :param dst_ip: Destination IP address
353 :param packet_num: Expected number of packets (Default 3)
355 self.assertEqual(packet_num, len(capture))
356 for packet in capture:
358 self.assertEqual(packet[IPv6].src, src_ip)
359 self.assertEqual(packet[IPv6].dst, dst_ip)
360 if packet.haslayer(TCP):
361 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
362 self.check_tcp_checksum(packet)
363 elif packet.haslayer(UDP):
364 self.assertEqual(packet[UDP].dport, self.udp_port_in)
365 self.check_udp_checksum(packet)
367 self.assertEqual(packet[ICMPv6EchoReply].id,
369 self.check_icmpv6_checksum(packet)
371 self.logger.error(ppp("Unexpected or invalid packet "
372 "(inside network):", packet))
375 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
377 Verify captured packets that don't have to be translated
379 :param capture: Captured packets
380 :param ingress_if: Ingress interface
381 :param egress_if: Egress interface
383 for packet in capture:
385 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
386 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
387 if packet.haslayer(TCP):
388 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
389 elif packet.haslayer(UDP):
390 self.assertEqual(packet[UDP].sport, self.udp_port_in)
392 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
394 self.logger.error(ppp("Unexpected or invalid packet "
395 "(inside network):", packet))
398 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
399 packet_num=3, icmp_type=11):
401 Verify captured packets with ICMP errors on outside network
403 :param capture: Captured packets
404 :param src_ip: Translated IP address or IP address of VPP
405 (Default use global NAT address)
406 :param packet_num: Expected number of packets (Default 3)
407 :param icmp_type: Type of error ICMP packet
408 we are expecting (Default 11)
411 src_ip = self.nat_addr
412 self.assertEqual(packet_num, len(capture))
413 for packet in capture:
415 self.assertEqual(packet[IP].src, src_ip)
416 self.assertTrue(packet.haslayer(ICMP))
418 self.assertEqual(icmp.type, icmp_type)
419 self.assertTrue(icmp.haslayer(IPerror))
420 inner_ip = icmp[IPerror]
421 if inner_ip.haslayer(TCPerror):
422 self.assertEqual(inner_ip[TCPerror].dport,
424 elif inner_ip.haslayer(UDPerror):
425 self.assertEqual(inner_ip[UDPerror].dport,
428 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
430 self.logger.error(ppp("Unexpected or invalid packet "
431 "(outside network):", packet))
434 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
437 Verify captured packets with ICMP errors on inside network
439 :param capture: Captured packets
440 :param in_if: Inside interface
441 :param packet_num: Expected number of packets (Default 3)
442 :param icmp_type: Type of error ICMP packet
443 we are expecting (Default 11)
445 self.assertEqual(packet_num, len(capture))
446 for packet in capture:
448 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
449 self.assertTrue(packet.haslayer(ICMP))
451 self.assertEqual(icmp.type, icmp_type)
452 self.assertTrue(icmp.haslayer(IPerror))
453 inner_ip = icmp[IPerror]
454 if inner_ip.haslayer(TCPerror):
455 self.assertEqual(inner_ip[TCPerror].sport,
457 elif inner_ip.haslayer(UDPerror):
458 self.assertEqual(inner_ip[UDPerror].sport,
461 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
463 self.logger.error(ppp("Unexpected or invalid packet "
464 "(inside network):", packet))
467 def verify_ipfix_nat44_ses(self, data):
469 Verify IPFIX NAT44 session create/delete event
471 :param data: Decoded IPFIX data records
473 nat44_ses_create_num = 0
474 nat44_ses_delete_num = 0
475 self.assertEqual(6, len(data))
478 self.assertIn(ord(record[230]), [4, 5])
479 if ord(record[230]) == 4:
480 nat44_ses_create_num += 1
482 nat44_ses_delete_num += 1
484 self.assertEqual(self.pg0.remote_ip4n, record[8])
485 # postNATSourceIPv4Address
486 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
489 self.assertEqual(struct.pack("!I", 0), record[234])
490 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
491 if IP_PROTOS.icmp == ord(record[4]):
492 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
493 self.assertEqual(struct.pack("!H", self.icmp_id_out),
495 elif IP_PROTOS.tcp == ord(record[4]):
496 self.assertEqual(struct.pack("!H", self.tcp_port_in),
498 self.assertEqual(struct.pack("!H", self.tcp_port_out),
500 elif IP_PROTOS.udp == ord(record[4]):
501 self.assertEqual(struct.pack("!H", self.udp_port_in),
503 self.assertEqual(struct.pack("!H", self.udp_port_out),
506 self.fail("Invalid protocol")
507 self.assertEqual(3, nat44_ses_create_num)
508 self.assertEqual(3, nat44_ses_delete_num)
510 def verify_ipfix_addr_exhausted(self, data):
512 Verify IPFIX NAT addresses event
514 :param data: Decoded IPFIX data records
516 self.assertEqual(1, len(data))
519 self.assertEqual(ord(record[230]), 3)
521 self.assertEqual(struct.pack("!I", 0), record[283])
524 class TestNAT44(MethodHolder):
525 """ NAT44 Test Cases """
529 super(TestNAT44, cls).setUpClass()
532 cls.tcp_port_in = 6303
533 cls.tcp_port_out = 6303
534 cls.udp_port_in = 6304
535 cls.udp_port_out = 6304
536 cls.icmp_id_in = 6305
537 cls.icmp_id_out = 6305
538 cls.nat_addr = '10.0.0.3'
539 cls.ipfix_src_port = 4739
540 cls.ipfix_domain_id = 1
542 cls.create_pg_interfaces(range(10))
543 cls.interfaces = list(cls.pg_interfaces[0:4])
545 for i in cls.interfaces:
550 cls.pg0.generate_remote_hosts(3)
551 cls.pg0.configure_ipv4_neighbors()
553 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
554 cls.vapi.ip_table_add_del(10, is_add=1)
555 cls.vapi.ip_table_add_del(20, is_add=1)
557 cls.pg4._local_ip4 = "172.16.255.1"
558 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
559 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
560 cls.pg4.set_table_ip4(10)
561 cls.pg5._local_ip4 = "172.17.255.3"
562 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
563 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
564 cls.pg5.set_table_ip4(10)
565 cls.pg6._local_ip4 = "172.16.255.1"
566 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
567 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
568 cls.pg6.set_table_ip4(20)
569 for i in cls.overlapping_interfaces:
577 cls.pg9.generate_remote_hosts(2)
579 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
580 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
584 cls.pg9.resolve_arp()
585 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
586 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
587 cls.pg9.resolve_arp()
590 super(TestNAT44, cls).tearDownClass()
593 def clear_nat44(self):
595 Clear NAT44 configuration.
597 # I found no elegant way to do this
598 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
599 dst_address_length=32,
600 next_hop_address=self.pg7.remote_ip4n,
601 next_hop_sw_if_index=self.pg7.sw_if_index,
603 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
604 dst_address_length=32,
605 next_hop_address=self.pg8.remote_ip4n,
606 next_hop_sw_if_index=self.pg8.sw_if_index,
609 for intf in [self.pg7, self.pg8]:
610 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
612 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
617 if self.pg7.has_ip4_config:
618 self.pg7.unconfig_ip4()
620 interfaces = self.vapi.nat44_interface_addr_dump()
621 for intf in interfaces:
622 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
624 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
625 domain_id=self.ipfix_domain_id)
626 self.ipfix_src_port = 4739
627 self.ipfix_domain_id = 1
629 interfaces = self.vapi.nat44_interface_dump()
630 for intf in interfaces:
631 if intf.is_inside > 1:
632 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
635 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
639 interfaces = self.vapi.nat44_interface_output_feature_dump()
640 for intf in interfaces:
641 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
645 static_mappings = self.vapi.nat44_static_mapping_dump()
646 for sm in static_mappings:
647 self.vapi.nat44_add_del_static_mapping(
649 sm.external_ip_address,
650 local_port=sm.local_port,
651 external_port=sm.external_port,
652 addr_only=sm.addr_only,
654 protocol=sm.protocol,
657 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
658 for lb_sm in lb_static_mappings:
659 self.vapi.nat44_add_del_lb_static_mapping(
668 adresses = self.vapi.nat44_address_dump()
669 for addr in adresses:
670 self.vapi.nat44_add_del_address_range(addr.ip_address,
674 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
675 local_port=0, external_port=0, vrf_id=0,
676 is_add=1, external_sw_if_index=0xFFFFFFFF,
679 Add/delete NAT44 static mapping
681 :param local_ip: Local IP address
682 :param external_ip: External IP address
683 :param local_port: Local port number (Optional)
684 :param external_port: External port number (Optional)
685 :param vrf_id: VRF ID (Default 0)
686 :param is_add: 1 if add, 0 if delete (Default add)
687 :param external_sw_if_index: External interface instead of IP address
688 :param proto: IP protocol (Mandatory if port specified)
691 if local_port and external_port:
693 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
694 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
695 self.vapi.nat44_add_del_static_mapping(
698 external_sw_if_index,
706 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
708 Add/delete NAT44 address
710 :param ip: IP address
711 :param is_add: 1 if add, 0 if delete (Default add)
713 nat_addr = socket.inet_pton(socket.AF_INET, ip)
714 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
717 def test_dynamic(self):
718 """ NAT44 dynamic translation test """
720 self.nat44_add_address(self.nat_addr)
721 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
722 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
726 pkts = self.create_stream_in(self.pg0, self.pg1)
727 self.pg0.add_stream(pkts)
728 self.pg_enable_capture(self.pg_interfaces)
730 capture = self.pg1.get_capture(len(pkts))
731 self.verify_capture_out(capture)
734 pkts = self.create_stream_out(self.pg1)
735 self.pg1.add_stream(pkts)
736 self.pg_enable_capture(self.pg_interfaces)
738 capture = self.pg0.get_capture(len(pkts))
739 self.verify_capture_in(capture, self.pg0)
741 def test_dynamic_icmp_errors_in2out_ttl_1(self):
742 """ NAT44 handling of client packets with TTL=1 """
744 self.nat44_add_address(self.nat_addr)
745 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
746 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
749 # Client side - generate traffic
750 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
751 self.pg0.add_stream(pkts)
752 self.pg_enable_capture(self.pg_interfaces)
755 # Client side - verify ICMP type 11 packets
756 capture = self.pg0.get_capture(len(pkts))
757 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
759 def test_dynamic_icmp_errors_out2in_ttl_1(self):
760 """ NAT44 handling of server packets with TTL=1 """
762 self.nat44_add_address(self.nat_addr)
763 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
764 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
767 # Client side - create sessions
768 pkts = self.create_stream_in(self.pg0, self.pg1)
769 self.pg0.add_stream(pkts)
770 self.pg_enable_capture(self.pg_interfaces)
773 # Server side - generate traffic
774 capture = self.pg1.get_capture(len(pkts))
775 self.verify_capture_out(capture)
776 pkts = self.create_stream_out(self.pg1, ttl=1)
777 self.pg1.add_stream(pkts)
778 self.pg_enable_capture(self.pg_interfaces)
781 # Server side - verify ICMP type 11 packets
782 capture = self.pg1.get_capture(len(pkts))
783 self.verify_capture_out_with_icmp_errors(capture,
784 src_ip=self.pg1.local_ip4)
786 def test_dynamic_icmp_errors_in2out_ttl_2(self):
787 """ NAT44 handling of error responses to client packets with TTL=2 """
789 self.nat44_add_address(self.nat_addr)
790 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
791 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
794 # Client side - generate traffic
795 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
796 self.pg0.add_stream(pkts)
797 self.pg_enable_capture(self.pg_interfaces)
800 # Server side - simulate ICMP type 11 response
801 capture = self.pg1.get_capture(len(pkts))
802 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
803 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
804 ICMP(type=11) / packet[IP] for packet in capture]
805 self.pg1.add_stream(pkts)
806 self.pg_enable_capture(self.pg_interfaces)
809 # Client side - verify ICMP type 11 packets
810 capture = self.pg0.get_capture(len(pkts))
811 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
813 def test_dynamic_icmp_errors_out2in_ttl_2(self):
814 """ NAT44 handling of error responses to server packets with TTL=2 """
816 self.nat44_add_address(self.nat_addr)
817 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
818 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
821 # Client side - create sessions
822 pkts = self.create_stream_in(self.pg0, self.pg1)
823 self.pg0.add_stream(pkts)
824 self.pg_enable_capture(self.pg_interfaces)
827 # Server side - generate traffic
828 capture = self.pg1.get_capture(len(pkts))
829 self.verify_capture_out(capture)
830 pkts = self.create_stream_out(self.pg1, ttl=2)
831 self.pg1.add_stream(pkts)
832 self.pg_enable_capture(self.pg_interfaces)
835 # Client side - simulate ICMP type 11 response
836 capture = self.pg0.get_capture(len(pkts))
837 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
838 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
839 ICMP(type=11) / packet[IP] for packet in capture]
840 self.pg0.add_stream(pkts)
841 self.pg_enable_capture(self.pg_interfaces)
844 # Server side - verify ICMP type 11 packets
845 capture = self.pg1.get_capture(len(pkts))
846 self.verify_capture_out_with_icmp_errors(capture)
848 def test_ping_out_interface_from_outside(self):
849 """ Ping NAT44 out interface from outside network """
851 self.nat44_add_address(self.nat_addr)
852 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
853 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
856 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
857 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
858 ICMP(id=self.icmp_id_out, type='echo-request'))
860 self.pg1.add_stream(pkts)
861 self.pg_enable_capture(self.pg_interfaces)
863 capture = self.pg1.get_capture(len(pkts))
864 self.assertEqual(1, len(capture))
867 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
868 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
869 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
870 self.assertEqual(packet[ICMP].type, 0) # echo reply
872 self.logger.error(ppp("Unexpected or invalid packet "
873 "(outside network):", packet))
876 def test_ping_internal_host_from_outside(self):
877 """ Ping internal host from outside network """
879 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
880 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
881 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
885 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
886 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
887 ICMP(id=self.icmp_id_out, type='echo-request'))
888 self.pg1.add_stream(pkt)
889 self.pg_enable_capture(self.pg_interfaces)
891 capture = self.pg0.get_capture(1)
892 self.verify_capture_in(capture, self.pg0, packet_num=1)
893 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
896 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
897 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
898 ICMP(id=self.icmp_id_in, type='echo-reply'))
899 self.pg0.add_stream(pkt)
900 self.pg_enable_capture(self.pg_interfaces)
902 capture = self.pg1.get_capture(1)
903 self.verify_capture_out(capture, same_port=True, packet_num=1)
904 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
906 def test_static_in(self):
907 """ 1:1 NAT initialized from inside network """
910 self.tcp_port_out = 6303
911 self.udp_port_out = 6304
912 self.icmp_id_out = 6305
914 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
915 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
916 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
920 pkts = self.create_stream_in(self.pg0, self.pg1)
921 self.pg0.add_stream(pkts)
922 self.pg_enable_capture(self.pg_interfaces)
924 capture = self.pg1.get_capture(len(pkts))
925 self.verify_capture_out(capture, nat_ip, True)
928 pkts = self.create_stream_out(self.pg1, nat_ip)
929 self.pg1.add_stream(pkts)
930 self.pg_enable_capture(self.pg_interfaces)
932 capture = self.pg0.get_capture(len(pkts))
933 self.verify_capture_in(capture, self.pg0)
935 def test_static_out(self):
936 """ 1:1 NAT initialized from outside network """
939 self.tcp_port_out = 6303
940 self.udp_port_out = 6304
941 self.icmp_id_out = 6305
943 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
944 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
945 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
949 pkts = self.create_stream_out(self.pg1, nat_ip)
950 self.pg1.add_stream(pkts)
951 self.pg_enable_capture(self.pg_interfaces)
953 capture = self.pg0.get_capture(len(pkts))
954 self.verify_capture_in(capture, self.pg0)
957 pkts = self.create_stream_in(self.pg0, self.pg1)
958 self.pg0.add_stream(pkts)
959 self.pg_enable_capture(self.pg_interfaces)
961 capture = self.pg1.get_capture(len(pkts))
962 self.verify_capture_out(capture, nat_ip, True)
964 def test_static_with_port_in(self):
965 """ 1:1 NAPT initialized from inside network """
967 self.tcp_port_out = 3606
968 self.udp_port_out = 3607
969 self.icmp_id_out = 3608
971 self.nat44_add_address(self.nat_addr)
972 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
973 self.tcp_port_in, self.tcp_port_out,
975 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
976 self.udp_port_in, self.udp_port_out,
978 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
979 self.icmp_id_in, self.icmp_id_out,
980 proto=IP_PROTOS.icmp)
981 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
982 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
986 pkts = self.create_stream_in(self.pg0, self.pg1)
987 self.pg0.add_stream(pkts)
988 self.pg_enable_capture(self.pg_interfaces)
990 capture = self.pg1.get_capture(len(pkts))
991 self.verify_capture_out(capture)
994 pkts = self.create_stream_out(self.pg1)
995 self.pg1.add_stream(pkts)
996 self.pg_enable_capture(self.pg_interfaces)
998 capture = self.pg0.get_capture(len(pkts))
999 self.verify_capture_in(capture, self.pg0)
1001 def test_static_with_port_out(self):
1002 """ 1:1 NAPT initialized from outside network """
1004 self.tcp_port_out = 30606
1005 self.udp_port_out = 30607
1006 self.icmp_id_out = 30608
1008 self.nat44_add_address(self.nat_addr)
1009 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1010 self.tcp_port_in, self.tcp_port_out,
1011 proto=IP_PROTOS.tcp)
1012 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1013 self.udp_port_in, self.udp_port_out,
1014 proto=IP_PROTOS.udp)
1015 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1016 self.icmp_id_in, self.icmp_id_out,
1017 proto=IP_PROTOS.icmp)
1018 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1019 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1023 pkts = self.create_stream_out(self.pg1)
1024 self.pg1.add_stream(pkts)
1025 self.pg_enable_capture(self.pg_interfaces)
1027 capture = self.pg0.get_capture(len(pkts))
1028 self.verify_capture_in(capture, self.pg0)
1031 pkts = self.create_stream_in(self.pg0, self.pg1)
1032 self.pg0.add_stream(pkts)
1033 self.pg_enable_capture(self.pg_interfaces)
1035 capture = self.pg1.get_capture(len(pkts))
1036 self.verify_capture_out(capture)
1038 def test_static_vrf_aware(self):
1039 """ 1:1 NAT VRF awareness """
1041 nat_ip1 = "10.0.0.30"
1042 nat_ip2 = "10.0.0.40"
1043 self.tcp_port_out = 6303
1044 self.udp_port_out = 6304
1045 self.icmp_id_out = 6305
1047 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1049 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1051 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1053 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1054 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1056 # inside interface VRF match NAT44 static mapping VRF
1057 pkts = self.create_stream_in(self.pg4, self.pg3)
1058 self.pg4.add_stream(pkts)
1059 self.pg_enable_capture(self.pg_interfaces)
1061 capture = self.pg3.get_capture(len(pkts))
1062 self.verify_capture_out(capture, nat_ip1, True)
1064 # inside interface VRF don't match NAT44 static mapping VRF (packets
1066 pkts = self.create_stream_in(self.pg0, self.pg3)
1067 self.pg0.add_stream(pkts)
1068 self.pg_enable_capture(self.pg_interfaces)
1070 self.pg3.assert_nothing_captured()
1072 def test_static_lb(self):
1073 """ NAT44 local service load balancing """
# Adds a load-balancing static mapping for the NAT address, checks one TCP
# flow from the client side and its reply, then sends flows from several
# client addresses and compares how many reached each local server.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
1074 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
# the two local servers are remote hosts attached to pg0
1077 server1 = self.pg0.remote_hosts[0]
1078 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the name `locals` shadows the Python builtin of the same name
1080 locals = [{'addr': server1.ip4n,
1083 {'addr': server2.ip4n,
1087 self.nat44_add_address(self.nat_addr)
1088 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1091 local_num=len(locals),
1093 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1094 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1097 # from client to service
1098 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1099 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1100 TCP(sport=12345, dport=external_port))
1101 self.pg1.add_stream(p)
1102 self.pg_enable_capture(self.pg_interfaces)
1104 capture = self.pg0.get_capture(1)
# the translated destination must be one of the two local servers
1110 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1111 if ip.dst == server1.ip4:
1115 self.assertEqual(tcp.dport, local_port)
1116 self.check_tcp_checksum(p)
1117 self.check_ip_checksum(p)
1119 self.logger.error(ppp("Unexpected or invalid packet:", p))
1122 # from service back to client
1123 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1124 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1125 TCP(sport=local_port, dport=12345))
1126 self.pg0.add_stream(p)
1127 self.pg_enable_capture(self.pg_interfaces)
1129 capture = self.pg1.get_capture(1)
# the reply must appear to come from the external address and port
1134 self.assertEqual(ip.src, self.nat_addr)
1135 self.assertEqual(tcp.sport, external_port)
1136 self.check_tcp_checksum(p)
1137 self.check_ip_checksum(p)
1139 self.logger.error(ppp("Unexpected or invalid packet:", p))
# several clients: a TCP packet is built for each address from ip4_range()
1145 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1147 for client in clients:
1148 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1149 IP(src=client, dst=self.nat_addr) /
1150 TCP(sport=12345, dport=external_port))
1152 self.pg1.add_stream(pkts)
1153 self.pg_enable_capture(self.pg_interfaces)
1155 capture = self.pg0.get_capture(len(pkts))
1157 if p[IP].dst == server1.ip4:
# server1 is expected to have received more flows than server2
1161 self.assertTrue(server1_n > server2_n)
1163 def test_multiple_inside_interfaces(self):
1164 """ NAT44 multiple non-overlapping address space inside interfaces """
# Enables the NAT44 feature on pg0, pg1 and pg3, then sends streams between
# the interfaces and verifies each capture: no translation towards pg1 and
# pg2, translated traffic to and from pg3.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
1166 self.nat44_add_address(self.nat_addr)
1167 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1168 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1169 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1172 # between two NAT44 inside interfaces (no translation)
1173 pkts = self.create_stream_in(self.pg0, self.pg1)
1174 self.pg0.add_stream(pkts)
1175 self.pg_enable_capture(self.pg_interfaces)
1177 capture = self.pg1.get_capture(len(pkts))
1178 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1180 # from NAT44 inside to interface without NAT44 feature (no translation)
1181 pkts = self.create_stream_in(self.pg0, self.pg2)
1182 self.pg0.add_stream(pkts)
1183 self.pg_enable_capture(self.pg_interfaces)
1185 capture = self.pg2.get_capture(len(pkts))
1186 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1188 # in2out 1st interface
1189 pkts = self.create_stream_in(self.pg0, self.pg3)
1190 self.pg0.add_stream(pkts)
1191 self.pg_enable_capture(self.pg_interfaces)
1193 capture = self.pg3.get_capture(len(pkts))
1194 self.verify_capture_out(capture)
1196 # out2in 1st interface
1197 pkts = self.create_stream_out(self.pg3)
1198 self.pg3.add_stream(pkts)
1199 self.pg_enable_capture(self.pg_interfaces)
1201 capture = self.pg0.get_capture(len(pkts))
1202 self.verify_capture_in(capture, self.pg0)
1204 # in2out 2nd interface
1205 pkts = self.create_stream_in(self.pg1, self.pg3)
1206 self.pg1.add_stream(pkts)
1207 self.pg_enable_capture(self.pg_interfaces)
1209 capture = self.pg3.get_capture(len(pkts))
1210 self.verify_capture_out(capture)
1212 # out2in 2nd interface
1213 pkts = self.create_stream_out(self.pg3)
1214 self.pg3.add_stream(pkts)
1215 self.pg_enable_capture(self.pg_interfaces)
1217 capture = self.pg1.get_capture(len(pkts))
1218 self.verify_capture_in(capture, self.pg1)
1220 def test_inside_overlapping_interfaces(self):
1221 """ NAT44 multiple inside interfaces with overlapping address space """
# Enables NAT44 on pg3..pg6, adds a static mapping for the pg6 host, and
# checks traffic between the interfaces plus the user/session dumps that
# result from it.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
1223 static_nat_ip = "10.0.0.10"
1224 self.nat44_add_address(self.nat_addr)
1225 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1227 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1228 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1229 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1230 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1233 # between NAT44 inside interfaces with same VRF (no translation)
1234 pkts = self.create_stream_in(self.pg4, self.pg5)
1235 self.pg4.add_stream(pkts)
1236 self.pg_enable_capture(self.pg_interfaces)
1238 capture = self.pg5.get_capture(len(pkts))
1239 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1241 # between NAT44 inside interfaces with different VRF (hairpinning)
1242 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1243 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1244 TCP(sport=1234, dport=5678))
1245 self.pg4.add_stream(p)
1246 self.pg_enable_capture(self.pg_interfaces)
1248 capture = self.pg6.get_capture(1)
# source is rewritten to the pool address with a new port; the destination
# is the real pg6 host behind static_nat_ip
1253 self.assertEqual(ip.src, self.nat_addr)
1254 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1255 self.assertNotEqual(tcp.sport, 1234)
1256 self.assertEqual(tcp.dport, 5678)
1258 self.logger.error(ppp("Unexpected or invalid packet:", p))
1261 # in2out 1st interface
1262 pkts = self.create_stream_in(self.pg4, self.pg3)
1263 self.pg4.add_stream(pkts)
1264 self.pg_enable_capture(self.pg_interfaces)
1266 capture = self.pg3.get_capture(len(pkts))
1267 self.verify_capture_out(capture)
1269 # out2in 1st interface
1270 pkts = self.create_stream_out(self.pg3)
1271 self.pg3.add_stream(pkts)
1272 self.pg_enable_capture(self.pg_interfaces)
1274 capture = self.pg4.get_capture(len(pkts))
1275 self.verify_capture_in(capture, self.pg4)
1277 # in2out 2nd interface
1278 pkts = self.create_stream_in(self.pg5, self.pg3)
1279 self.pg5.add_stream(pkts)
1280 self.pg_enable_capture(self.pg_interfaces)
1282 capture = self.pg3.get_capture(len(pkts))
1283 self.verify_capture_out(capture)
1285 # out2in 2nd interface
1286 pkts = self.create_stream_out(self.pg3)
1287 self.pg3.add_stream(pkts)
1288 self.pg_enable_capture(self.pg_interfaces)
1290 capture = self.pg5.get_capture(len(pkts))
1291 self.verify_capture_in(capture, self.pg5)
# sessions of the pg5 host: exactly three non-static sessions are expected,
# checked in TCP, UDP, ICMP order
# NOTE(review): second dump argument (10) is presumably the VRF id - confirm
1294 addresses = self.vapi.nat44_address_dump()
1295 self.assertEqual(len(addresses), 1)
1296 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1297 self.assertEqual(len(sessions), 3)
1298 for session in sessions:
1299 self.assertFalse(session.is_static)
1300 self.assertEqual(session.inside_ip_address[0:4],
1301 self.pg5.remote_ip4n)
1302 self.assertEqual(session.outside_ip_address,
1303 addresses[0].ip_address)
1304 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1305 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1306 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1307 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1308 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1309 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1310 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1311 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1312 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1314 # in2out 3rd interface
1315 pkts = self.create_stream_in(self.pg6, self.pg3)
1316 self.pg6.add_stream(pkts)
1317 self.pg_enable_capture(self.pg_interfaces)
1319 capture = self.pg3.get_capture(len(pkts))
1320 self.verify_capture_out(capture, static_nat_ip, True)
1322 # out2in 3rd interface
1323 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1324 self.pg3.add_stream(pkts)
1325 self.pg_enable_capture(self.pg_interfaces)
1327 capture = self.pg6.get_capture(len(pkts))
1328 self.verify_capture_in(capture, self.pg6)
1330 # general user and session dump verifications
1331 users = self.vapi.nat44_user_dump()
1332 self.assertTrue(len(users) >= 3)
1333 addresses = self.vapi.nat44_address_dump()
1334 self.assertEqual(len(addresses), 1)
1336 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1338 for session in sessions:
1339 self.assertEqual(user.ip_address, session.inside_ip_address)
1340 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1341 self.assertTrue(session.protocol in
1342 [IP_PROTOS.tcp, IP_PROTOS.udp,
# sessions of the pg4 host: dynamic (non-static), using the pool address
1346 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1347 self.assertTrue(len(sessions) >= 4)
1348 for session in sessions:
1349 self.assertFalse(session.is_static)
1350 self.assertEqual(session.inside_ip_address[0:4],
1351 self.pg4.remote_ip4n)
1352 self.assertEqual(session.outside_ip_address,
1353 addresses[0].ip_address)
# sessions of the pg6 host: static, with static_nat_ip as outside address
# NOTE(review): comparing map(ord, ...) results relies on Python 2, where
# map() returns a list
1356 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1357 self.assertTrue(len(sessions) >= 3)
1358 for session in sessions:
1359 self.assertTrue(session.is_static)
1360 self.assertEqual(session.inside_ip_address[0:4],
1361 self.pg6.remote_ip4n)
1362 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1363 map(int, static_nat_ip.split('.')))
1364 self.assertTrue(session.inside_port in
1365 [self.tcp_port_in, self.udp_port_in,
1368 def test_hairpinning(self):
1369 """ NAT44 hairpinning - 1:1 NAPT """
1371 host = self.pg0.remote_hosts[0]
1372 server = self.pg0.remote_hosts[1]
1375 server_in_port = 5678
1376 server_out_port = 8765
1378 self.nat44_add_address(self.nat_addr)
1379 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1380 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1382 # add static mapping for server
1383 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1384 server_in_port, server_out_port,
1385 proto=IP_PROTOS.tcp)
1387 # send packet from host to server
1388 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1389 IP(src=host.ip4, dst=self.nat_addr) /
1390 TCP(sport=host_in_port, dport=server_out_port))
1391 self.pg0.add_stream(p)
1392 self.pg_enable_capture(self.pg_interfaces)
1394 capture = self.pg0.get_capture(1)
1399 self.assertEqual(ip.src, self.nat_addr)
1400 self.assertEqual(ip.dst, server.ip4)
1401 self.assertNotEqual(tcp.sport, host_in_port)
1402 self.assertEqual(tcp.dport, server_in_port)
1403 self.check_tcp_checksum(p)
1404 host_out_port = tcp.sport
1406 self.logger.error(ppp("Unexpected or invalid packet:", p))
1409 # send reply from server to host
1410 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1411 IP(src=server.ip4, dst=self.nat_addr) /
1412 TCP(sport=server_in_port, dport=host_out_port))
1413 self.pg0.add_stream(p)
1414 self.pg_enable_capture(self.pg_interfaces)
1416 capture = self.pg0.get_capture(1)
1421 self.assertEqual(ip.src, self.nat_addr)
1422 self.assertEqual(ip.dst, host.ip4)
1423 self.assertEqual(tcp.sport, server_out_port)
1424 self.assertEqual(tcp.dport, host_in_port)
1425 self.check_tcp_checksum(p)
1427 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1430 def test_hairpinning2(self):
1431 """ NAT44 hairpinning - 1:1 NAT"""
# One host and two servers share pg0; both servers have address-only static
# mappings. Checks host <-> server1 and server2 <-> server1 exchanges for
# TCP, UDP and ICMP, all sent and captured on pg0.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
1433 server1_nat_ip = "10.0.0.10"
1434 server2_nat_ip = "10.0.0.11"
1435 host = self.pg0.remote_hosts[0]
1436 server1 = self.pg0.remote_hosts[1]
1437 server2 = self.pg0.remote_hosts[2]
1438 server_tcp_port = 22
1439 server_udp_port = 20
1441 self.nat44_add_address(self.nat_addr)
1442 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1443 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1446 # add static mapping for servers
1447 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1448 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host sends TCP, UDP and ICMP echo-request to server1's NAT address
1452 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1453 IP(src=host.ip4, dst=server1_nat_ip) /
1454 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1456 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1457 IP(src=host.ip4, dst=server1_nat_ip) /
1458 UDP(sport=self.udp_port_in, dport=server_udp_port))
1460 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1461 IP(src=host.ip4, dst=server1_nat_ip) /
1462 ICMP(id=self.icmp_id_in, type='echo-request'))
1464 self.pg0.add_stream(pkts)
1465 self.pg_enable_capture(self.pg_interfaces)
1467 capture = self.pg0.get_capture(len(pkts))
# the host is not statically mapped: its source becomes the pool address
# and its port / ICMP id must change; the new values are stored for the
# reply direction
1468 for packet in capture:
1470 self.assertEqual(packet[IP].src, self.nat_addr)
1471 self.assertEqual(packet[IP].dst, server1.ip4)
1472 if packet.haslayer(TCP):
1473 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1474 self.assertEqual(packet[TCP].dport, server_tcp_port)
1475 self.tcp_port_out = packet[TCP].sport
1476 self.check_tcp_checksum(packet)
1477 elif packet.haslayer(UDP):
1478 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1479 self.assertEqual(packet[UDP].dport, server_udp_port)
1480 self.udp_port_out = packet[UDP].sport
1482 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1483 self.icmp_id_out = packet[ICMP].id
1485 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 replies to the pool address using the ports learned above
1490 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1491 IP(src=server1.ip4, dst=self.nat_addr) /
1492 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1494 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1495 IP(src=server1.ip4, dst=self.nat_addr) /
1496 UDP(sport=server_udp_port, dport=self.udp_port_out))
1498 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1499 IP(src=server1.ip4, dst=self.nat_addr) /
1500 ICMP(id=self.icmp_id_out, type='echo-reply'))
1502 self.pg0.add_stream(pkts)
1503 self.pg_enable_capture(self.pg_interfaces)
1505 capture = self.pg0.get_capture(len(pkts))
# replies reach the host from server1's NAT address with the host's
# original ports / ICMP id restored
1506 for packet in capture:
1508 self.assertEqual(packet[IP].src, server1_nat_ip)
1509 self.assertEqual(packet[IP].dst, host.ip4)
1510 if packet.haslayer(TCP):
1511 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1512 self.assertEqual(packet[TCP].sport, server_tcp_port)
1513 self.check_tcp_checksum(packet)
1514 elif packet.haslayer(UDP):
1515 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1516 self.assertEqual(packet[UDP].sport, server_udp_port)
1518 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1520 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1523 # server2 to server1
1525 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1526 IP(src=server2.ip4, dst=server1_nat_ip) /
1527 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1529 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1530 IP(src=server2.ip4, dst=server1_nat_ip) /
1531 UDP(sport=self.udp_port_in, dport=server_udp_port))
1533 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1534 IP(src=server2.ip4, dst=server1_nat_ip) /
1535 ICMP(id=self.icmp_id_in, type='echo-request'))
1537 self.pg0.add_stream(pkts)
1538 self.pg_enable_capture(self.pg_interfaces)
1540 capture = self.pg0.get_capture(len(pkts))
# server2 is statically mapped: source becomes server2's NAT address and
# ports / ICMP id are expected to stay unchanged
1541 for packet in capture:
1543 self.assertEqual(packet[IP].src, server2_nat_ip)
1544 self.assertEqual(packet[IP].dst, server1.ip4)
1545 if packet.haslayer(TCP):
1546 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1547 self.assertEqual(packet[TCP].dport, server_tcp_port)
1548 self.tcp_port_out = packet[TCP].sport
1549 self.check_tcp_checksum(packet)
1550 elif packet.haslayer(UDP):
1551 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1552 self.assertEqual(packet[UDP].dport, server_udp_port)
1553 self.udp_port_out = packet[UDP].sport
1555 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1556 self.icmp_id_out = packet[ICMP].id
1558 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1561 # server1 to server2
1563 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1564 IP(src=server1.ip4, dst=server2_nat_ip) /
1565 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1567 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1568 IP(src=server1.ip4, dst=server2_nat_ip) /
1569 UDP(sport=server_udp_port, dport=self.udp_port_out))
1571 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1572 IP(src=server1.ip4, dst=server2_nat_ip) /
1573 ICMP(id=self.icmp_id_out, type='echo-reply'))
1575 self.pg0.add_stream(pkts)
1576 self.pg_enable_capture(self.pg_interfaces)
1578 capture = self.pg0.get_capture(len(pkts))
1579 for packet in capture:
1581 self.assertEqual(packet[IP].src, server1_nat_ip)
1582 self.assertEqual(packet[IP].dst, server2.ip4)
1583 if packet.haslayer(TCP):
1584 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1585 self.assertEqual(packet[TCP].sport, server_tcp_port)
1586 self.check_tcp_checksum(packet)
1587 elif packet.haslayer(UDP):
1588 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1589 self.assertEqual(packet[UDP].sport, server_udp_port)
1591 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1593 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1596 def test_max_translations_per_user(self):
1597 """ MAX translations per user - recycle the least recently used """
# Sends five more TCP flows (distinct source ports) from a single pg0 host
# than the configured per-user translation limit and expects every packet
# to still be captured on pg1.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
1599 self.nat44_add_address(self.nat_addr)
1600 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1601 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1604 # get maximum number of translations per user
1605 nat44_config = self.vapi.nat_show_config()
1607 # send more than maximum number of translations per user packets
1608 pkts_num = nat44_config.max_translations_per_user + 5
# one packet per source port, starting at 1025
1610 for port in range(0, pkts_num):
1611 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1612 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1613 TCP(sport=1025 + port))
1615 self.pg0.add_stream(pkts)
1616 self.pg_enable_capture(self.pg_interfaces)
1619 # verify number of translated packet
1620 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface

    Registers pg7 as an address source for the NAT44 pool and checks that
    the pool follows the interface's IPv4 configuration: empty before an
    address is configured, holding exactly the interface address while it
    is configured, and empty again once it is removed.
    """
    iface = self.pg7
    self.vapi.nat44_add_interface_addr(iface.sw_if_index)

    # the interface has no address yet, so the pool must be empty
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # configuring the interface address must put that address in the pool
    iface.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], iface.local_ip4n)

    # removing the interface address must empty the pool again
    iface.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
1641 def test_interface_addr_static_mapping(self):
1642 """ Static mapping with addresses from interface """
# Adds a static mapping whose external side is the pg7 interface and checks
# how the dumped mapping changes as the interface address is configured
# and then removed.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips a value inside the nat44_add_static_mapping call), so the call's
# full argument list is not visible here.
1643 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1644 self.nat44_add_static_mapping(
1646 external_sw_if_index=self.pg7.sw_if_index)
1648 # static mappings with external interface
1649 static_mappings = self.vapi.nat44_static_mapping_dump()
1650 self.assertEqual(1, len(static_mappings))
1651 self.assertEqual(self.pg7.sw_if_index,
1652 static_mappings[0].external_sw_if_index)
1654 # configure interface address and check static mappings
1655 self.pg7.config_ip4()
1656 static_mappings = self.vapi.nat44_static_mapping_dump()
1657 self.assertEqual(1, len(static_mappings))
1658 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1659 self.pg7.local_ip4n)
# once resolved, the mapping reports 0xFFFFFFFF instead of an interface index
1660 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1662 # remove interface address and check static mappings
1663 self.pg7.unconfig_ip4()
1664 static_mappings = self.vapi.nat44_static_mapping_dump()
1665 self.assertEqual(0, len(static_mappings))
1667 def test_ipfix_nat44_sess(self):
1668 """ IPFIX logging NAT44 session created/delted """
1669 self.ipfix_domain_id = 10
1670 self.ipfix_src_port = 20202
1671 colector_port = 30303
1672 bind_layers(UDP, IPFIX, dport=30303)
1673 self.nat44_add_address(self.nat_addr)
1674 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1675 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1677 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1678 src_address=self.pg3.local_ip4n,
1680 template_interval=10,
1681 collector_port=colector_port)
1682 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1683 src_port=self.ipfix_src_port)
1685 pkts = self.create_stream_in(self.pg0, self.pg1)
1686 self.pg0.add_stream(pkts)
1687 self.pg_enable_capture(self.pg_interfaces)
1689 capture = self.pg1.get_capture(len(pkts))
1690 self.verify_capture_out(capture)
1691 self.nat44_add_address(self.nat_addr, is_add=0)
1692 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1693 capture = self.pg3.get_capture(3)
1694 ipfix = IPFIXDecoder()
1695 # first load template
1697 self.assertTrue(p.haslayer(IPFIX))
1698 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1699 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1700 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1701 self.assertEqual(p[UDP].dport, colector_port)
1702 self.assertEqual(p[IPFIX].observationDomainID,
1703 self.ipfix_domain_id)
1704 if p.haslayer(Template):
1705 ipfix.add_template(p.getlayer(Template))
1706 # verify events in data set
1708 if p.haslayer(Data):
1709 data = ipfix.decode_data_set(p.getlayer(Set))
1710 self.verify_ipfix_nat44_ses(data)
1712 def test_ipfix_addr_exhausted(self):
1713 """ IPFIX logging NAT addresses exhausted """
# No nat44_add_address call is visible in this test: a packet sent from pg0
# towards pg1 is expected not to be forwarded, and the IPFIX export
# captured on pg3 is then verified.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
1714 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1715 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1717 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1718 src_address=self.pg3.local_ip4n,
1720 template_interval=10)
# NOTE(review): ipfix_domain_id / ipfix_src_port are not assigned in the
# visible part of this test - confirm where they are initialised
1721 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1722 src_port=self.ipfix_src_port)
1724 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1725 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1727 self.pg0.add_stream(p)
1728 self.pg_enable_capture(self.pg_interfaces)
# nothing may come out of pg1
1730 capture = self.pg1.get_capture(0)
1731 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1732 capture = self.pg3.get_capture(3)
1733 ipfix = IPFIXDecoder()
1734 # first load template
1736 self.assertTrue(p.haslayer(IPFIX))
1737 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1738 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1739 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# no collector_port is passed to set_ipfix_exporter above; 4739 is expected
1740 self.assertEqual(p[UDP].dport, 4739)
1741 self.assertEqual(p[IPFIX].observationDomainID,
1742 self.ipfix_domain_id)
1743 if p.haslayer(Template):
1744 ipfix.add_template(p.getlayer(Template))
1745 # verify events in data set
1747 if p.haslayer(Data):
1748 data = ipfix.decode_data_set(p.getlayer(Set))
1749 self.verify_ipfix_addr_exhausted(data)
1751 def test_pool_addr_fib(self):
1752 """ NAT44 add pool addresses to FIB """
1753 static_addr = '10.0.0.10'
1754 self.nat44_add_address(self.nat_addr)
1755 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1756 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1758 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1761 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1762 ARP(op=ARP.who_has, pdst=self.nat_addr,
1763 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1764 self.pg1.add_stream(p)
1765 self.pg_enable_capture(self.pg_interfaces)
1767 capture = self.pg1.get_capture(1)
1768 self.assertTrue(capture[0].haslayer(ARP))
1769 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1772 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1773 ARP(op=ARP.who_has, pdst=static_addr,
1774 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1775 self.pg1.add_stream(p)
1776 self.pg_enable_capture(self.pg_interfaces)
1778 capture = self.pg1.get_capture(1)
1779 self.assertTrue(capture[0].haslayer(ARP))
1780 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1782 # send ARP to non-NAT44 interface
1783 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1784 ARP(op=ARP.who_has, pdst=self.nat_addr,
1785 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1786 self.pg2.add_stream(p)
1787 self.pg_enable_capture(self.pg_interfaces)
1789 capture = self.pg1.get_capture(0)
1791 # remove addresses and verify
1792 self.nat44_add_address(self.nat_addr, is_add=0)
1793 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1796 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1797 ARP(op=ARP.who_has, pdst=self.nat_addr,
1798 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1799 self.pg1.add_stream(p)
1800 self.pg_enable_capture(self.pg_interfaces)
1802 capture = self.pg1.get_capture(0)
1804 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1805 ARP(op=ARP.who_has, pdst=static_addr,
1806 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1807 self.pg1.add_stream(p)
1808 self.pg_enable_capture(self.pg_interfaces)
1810 capture = self.pg1.get_capture(0)
1812 def test_vrf_mode(self):
1813 """ NAT44 tenant VRF aware address pool mode """
# Moves pg0 and pg1 into two separate IP tables, adds one pool address per
# VRF, and checks that traffic from each interface towards pg2 is
# translated to the address of its own VRF. The interfaces are moved back
# to table 0 and the tables are deleted at the end.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the assignments of vrf_id1 / vrf_id2 are not visible.
1817 nat_ip1 = "10.0.0.10"
1818 nat_ip2 = "10.0.0.11"
# addresses are unconfigured before the table change and configured again
# afterwards
1820 self.pg0.unconfig_ip4()
1821 self.pg1.unconfig_ip4()
1822 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1823 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1824 self.pg0.set_table_ip4(vrf_id1)
1825 self.pg1.set_table_ip4(vrf_id2)
1826 self.pg0.config_ip4()
1827 self.pg1.config_ip4()
1829 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1830 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1831 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1832 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1833 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# traffic from pg0 must be translated to nat_ip1
1837 pkts = self.create_stream_in(self.pg0, self.pg2)
1838 self.pg0.add_stream(pkts)
1839 self.pg_enable_capture(self.pg_interfaces)
1841 capture = self.pg2.get_capture(len(pkts))
1842 self.verify_capture_out(capture, nat_ip1)
# traffic from pg1 must be translated to nat_ip2
1845 pkts = self.create_stream_in(self.pg1, self.pg2)
1846 self.pg1.add_stream(pkts)
1847 self.pg_enable_capture(self.pg_interfaces)
1849 capture = self.pg2.get_capture(len(pkts))
1850 self.verify_capture_out(capture, nat_ip2)
# cleanup: back to table 0, then delete both tables
1852 self.pg0.unconfig_ip4()
1853 self.pg1.unconfig_ip4()
1854 self.pg0.set_table_ip4(0)
1855 self.pg1.set_table_ip4(0)
1856 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1857 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1859 def test_vrf_feature_independent(self):
1860 """ NAT44 tenant VRF independent address pool mode """
# Adds two pool addresses without a VRF binding and checks that traffic
# from both pg0 and pg1 towards pg2 is translated to the first address.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
1862 nat_ip1 = "10.0.0.10"
1863 nat_ip2 = "10.0.0.11"
1865 self.nat44_add_address(nat_ip1)
1866 self.nat44_add_address(nat_ip2)
1867 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1868 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1869 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# from pg0
1873 pkts = self.create_stream_in(self.pg0, self.pg2)
1874 self.pg0.add_stream(pkts)
1875 self.pg_enable_capture(self.pg_interfaces)
1877 capture = self.pg2.get_capture(len(pkts))
1878 self.verify_capture_out(capture, nat_ip1)
# from pg1: nat_ip1 is expected here as well
1881 pkts = self.create_stream_in(self.pg1, self.pg2)
1882 self.pg1.add_stream(pkts)
1883 self.pg_enable_capture(self.pg_interfaces)
1885 capture = self.pg2.get_capture(len(pkts))
1886 self.verify_capture_out(capture, nat_ip1)
1888 def test_dynamic_ipless_interfaces(self):
1889 """ NAT44 interfaces without configured IP address """
# Uses pg7 and pg8 with explicit neighbor entries and /32 routes to their
# remote hosts, then checks dynamic translation pg7 -> pg8 and the return
# direction pg8 -> pg7.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
# neighbor entries for the remote hosts of pg7 and pg8
1891 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1892 mactobinary(self.pg7.remote_mac),
1893 self.pg7.remote_ip4n,
1895 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1896 mactobinary(self.pg8.remote_mac),
1897 self.pg8.remote_ip4n,
# host routes (/32) to both remote hosts via their own interface
1900 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1901 dst_address_length=32,
1902 next_hop_address=self.pg7.remote_ip4n,
1903 next_hop_sw_if_index=self.pg7.sw_if_index)
1904 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1905 dst_address_length=32,
1906 next_hop_address=self.pg8.remote_ip4n,
1907 next_hop_sw_if_index=self.pg8.sw_if_index)
1909 self.nat44_add_address(self.nat_addr)
1910 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1911 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8, verified as translated outbound traffic
1915 pkts = self.create_stream_in(self.pg7, self.pg8)
1916 self.pg7.add_stream(pkts)
1917 self.pg_enable_capture(self.pg_interfaces)
1919 capture = self.pg8.get_capture(len(pkts))
1920 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the NAT address
1923 pkts = self.create_stream_out(self.pg8, self.nat_addr)
1924 self.pg8.add_stream(pkts)
1925 self.pg_enable_capture(self.pg_interfaces)
1927 capture = self.pg7.get_capture(len(pkts))
1928 self.verify_capture_in(capture, self.pg7)
1930 def test_static_ipless_interfaces(self):
1931 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same pg7/pg8 neighbor and /32 route setup as the dynamic variant, but
# with an address-only static mapping of the pg7 host to the NAT address;
# checks pg8 -> pg7 first, then pg7 -> pg8.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
# neighbor entries for the remote hosts of pg7 and pg8
1933 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1934 mactobinary(self.pg7.remote_mac),
1935 self.pg7.remote_ip4n,
1937 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1938 mactobinary(self.pg8.remote_mac),
1939 self.pg8.remote_ip4n,
# host routes (/32) to both remote hosts via their own interface
1942 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1943 dst_address_length=32,
1944 next_hop_address=self.pg7.remote_ip4n,
1945 next_hop_sw_if_index=self.pg7.sw_if_index)
1946 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1947 dst_address_length=32,
1948 next_hop_address=self.pg8.remote_ip4n,
1949 next_hop_sw_if_index=self.pg8.sw_if_index)
1951 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1952 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1953 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
1957 pkts = self.create_stream_out(self.pg8)
1958 self.pg8.add_stream(pkts)
1959 self.pg_enable_capture(self.pg_interfaces)
1961 capture = self.pg7.get_capture(len(pkts))
1962 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8, verified against the NAT address
1965 pkts = self.create_stream_in(self.pg7, self.pg8)
1966 self.pg7.add_stream(pkts)
1967 self.pg_enable_capture(self.pg_interfaces)
1969 capture = self.pg8.get_capture(len(pkts))
1970 self.verify_capture_out(capture, self.nat_addr, True)
1972 def test_static_with_port_ipless_interfaces(self):
1973 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same pg7/pg8 neighbor and /32 route setup as the other ipless tests, with
# per-protocol static mappings (TCP, UDP, ICMP) of the pg7 host to fixed
# outside ports / ICMP id on the NAT address; checks pg8 -> pg7, then
# pg7 -> pg8.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips values); the comments describe only the visible statements.
# fixed outside ports / ICMP id used by the static mappings below
1975 self.tcp_port_out = 30606
1976 self.udp_port_out = 30607
1977 self.icmp_id_out = 30608
1979 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1980 mactobinary(self.pg7.remote_mac),
1981 self.pg7.remote_ip4n,
1983 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1984 mactobinary(self.pg8.remote_mac),
1985 self.pg8.remote_ip4n,
1988 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1989 dst_address_length=32,
1990 next_hop_address=self.pg7.remote_ip4n,
1991 next_hop_sw_if_index=self.pg7.sw_if_index)
1992 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1993 dst_address_length=32,
1994 next_hop_address=self.pg8.remote_ip4n,
1995 next_hop_sw_if_index=self.pg8.sw_if_index)
1997 self.nat44_add_address(self.nat_addr)
# one static mapping per protocol: inside port/id -> outside port/id
1998 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1999 self.tcp_port_in, self.tcp_port_out,
2000 proto=IP_PROTOS.tcp)
2001 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2002 self.udp_port_in, self.udp_port_out,
2003 proto=IP_PROTOS.udp)
2004 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2005 self.icmp_id_in, self.icmp_id_out,
2006 proto=IP_PROTOS.icmp)
2007 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2008 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2012 pkts = self.create_stream_out(self.pg8)
2013 self.pg8.add_stream(pkts)
2014 self.pg_enable_capture(self.pg_interfaces)
2016 capture = self.pg7.get_capture(len(pkts))
2017 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2020 pkts = self.create_stream_in(self.pg7, self.pg8)
2021 self.pg7.add_stream(pkts)
2022 self.pg_enable_capture(self.pg_interfaces)
2024 capture = self.pg8.get_capture(len(pkts))
2025 self.verify_capture_out(capture)
2027 def test_static_unknown_proto(self):
2028 """ 1:1 NAT translate packet with unknown protocol """
# With an address-only static mapping for the pg0 host, sends a packet
# carrying an inner IP/TCP payload in each direction and checks that only
# the outer IP addresses are translated and that a GRE layer is still
# present in the captured packet.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips a value between the outer and inner IP layers of each packet); the
# comments describe only the visible statements.
2029 nat_ip = "10.0.0.10"
2030 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2031 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2032 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1
2036 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2037 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2039 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2040 TCP(sport=1234, dport=1234))
2041 self.pg0.add_stream(p)
2042 self.pg_enable_capture(self.pg_interfaces)
2044 p = self.pg1.get_capture(1)
# outer source must be rewritten to the static NAT address
2047 self.assertEqual(packet[IP].src, nat_ip)
2048 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2049 self.assertTrue(packet.haslayer(GRE))
2050 self.check_ip_checksum(packet)
2052 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the static NAT address
2056 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2057 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2059 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2060 TCP(sport=1234, dport=1234))
2061 self.pg1.add_stream(p)
2062 self.pg_enable_capture(self.pg_interfaces)
2064 p = self.pg0.get_capture(1)
# outer destination must be rewritten back to the pg0 host
2067 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2068 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2069 self.assertTrue(packet.haslayer(GRE))
2070 self.check_ip_checksum(packet)
2072 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2075 def test_hairpinning_static_unknown_proto(self):
2076 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0, each with an address-only
# static mapping. A packet with an inner IP/TCP payload is sent in each
# direction via the peer's NAT address; both outer addresses are expected
# to be translated and a GRE layer to be present in the capture.
# NOTE(review): this listing is not contiguous (the embedded numbering
# skips a value between the outer and inner IP layers of each packet); the
# comments describe only the visible statements.
2078 host = self.pg0.remote_hosts[0]
2079 server = self.pg0.remote_hosts[1]
2081 host_nat_ip = "10.0.0.10"
2082 server_nat_ip = "10.0.0.11"
2084 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2085 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2086 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2087 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server, addressed to the server's NAT address
2091 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2092 IP(src=host.ip4, dst=server_nat_ip) /
2094 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2095 TCP(sport=1234, dport=1234))
2096 self.pg0.add_stream(p)
2097 self.pg_enable_capture(self.pg_interfaces)
2099 p = self.pg0.get_capture(1)
2102 self.assertEqual(packet[IP].src, host_nat_ip)
2103 self.assertEqual(packet[IP].dst, server.ip4)
2104 self.assertTrue(packet.haslayer(GRE))
2105 self.check_ip_checksum(packet)
2107 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the host's NAT address
2111 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2112 IP(src=server.ip4, dst=host_nat_ip) /
2114 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2115 TCP(sport=1234, dport=1234))
2116 self.pg0.add_stream(p)
2117 self.pg_enable_capture(self.pg_interfaces)
2119 p = self.pg0.get_capture(1)
2122 self.assertEqual(packet[IP].src, server_nat_ip)
2123 self.assertEqual(packet[IP].dst, host.ip4)
2124 self.assertTrue(packet.haslayer(GRE))
2125 self.check_ip_checksum(packet)
2127 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2130 def test_unknown_proto(self):
2131 """ NAT44 translate packet with unknown protocol """
# Dynamic-pool variant: self.nat_addr is added as a NAT address, a plain TCP
# packet is sent pg0 -> pg1 first, and only then the encapsulated packet.
# NOTE(review): the embedded line numbers skip lines, so parts of the
# statements below are not visible in this listing.
2132 self.nat44_add_address(self.nat_addr)
2133 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2134 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Plain TCP packet pg0 -> pg1; its capture is fetched but not inspected.
# Presumably this exists to create a session before the unknown-protocol
# packet is sent -- TODO confirm.
2138 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2139 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2140 TCP(sport=self.tcp_port_in, dport=20))
2141 self.pg0.add_stream(p)
2142 self.pg_enable_capture(self.pg_interfaces)
2144 p = self.pg1.get_capture(1)
# Encapsulated packet pg0 -> pg1 (inner pg2 -> pg3 IP/TCP).
2146 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2147 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2149 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2150 TCP(sport=1234, dport=1234))
2151 self.pg0.add_stream(p)
2152 self.pg_enable_capture(self.pg_interfaces)
2154 p = self.pg1.get_capture(1)
# Expect the outer source rewritten to the pool address.
2157 self.assertEqual(packet[IP].src, self.nat_addr)
2158 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2159 self.assertTrue(packet.haslayer(GRE))
2160 self.check_ip_checksum(packet)
2162 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: pg1 -> pool address, captured on pg0.
2166 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2167 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2169 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2170 TCP(sport=1234, dport=1234))
2171 self.pg1.add_stream(p)
2172 self.pg_enable_capture(self.pg_interfaces)
2174 p = self.pg0.get_capture(1)
# Expect the outer destination rewritten back to pg0's remote address.
2177 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2178 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2179 self.assertTrue(packet.haslayer(GRE))
2180 self.check_ip_checksum(packet)
2182 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2185 def test_hairpinning_unknown_proto(self):
2186 """ NAT44 translate packet with unknown protocol - hairpinning """
# Hairpinning with a dynamic pool address for the host and a static mapping
# for the server; host and server are both remote hosts on pg0.
# NOTE(review): the embedded line numbers skip lines; in particular the
# assignment of host_in_port (used below) is not visible in this listing.
2187 host = self.pg0.remote_hosts[0]
2188 server = self.pg0.remote_hosts[1]
2191 server_in_port = 5678
2192 server_out_port = 8765
2193 server_nat_ip = "10.0.0.11"
2195 self.nat44_add_address(self.nat_addr)
2196 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2197 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2200 # add static mapping for server
2201 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Plain TCP packet host -> server's NAT address; the capture is fetched but
# not inspected.
2204 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2205 IP(src=host.ip4, dst=server_nat_ip) /
2206 TCP(sport=host_in_port, dport=server_out_port))
2207 self.pg0.add_stream(p)
2208 self.pg_enable_capture(self.pg_interfaces)
2210 capture = self.pg0.get_capture(1)
# Encapsulated packet host -> server's NAT address.
2212 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2213 IP(src=host.ip4, dst=server_nat_ip) /
2215 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2216 TCP(sport=1234, dport=1234))
2217 self.pg0.add_stream(p)
2218 self.pg_enable_capture(self.pg_interfaces)
2220 p = self.pg0.get_capture(1)
# Source is expected to become the pool address, destination the server's
# real address.
2223 self.assertEqual(packet[IP].src, self.nat_addr)
2224 self.assertEqual(packet[IP].dst, server.ip4)
2225 self.assertTrue(packet.haslayer(GRE))
2226 self.check_ip_checksum(packet)
2228 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Encapsulated packet server -> pool address.
2232 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2233 IP(src=server.ip4, dst=self.nat_addr) /
2235 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2236 TCP(sport=1234, dport=1234))
2237 self.pg0.add_stream(p)
2238 self.pg_enable_capture(self.pg_interfaces)
2240 p = self.pg0.get_capture(1)
# Source is expected to become the server's NAT address, destination the
# host's real address.
2243 self.assertEqual(packet[IP].src, server_nat_ip)
2244 self.assertEqual(packet[IP].dst, host.ip4)
2245 self.assertTrue(packet.haslayer(GRE))
2246 self.check_ip_checksum(packet)
2248 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2251 def test_output_feature(self):
2252 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3 and runs three
# streams: pg0 -> pg3, pg3 -> pg0 and pg2 -> pg0.
# NOTE(review): the trailing arguments of the pg3 call (numbered 2257) are
# not visible in this listing.
2253 self.nat44_add_address(self.nat_addr)
2254 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2255 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2256 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3, verified as outside-network capture
2260 pkts = self.create_stream_in(self.pg0, self.pg3)
2261 self.pg0.add_stream(pkts)
2262 self.pg_enable_capture(self.pg_interfaces)
2264 capture = self.pg3.get_capture(len(pkts))
2265 self.verify_capture_out(capture)
# pg3 -> pg0, verified as inside-network capture
2268 pkts = self.create_stream_out(self.pg3)
2269 self.pg3.add_stream(pkts)
2270 self.pg_enable_capture(self.pg_interfaces)
2272 capture = self.pg0.get_capture(len(pkts))
2273 self.verify_capture_in(capture, self.pg0)
2275 # from non-NAT interface to NAT inside interface
2276 pkts = self.create_stream_in(self.pg2, self.pg0)
2277 self.pg2.add_stream(pkts)
2278 self.pg_enable_capture(self.pg_interfaces)
2280 capture = self.pg0.get_capture(len(pkts))
2281 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2283 def test_output_feature_vrf_aware(self):
2284 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Uses one NAT address per VRF (10 and 20) and checks that traffic from pg4
# and pg6 towards pg3 is translated to the matching address.
# NOTE(review): the embedded line numbers skip lines; the last argument of
# each ip_add_del_route call (numbered 2292 and 2297) is not visible here --
# presumably the table id, confirm.
2285 nat_ip_vrf10 = "10.0.0.10"
2286 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote address via pg3.
2288 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2289 dst_address_length=32,
2290 next_hop_address=self.pg3.remote_ip4n,
2291 next_hop_sw_if_index=self.pg3.sw_if_index,
2293 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2294 dst_address_length=32,
2295 next_hop_address=self.pg3.remote_ip4n,
2296 next_hop_sw_if_index=self.pg3.sw_if_index,
2299 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2300 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2301 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2302 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2303 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3: expect translation to the VRF 10 address
2307 pkts = self.create_stream_in(self.pg4, self.pg3)
2308 self.pg4.add_stream(pkts)
2309 self.pg_enable_capture(self.pg_interfaces)
2311 capture = self.pg3.get_capture(len(pkts))
2312 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> VRF 10 address: expect delivery on pg4
2315 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2316 self.pg3.add_stream(pkts)
2317 self.pg_enable_capture(self.pg_interfaces)
2319 capture = self.pg4.get_capture(len(pkts))
2320 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3: expect translation to the VRF 20 address
2323 pkts = self.create_stream_in(self.pg6, self.pg3)
2324 self.pg6.add_stream(pkts)
2325 self.pg_enable_capture(self.pg_interfaces)
2327 capture = self.pg3.get_capture(len(pkts))
2328 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> VRF 20 address: expect delivery on pg6
2331 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2332 self.pg3.add_stream(pkts)
2333 self.pg_enable_capture(self.pg_interfaces)
2335 capture = self.pg6.get_capture(len(pkts))
2336 self.verify_capture_in(capture, self.pg6)
2338 def test_output_feature_hairpinning(self):
2339 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2340 host = self.pg0.remote_hosts[0]
2341 server = self.pg0.remote_hosts[1]
2344 server_in_port = 5678
2345 server_out_port = 8765
2347 self.nat44_add_address(self.nat_addr)
2348 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2349 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2352 # add static mapping for server
2353 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2354 server_in_port, server_out_port,
2355 proto=IP_PROTOS.tcp)
2357 # send packet from host to server
2358 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2359 IP(src=host.ip4, dst=self.nat_addr) /
2360 TCP(sport=host_in_port, dport=server_out_port))
2361 self.pg0.add_stream(p)
2362 self.pg_enable_capture(self.pg_interfaces)
2364 capture = self.pg0.get_capture(1)
2369 self.assertEqual(ip.src, self.nat_addr)
2370 self.assertEqual(ip.dst, server.ip4)
2371 self.assertNotEqual(tcp.sport, host_in_port)
2372 self.assertEqual(tcp.dport, server_in_port)
2373 self.check_tcp_checksum(p)
2374 host_out_port = tcp.sport
2376 self.logger.error(ppp("Unexpected or invalid packet:", p))
2379 # send reply from server to host
2380 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2381 IP(src=server.ip4, dst=self.nat_addr) /
2382 TCP(sport=server_in_port, dport=host_out_port))
2383 self.pg0.add_stream(p)
2384 self.pg_enable_capture(self.pg_interfaces)
2386 capture = self.pg0.get_capture(1)
2391 self.assertEqual(ip.src, self.nat_addr)
2392 self.assertEqual(ip.dst, host.ip4)
2393 self.assertEqual(tcp.sport, server_out_port)
2394 self.assertEqual(tcp.dport, host_in_port)
2395 self.check_tcp_checksum(p)
2397 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2400 def test_one_armed_nat44(self):
2401 """ One armed NAT44 """
# A single interface (pg9) carries both directions: the NAT feature is
# configured on pg9 twice and both hosts are remote hosts on pg9.
# NOTE(review): the embedded line numbers skip lines; the remaining
# arguments of the second feature call and the statements binding ip/tcp
# from the captured packet are not visible in this listing.
2402 remote_host = self.pg9.remote_hosts[0]
2403 local_host = self.pg9.remote_hosts[1]
2406 self.nat44_add_address(self.nat_addr)
2407 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2408 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host
2412 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2413 IP(src=local_host.ip4, dst=remote_host.ip4) /
2414 TCP(sport=12345, dport=80))
2415 self.pg9.add_stream(p)
2416 self.pg_enable_capture(self.pg_interfaces)
2418 capture = self.pg9.get_capture(1)
# Expect source address and port translated, destination untouched.
2423 self.assertEqual(ip.src, self.nat_addr)
2424 self.assertEqual(ip.dst, remote_host.ip4)
2425 self.assertNotEqual(tcp.sport, 12345)
# Keep the allocated port; the reply below is addressed to it.
2426 external_port = tcp.sport
2427 self.assertEqual(tcp.dport, 80)
2428 self.check_tcp_checksum(p)
2429 self.check_ip_checksum(p)
2431 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> NAT address / allocated port
2435 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2436 IP(src=remote_host.ip4, dst=self.nat_addr) /
2437 TCP(sport=80, dport=external_port))
2438 self.pg9.add_stream(p)
2439 self.pg_enable_capture(self.pg_interfaces)
2441 capture = self.pg9.get_capture(1)
# Expect destination address and port translated back to local_host.
2446 self.assertEqual(ip.src, remote_host.ip4)
2447 self.assertEqual(ip.dst, local_host.ip4)
2448 self.assertEqual(tcp.sport, 80)
2449 self.assertEqual(tcp.dport, 12345)
2450 self.check_tcp_checksum(p)
2451 self.check_ip_checksum(p)
2453 self.logger.error(ppp("Unexpected or invalid packet:", p))
2456 def test_del_session(self):
2457 """ Delete NAT44 session """
# Creates sessions with a pg0 -> pg1 stream, deletes two of them through the
# API and checks that the per-user session dump shrank by exactly two.
# NOTE(review): the embedded line numbers skip lines; the last argument of
# the second nat44_del_session call (numbered 2478) is not visible here.
2458 self.nat44_add_address(self.nat_addr)
2459 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2460 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2463 pkts = self.create_stream_in(self.pg0, self.pg1)
2464 self.pg0.add_stream(pkts)
2465 self.pg_enable_capture(self.pg_interfaces)
2467 capture = self.pg1.get_capture(len(pkts))
# Session count before deletion.
2469 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2470 nsessions = len(sessions)
# First session is deleted by its inside address/port, the second by its
# outside address/port.
2472 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2473 sessions[0].inside_port,
2474 sessions[0].protocol)
2475 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2476 sessions[1].outside_port,
2477 sessions[1].protocol,
2480 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2481 self.assertEqual(nsessions - len(sessions), 2)
# Per-test cleanup of TestNAT44: runs the parent tearDown and, while VPP is
# still alive, logs the output of "show nat44 verbose".
# NOTE(review): the `def` line of this method and anything after the logging
# call are not visible in this listing.
2484 super(TestNAT44, self).tearDown()
2485 if not self.vpp_dead:
2486 self.logger.info(self.vapi.cli("show nat44 verbose"))
2490 class TestDeterministicNAT(MethodHolder):
2491 """ Deterministic NAT Test Cases """
# Extends the VPP command line built by the parent class with
# `nat { deterministic }`.
2494 def setUpConstants(cls):
2495 super(TestDeterministicNAT, cls).setUpConstants()
2496 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-level fixture: sets the ports / ICMP id / NAT address shared by the
# tests, creates three pg interfaces and gives pg0 two remote hosts.
# NOTE(review): the embedded line numbers skip lines; the body of the
# interface loop is not visible in this listing.
2499 def setUpClass(cls):
2500 super(TestDeterministicNAT, cls).setUpClass()
2503 cls.tcp_port_in = 6303
2504 cls.tcp_external_port = 6303
2505 cls.udp_port_in = 6304
2506 cls.udp_external_port = 6304
2507 cls.icmp_id_in = 6305
2508 cls.nat_addr = '10.0.0.3'
2510 cls.create_pg_interfaces(range(3))
2511 cls.interfaces = list(cls.pg_interfaces)
2513 for i in cls.interfaces:
2518 cls.pg0.generate_remote_hosts(2)
2519 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): presumably the failure-path cleanup of setUpClass (the two
# lines before it are not visible here) -- confirm.
2522 super(TestDeterministicNAT, cls).tearDownClass()
# Builds one TCP, one UDP and one ICMP echo-request packet from in_if's
# remote address to out_if's remote address, using the class-level inside
# and external ports and the inside ICMP id.
# NOTE(review): the lines that collect and return the packets are not
# visible in this listing.
2525 def create_stream_in(self, in_if, out_if, ttl=64):
2527 Create packet stream for inside network
2529 :param in_if: Inside interface
2530 :param out_if: Outside interface
2531 :param ttl: TTL of generated packets
2535 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2536 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2537 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2541 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2542 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2543 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2547 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2548 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2549 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds one TCP, one UDP and one ICMP echo-reply packet from out_if's remote
# address to dst_ip. The destination ports / ICMP id come from
# self.tcp_port_out, self.udp_port_out and self.icmp_external_id, which
# verify_capture_out() records from a previously captured stream.
# NOTE(review): the condition guarding the dst_ip default and the lines that
# collect and return the packets are not visible in this listing.
2554 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2556 Create packet stream for outside network
2558 :param out_if: Outside interface
2559 :param dst_ip: Destination IP address (Default use global NAT address)
2560 :param ttl: TTL of generated packets
2563 dst_ip = self.nat_addr
2566 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2567 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2568 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2572 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2573 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2574 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2578 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2579 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2580 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks the capture length and each packet's source address, and records
# the translated TCP/UDP source port or ICMP id on self so that
# create_stream_out() can address replies to them.
# NOTE(review): the parameter list below still documents `same_port`, which
# is not in the signature, and "Sorce" is a typo for "Source".
# NOTE(review): the condition guarding the nat_ip default and the try/else
# lines around the per-packet checks are not visible in this listing.
2585 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2587 Verify captured packets on outside network
2589 :param capture: Captured packets
2590 :param nat_ip: Translated IP address (Default use global NAT address)
2591 :param same_port: Sorce port number is not translated (Default False)
2592 :param packet_num: Expected number of packets (Default 3)
2595 nat_ip = self.nat_addr
2596 self.assertEqual(packet_num, len(capture))
2597 for packet in capture:
2599 self.assertEqual(packet[IP].src, nat_ip)
2600 if packet.haslayer(TCP):
2601 self.tcp_port_out = packet[TCP].sport
2602 elif packet.haslayer(UDP):
2603 self.udp_port_out = packet[UDP].sport
2605 self.icmp_external_id = packet[ICMP].id
2607 self.logger.error(ppp("Unexpected or invalid packet "
2608 "(outside network):", packet))
# Sends three TCP packets (in->out, out->in, in->out) between in_if and
# out_if and stores the translated source port of the first one in
# self.tcp_port_out; a failure is logged as "TCP 3 way handshake failed".
# NOTE(review): the TCP flags arguments, two add_stream calls, the statement
# rebinding `p` to the captured packet and the try/except lines are not
# visible in this listing.
2611 def initiate_tcp_session(self, in_if, out_if):
2613 Initiates TCP session
2615 :param in_if: Inside interface
2616 :param out_if: Outside interface
2619 # SYN packet in->out
2620 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2621 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2622 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2625 self.pg_enable_capture(self.pg_interfaces)
2627 capture = out_if.get_capture(1)
2629 self.tcp_port_out = p[TCP].sport
2631 # SYN + ACK packet out->in
2632 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2633 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2634 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2636 out_if.add_stream(p)
2637 self.pg_enable_capture(self.pg_interfaces)
2639 in_if.get_capture(1)
2641 # ACK packet in->out
2642 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2643 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2644 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2647 self.pg_enable_capture(self.pg_interfaces)
2649 out_if.get_capture(1)
2652 self.logger.error("TCP 3 way handshake failed")
# Asserts that exactly one record was decoded and checks three of its fields
# against fixed values and pg0's remote address.
# NOTE(review): the keys 230, 466 and 8 look like IANA IPFIX information
# element ids (natEvent, natQuotaExceededEvent, sourceIPv4Address) --
# confirm. The statement binding `record` is not visible in this listing.
2655 def verify_ipfix_max_entries_per_user(self, data):
2657 Verify IPFIX maximum entries per user exceeded event
2659 :param data: Decoded IPFIX data records
2661 self.assertEqual(1, len(data))
2664 self.assertEqual(ord(record[230]), 13)
2665 # natQuotaExceededEvent
2666 self.assertEqual('\x03\x00\x00\x00', record[466])
2668 self.assertEqual(self.pg0.remote_ip4n, record[8])
2670 def test_deterministic_mode(self):
2671 """ NAT plugin run deterministic mode """
# Adds one deterministic mapping, checks forward/reverse lookups and the
# mapping dump against it, then clears the configuration and checks that the
# dump is empty.
# NOTE(review): the assignments of in_plen and out_plen are not visible in
# this listing.
2672 in_addr = '172.16.255.0'
2673 out_addr = '172.17.255.50'
2674 in_addr_t = '172.16.255.20'
2675 in_addr_n = socket.inet_aton(in_addr)
2676 out_addr_n = socket.inet_aton(out_addr)
2677 in_addr_t_n = socket.inet_aton(in_addr_t)
# The plugin must report deterministic mode as enabled.
2681 nat_config = self.vapi.nat_show_config()
2682 self.assertEqual(1, nat_config.deterministic)
2684 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of in_addr_t must give out_addr; reversing that result
# (with the upper port of the returned range) must give in_addr_t back.
2686 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2687 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2688 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2689 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# Only the first four bytes of the dumped addresses are compared.
2691 deterministic_mappings = self.vapi.nat_det_map_dump()
2692 self.assertEqual(len(deterministic_mappings), 1)
2693 dsm = deterministic_mappings[0]
2694 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2695 self.assertEqual(in_plen, dsm.in_plen)
2696 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2697 self.assertEqual(out_plen, dsm.out_plen)
2699 self.clear_nat_det()
2700 deterministic_mappings = self.vapi.nat_det_map_dump()
2701 self.assertEqual(len(deterministic_mappings), 0)
2703 def test_set_timeouts(self):
2704 """ Set deterministic NAT timeouts """
# Reads the current timeouts, sets each of them to its old value plus 10 and
# checks that every value read back differs from the old one.
2705 timeouts_before = self.vapi.nat_det_get_timeouts()
2707 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2708 timeouts_before.tcp_established + 10,
2709 timeouts_before.tcp_transitory + 10,
2710 timeouts_before.icmp + 10)
2712 timeouts_after = self.vapi.nat_det_get_timeouts()
2714 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2715 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2716 self.assertNotEqual(timeouts_before.tcp_established,
2717 timeouts_after.tcp_established)
2718 self.assertNotEqual(timeouts_before.tcp_transitory,
2719 timeouts_after.tcp_transitory)
2721 def test_det_in(self):
2722 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
# Maps pg0's remote address to nat_ip, runs a stream in each direction and
# then checks the three sessions reported for pg0's remote address.
# NOTE(review): the embedded line numbers skip lines; the prefix-length
# arguments of nat_det_add_del_map and the statements binding `s` for each
# session are not visible in this listing.
2724 nat_ip = "10.0.0.10"
2726 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2728 socket.inet_aton(nat_ip),
2730 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2731 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1; verify_capture_out also records the translated ports / id.
2735 pkts = self.create_stream_in(self.pg0, self.pg1)
2736 self.pg0.add_stream(pkts)
2737 self.pg_enable_capture(self.pg_interfaces)
2739 capture = self.pg1.get_capture(len(pkts))
2740 self.verify_capture_out(capture, nat_ip)
# pg1 -> nat_ip, addressed to the ports / id recorded above.
2743 pkts = self.create_stream_out(self.pg1, nat_ip)
2744 self.pg1.add_stream(pkts)
2745 self.pg_enable_capture(self.pg_interfaces)
2747 capture = self.pg0.get_capture(len(pkts))
2748 self.verify_capture_in(capture, self.pg0)
# One session each is expected for TCP, UDP and ICMP.
2751 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2752 self.assertEqual(len(sessions), 3)
# TCP session
2756 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2757 self.assertEqual(s.in_port, self.tcp_port_in)
2758 self.assertEqual(s.out_port, self.tcp_port_out)
2759 self.assertEqual(s.ext_port, self.tcp_external_port)
# UDP session
2763 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2764 self.assertEqual(s.in_port, self.udp_port_in)
2765 self.assertEqual(s.out_port, self.udp_port_out)
2766 self.assertEqual(s.ext_port, self.udp_external_port)
# ICMP session: the ICMP ids are compared through the port fields.
2770 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2771 self.assertEqual(s.in_port, self.icmp_id_in)
2772 self.assertEqual(s.out_port, self.icmp_external_id)
2774 def test_multiple_users(self):
2775 """ Deterministic NAT multiple users """
# Two hosts on pg0 share one mapping to nat_ip: each opens a TCP flow from
# the same inside port, replies are sent to the two translated ports, and
# finally both sessions are closed through the API.
# NOTE(review): the embedded line numbers skip lines; the assignment of
# port_in, the prefix-length arguments of nat_det_add_del_map, the statements
# binding ip/tcp from captured packets and several arguments of the
# close_session calls are not visible in this listing.
2777 nat_ip = "10.0.0.10"
2779 external_port = 6303
2781 host0 = self.pg0.remote_hosts[0]
2782 host1 = self.pg0.remote_hosts[1]
2784 self.vapi.nat_det_add_del_map(host0.ip4n,
2786 socket.inet_aton(nat_ip),
2788 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2789 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> pg1
2793 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2794 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2795 TCP(sport=port_in, dport=external_port))
2796 self.pg0.add_stream(p)
2797 self.pg_enable_capture(self.pg_interfaces)
2799 capture = self.pg1.get_capture(1)
2804 self.assertEqual(ip.src, nat_ip)
2805 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2806 self.assertEqual(tcp.dport, external_port)
# Translated port allocated to host0.
2807 port_out0 = tcp.sport
2809 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> pg1
2813 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2814 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2815 TCP(sport=port_in, dport=external_port))
2816 self.pg0.add_stream(p)
2817 self.pg_enable_capture(self.pg_interfaces)
2819 capture = self.pg1.get_capture(1)
2824 self.assertEqual(ip.src, nat_ip)
2825 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2826 self.assertEqual(tcp.dport, external_port)
# Translated port allocated to host1.
2827 port_out1 = tcp.sport
2829 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One mapping holding two sessions is expected at this point.
2832 dms = self.vapi.nat_det_map_dump()
2833 self.assertEqual(1, len(dms))
2834 self.assertEqual(2, dms[0].ses_num)
# pg1 -> nat_ip:port_out0, expected to reach host0
2837 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2838 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2839 TCP(sport=external_port, dport=port_out0))
2840 self.pg1.add_stream(p)
2841 self.pg_enable_capture(self.pg_interfaces)
2843 capture = self.pg0.get_capture(1)
2848 self.assertEqual(ip.src, self.pg1.remote_ip4)
2849 self.assertEqual(ip.dst, host0.ip4)
2850 self.assertEqual(tcp.dport, port_in)
2851 self.assertEqual(tcp.sport, external_port)
2853 self.logger.error(ppp("Unexpected or invalid packet:", p))
# pg1 -> nat_ip:port_out1, expected to reach host1
2857 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2858 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2859 TCP(sport=external_port, dport=port_out1))
2860 self.pg1.add_stream(p)
2861 self.pg_enable_capture(self.pg_interfaces)
2863 capture = self.pg0.get_capture(1)
2868 self.assertEqual(ip.src, self.pg1.remote_ip4)
2869 self.assertEqual(ip.dst, host1.ip4)
2870 self.assertEqual(tcp.dport, port_in)
2871 self.assertEqual(tcp.sport, external_port)
2873 self.logger.error(ppp("Unexpected or invalid packet", p))
2876 # session close api test
# Closing one session from the outside side must leave one session ...
2877 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2879 self.pg1.remote_ip4n,
2881 dms = self.vapi.nat_det_map_dump()
2882 self.assertEqual(dms[0].ses_num, 1)
# ... and closing the other from the inside side must leave none.
2884 self.vapi.nat_det_close_session_in(host0.ip4n,
2886 self.pg1.remote_ip4n,
2888 dms = self.vapi.nat_det_map_dump()
2889 self.assertEqual(dms[0].ses_num, 0)
2891 def test_tcp_session_close_detection_in(self):
2892 """ Deterministic NAT TCP session close from inside network """
# Opens a TCP session with initiate_tcp_session(), then plays a close
# sequence started from the inside (pg0) and checks that the mapping reports
# zero sessions afterwards.
# NOTE(review): the embedded line numbers skip lines; the TCP flags
# arguments, the statements that build `pkts` and the try/except lines are
# not visible in this listing.
2893 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2895 socket.inet_aton(self.nat_addr),
2897 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2898 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2901 self.initiate_tcp_session(self.pg0, self.pg1)
2903 # close the session from inside
2905 # FIN packet in -> out
2906 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2907 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2908 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2910 self.pg0.add_stream(p)
2911 self.pg_enable_capture(self.pg_interfaces)
2913 self.pg1.get_capture(1)
2917 # ACK packet out -> in
2918 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2919 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2920 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2924 # FIN packet out -> in
2925 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2926 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2927 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# The two out -> in packets above are sent together; two packets are
# expected on pg0.
2931 self.pg1.add_stream(pkts)
2932 self.pg_enable_capture(self.pg_interfaces)
2934 self.pg0.get_capture(2)
2936 # ACK packet in -> out
2937 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2938 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2939 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2941 self.pg0.add_stream(p)
2942 self.pg_enable_capture(self.pg_interfaces)
2944 self.pg1.get_capture(1)
2946 # Check if deterministic NAT44 closed the session
2947 dms = self.vapi.nat_det_map_dump()
2948 self.assertEqual(0, dms[0].ses_num)
2950 self.logger.error("TCP session termination failed")
2953 def test_tcp_session_close_detection_out(self):
2954 """ Deterministic NAT TCP session close from outside network """
# Mirror of test_tcp_session_close_detection_in: the close sequence is
# started from the outside (pg1) and the mapping must report zero sessions
# afterwards.
# NOTE(review): the embedded line numbers skip lines; the TCP flags
# arguments, the statements that build `pkts` and the try/except lines are
# not visible in this listing.
2955 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2957 socket.inet_aton(self.nat_addr),
2959 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2960 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2963 self.initiate_tcp_session(self.pg0, self.pg1)
2965 # close the session from outside
2967 # FIN packet out -> in
2968 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2969 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2970 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2972 self.pg1.add_stream(p)
2973 self.pg_enable_capture(self.pg_interfaces)
2975 self.pg0.get_capture(1)
2979 # ACK packet in -> out
2980 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2981 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2982 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment below repeats the previous one; by symmetry with
# test_tcp_session_close_detection_in this packet is presumably the FIN --
# the flags are not visible here, confirm.
2986 # ACK packet in -> out
2987 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2988 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2989 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# The two in -> out packets above are sent together; two packets are
# expected on pg1.
2993 self.pg0.add_stream(pkts)
2994 self.pg_enable_capture(self.pg_interfaces)
2996 self.pg1.get_capture(2)
2998 # ACK packet out -> in
2999 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3000 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3001 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3003 self.pg1.add_stream(p)
3004 self.pg_enable_capture(self.pg_interfaces)
3006 self.pg0.get_capture(1)
3008 # Check if deterministic NAT44 closed the session
3009 dms = self.vapi.nat_det_map_dump()
3010 self.assertEqual(0, dms[0].ses_num)
3012 self.logger.error("TCP session termination failed")
# Runs only when running_extended_tests() is true.
3015 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3016 def test_session_timeout(self):
3017 """ Deterministic NAT session timeouts """
# Opens a TCP session, sets all four timeouts to 5, sends a pg0 -> pg1
# stream and finally expects the mapping to report zero sessions.
# NOTE(review): the embedded line numbers skip 3033-3034 between the capture
# and the session check; presumably that is where the test waits for the
# timeouts to expire -- confirm.
3018 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3020 socket.inet_aton(self.nat_addr),
3022 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3023 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3026 self.initiate_tcp_session(self.pg0, self.pg1)
3027 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3028 pkts = self.create_stream_in(self.pg0, self.pg1)
3029 self.pg0.add_stream(pkts)
3030 self.pg_enable_capture(self.pg_interfaces)
3032 capture = self.pg1.get_capture(len(pkts))
3035 dms = self.vapi.nat_det_map_dump()
3036 self.assertEqual(0, dms[0].ses_num)
# Runs only when running_extended_tests() is true.
3038 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3039 def test_session_limit_per_user(self):
3040 """ Deterministic NAT maximum sessions per user limit """
# Sends UDP packets for ports 1025..2024 from one inside host, then one more
# flow that must be rejected: nothing on pg1, an ICMP error back on pg0, the
# session count staying at 1000 and an IPFIX event exported via pg2.
# NOTE(review): the embedded line numbers skip lines; the prefix-length
# arguments of nat_det_add_del_map, the statements that build `pkts`, the
# bindings of `p`/`icmp` from captures and the loops over the IPFIX capture
# are not visible in this listing.
3041 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3043 socket.inet_aton(self.nat_addr),
3045 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3046 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# IPFIX exporter: collector is pg2's remote address, source is pg2's local
# address.
3048 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3049 src_address=self.pg2.local_ip4n,
3051 template_interval=10)
3052 self.vapi.nat_ipfix()
# One UDP packet per port in 1025..2024 (1000 packets).
3055 for port in range(1025, 2025):
3056 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3057 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3058 UDP(sport=port, dport=port))
3061 self.pg0.add_stream(pkts)
3062 self.pg_enable_capture(self.pg_interfaces)
3064 capture = self.pg1.get_capture(len(pkts))
# One additional flow; it must not be forwarded to pg1.
3066 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3067 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3068 UDP(sport=3001, dport=3002))
3069 self.pg0.add_stream(p)
3070 self.pg_enable_capture(self.pg_interfaces)
3072 capture = self.pg1.assert_nothing_captured()
3074 # verify ICMP error packet
3075 capture = self.pg0.get_capture(1)
3077 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 / code 1, embedding the rejected UDP packet's ports.
3079 self.assertEqual(icmp.type, 3)
3080 self.assertEqual(icmp.code, 1)
3081 self.assertTrue(icmp.haslayer(IPerror))
3082 inner_ip = icmp[IPerror]
3083 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3084 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3086 dms = self.vapi.nat_det_map_dump()
3088 self.assertEqual(1000, dms[0].ses_num)
3090 # verify IPFIX logging
3091 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Two IPFIX packets are expected on pg2; templates are loaded into the
# decoder first so that the data sets can be decoded afterwards.
3093 capture = self.pg2.get_capture(2)
3094 ipfix = IPFIXDecoder()
3095 # first load template
3097 self.assertTrue(p.haslayer(IPFIX))
3098 if p.haslayer(Template):
3099 ipfix.add_template(p.getlayer(Template))
3100 # verify events in data set
3102 if p.haslayer(Data):
3103 data = ipfix.decode_data_set(p.getlayer(Set))
3104 self.verify_ipfix_max_entries_per_user(data)
# Disables NAT IPFIX logging, calls nat_det_set_timeouts() without arguments,
# then walks the dumped mappings and NAT44 interfaces and calls the matching
# add/del API for each of them.
# NOTE(review): the remaining arguments of both add/del calls (presumably the
# ones selecting deletion) are not visible in this listing -- confirm.
3106 def clear_nat_det(self):
3108 Clear deterministic NAT configuration.
3110 self.vapi.nat_ipfix(enable=0)
3111 self.vapi.nat_det_set_timeouts()
3112 deterministic_mappings = self.vapi.nat_det_map_dump()
3113 for dsm in deterministic_mappings:
3114 self.vapi.nat_det_add_del_map(dsm.in_addr,
3120 interfaces = self.vapi.nat44_interface_dump()
3121 for intf in interfaces:
3122 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Per-test cleanup of TestDeterministicNAT: runs the parent tearDown and,
# while VPP is still alive, logs "show nat44 detail" and clears the
# deterministic NAT configuration.
# NOTE(review): the `def` line of this method is not visible in this listing.
3127 super(TestDeterministicNAT, self).tearDown()
3128 if not self.vpp_dead:
3129 self.logger.info(self.vapi.cli("show nat44 detail"))
3130 self.clear_nat_det()
3133 class TestNAT64(MethodHolder):
3134 """ NAT64 Test Cases """
# Class-level fixture for the NAT64 tests: sets the shared ports / ICMP ids
# and NAT addresses, creates four pg interfaces and splits them into IPv6
# (pg0, pg2) and IPv4 (pg1) groups; pg2's IPv6 table is set to vrf1_id and
# pg3 is configured for both IPv4 and IPv6.
# NOTE(review): the embedded line numbers skip lines; the assignment of
# cls.vrf1_id and most of the two interface loops are not visible in this
# listing.
3137 def setUpClass(cls):
3138 super(TestNAT64, cls).setUpClass()
3141 cls.tcp_port_in = 6303
3142 cls.tcp_port_out = 6303
3143 cls.udp_port_in = 6304
3144 cls.udp_port_out = 6304
3145 cls.icmp_id_in = 6305
3146 cls.icmp_id_out = 6305
3147 cls.nat_addr = '10.0.0.3'
3148 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3150 cls.vrf1_nat_addr = '10.0.10.3'
3151 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3154 cls.create_pg_interfaces(range(4))
3155 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3156 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3157 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3159 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3161 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3163 cls.pg0.generate_remote_hosts(2)
3165 for i in cls.ip6_interfaces:
3168 i.configure_ipv6_neighbors()
3170 for i in cls.ip4_interfaces:
3176 cls.pg3.config_ip4()
3177 cls.pg3.resolve_arp()
3178 cls.pg3.config_ip6()
3179 cls.pg3.configure_ipv6_neighbors()
# NOTE(review): presumably the failure-path cleanup of setUpClass (the two
# lines before it are not visible here) -- confirm.
3182 super(TestNAT64, cls).tearDownClass()
3185 def test_pool(self):
3186 """ Add/delete address to NAT64 pool """
# Adds a single-address range (start == end) to the pool, checks that the
# dump holds exactly that address, removes it and checks the dump is empty.
3187 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3189 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3191 addresses = self.vapi.nat64_pool_addr_dump()
3192 self.assertEqual(len(addresses), 1)
3193 self.assertEqual(addresses[0].address, nat_addr)
3195 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3197 addresses = self.vapi.nat64_pool_addr_dump()
3198 self.assertEqual(len(addresses), 0)
3200 def test_interface(self):
3201 """ Enable/disable NAT64 feature on the interface """
3202 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3203 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3205 interfaces = self.vapi.nat64_interface_dump()
3206 self.assertEqual(len(interfaces), 2)
3209 for intf in interfaces:
3210 if intf.sw_if_index == self.pg0.sw_if_index:
3211 self.assertEqual(intf.is_inside, 1)
3213 elif intf.sw_if_index == self.pg1.sw_if_index:
3214 self.assertEqual(intf.is_inside, 0)
3216 self.assertTrue(pg0_found)
3217 self.assertTrue(pg1_found)
3219 features = self.vapi.cli("show interface features pg0")
3220 self.assertNotEqual(features.find('nat64-in2out'), -1)
3221 features = self.vapi.cli("show interface features pg1")
3222 self.assertNotEqual(features.find('nat64-out2in'), -1)
3224 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3225 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3227 interfaces = self.vapi.nat64_interface_dump()
3228 self.assertEqual(len(interfaces), 0)
3230 def test_static_bib(self):
3231 """ Add/delete static BIB entry """
3232 in_addr = socket.inet_pton(socket.AF_INET6,
3233 '2001:db8:85a3::8a2e:370:7334')
3234 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3237 proto = IP_PROTOS.tcp
3239 self.vapi.nat64_add_del_static_bib(in_addr,
3244 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3249 self.assertEqual(bibe.i_addr, in_addr)
3250 self.assertEqual(bibe.o_addr, out_addr)
3251 self.assertEqual(bibe.i_port, in_port)
3252 self.assertEqual(bibe.o_port, out_port)
3253 self.assertEqual(static_bib_num, 1)
3255 self.vapi.nat64_add_del_static_bib(in_addr,
3261 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3266 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # (reply field, expected value) pairs, checked in this exact order.
    default_values = (('udp', 300),
                      ('icmp', 60),
                      ('tcp_trans', 240),
                      ('tcp_est', 7440),
                      ('tcp_incoming_syn', 6))
    custom_values = (('udp', 200),
                     ('icmp', 30),
                     ('tcp_trans', 250),
                     ('tcp_est', 7450),
                     ('tcp_incoming_syn', 10))

    # verify default values
    reply = self.vapi.nat64_get_timeouts()
    for field, expected in default_values:
        self.assertEqual(getattr(reply, field), expected)

    # set and verify custom values
    self.vapi.nat64_set_timeouts(**dict(custom_values))
    reply = self.vapi.nat64_get_timeouts()
    for field, expected in custom_values:
        self.assertEqual(getattr(reply, field), expected)
3288 def test_dynamic(self):
3289 """ NAT64 dynamic translation test """
3290 self.tcp_port_in = 6303
3291 self.udp_port_in = 6304
3292 self.icmp_id_in = 6305
3294 ses_num_start = self.nat64_get_ses_num()
3296 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3298 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3299 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3302 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3303 self.pg0.add_stream(pkts)
3304 self.pg_enable_capture(self.pg_interfaces)
3306 capture = self.pg1.get_capture(len(pkts))
3307 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3308 dst_ip=self.pg1.remote_ip4)
3311 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3312 self.pg1.add_stream(pkts)
3313 self.pg_enable_capture(self.pg_interfaces)
3315 capture = self.pg0.get_capture(len(pkts))
3316 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3317 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3320 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3321 self.pg0.add_stream(pkts)
3322 self.pg_enable_capture(self.pg_interfaces)
3324 capture = self.pg1.get_capture(len(pkts))
3325 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3326 dst_ip=self.pg1.remote_ip4)
3329 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3330 self.pg1.add_stream(pkts)
3331 self.pg_enable_capture(self.pg_interfaces)
3333 capture = self.pg0.get_capture(len(pkts))
3334 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3336 ses_num_end = self.nat64_get_ses_num()
3338 self.assertEqual(ses_num_end - ses_num_start, 3)
3340 # tenant with specific VRF
3341 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3342 self.vrf1_nat_addr_n,
3343 vrf_id=self.vrf1_id)
3344 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3346 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3347 self.pg2.add_stream(pkts)
3348 self.pg_enable_capture(self.pg_interfaces)
3350 capture = self.pg1.get_capture(len(pkts))
3351 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3352 dst_ip=self.pg1.remote_ip4)
3354 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3355 self.pg1.add_stream(pkts)
3356 self.pg_enable_capture(self.pg_interfaces)
3358 capture = self.pg2.get_capture(len(pkts))
3359 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3361 def test_static(self):
3362 """ NAT64 static translation test """
3363 self.tcp_port_in = 60303
3364 self.udp_port_in = 60304
3365 self.icmp_id_in = 60305
3366 self.tcp_port_out = 60303
3367 self.udp_port_out = 60304
3368 self.icmp_id_out = 60305
3370 ses_num_start = self.nat64_get_ses_num()
3372 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3374 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3375 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3377 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3382 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3387 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3394 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3395 self.pg0.add_stream(pkts)
3396 self.pg_enable_capture(self.pg_interfaces)
3398 capture = self.pg1.get_capture(len(pkts))
3399 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3400 dst_ip=self.pg1.remote_ip4, same_port=True)
3403 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3404 self.pg1.add_stream(pkts)
3405 self.pg_enable_capture(self.pg_interfaces)
3407 capture = self.pg0.get_capture(len(pkts))
3408 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3409 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3411 ses_num_end = self.nat64_get_ses_num()
3413 self.assertEqual(ses_num_end - ses_num_start, 3)
3415 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3416 def test_session_timeout(self):
3417 """ NAT64 session timeout """
3418 self.icmp_id_in = 1234
3419 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3421 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3422 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3423 self.vapi.nat64_set_timeouts(icmp=5)
3425 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3426 self.pg0.add_stream(pkts)
3427 self.pg_enable_capture(self.pg_interfaces)
3429 capture = self.pg1.get_capture(len(pkts))
3431 ses_num_before_timeout = self.nat64_get_ses_num()
3435 # ICMP session after timeout
3436 ses_num_after_timeout = self.nat64_get_ses_num()
3437 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3439 def test_icmp_error(self):
3440 """ NAT64 ICMP Error message translation """
3441 self.tcp_port_in = 6303
3442 self.udp_port_in = 6304
3443 self.icmp_id_in = 6305
3445 ses_num_start = self.nat64_get_ses_num()
3447 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3449 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3450 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3452 # send some packets to create sessions
3453 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3454 self.pg0.add_stream(pkts)
3455 self.pg_enable_capture(self.pg_interfaces)
3457 capture_ip4 = self.pg1.get_capture(len(pkts))
3458 self.verify_capture_out(capture_ip4,
3459 nat_ip=self.nat_addr,
3460 dst_ip=self.pg1.remote_ip4)
3462 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3463 self.pg1.add_stream(pkts)
3464 self.pg_enable_capture(self.pg_interfaces)
3466 capture_ip6 = self.pg0.get_capture(len(pkts))
3467 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3468 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3469 self.pg0.remote_ip6)
3472 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3473 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3474 ICMPv6DestUnreach(code=1) /
3475 packet[IPv6] for packet in capture_ip6]
3476 self.pg0.add_stream(pkts)
3477 self.pg_enable_capture(self.pg_interfaces)
3479 capture = self.pg1.get_capture(len(pkts))
3480 for packet in capture:
3482 self.assertEqual(packet[IP].src, self.nat_addr)
3483 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3484 self.assertEqual(packet[ICMP].type, 3)
3485 self.assertEqual(packet[ICMP].code, 13)
3486 inner = packet[IPerror]
3487 self.assertEqual(inner.src, self.pg1.remote_ip4)
3488 self.assertEqual(inner.dst, self.nat_addr)
3489 self.check_icmp_checksum(packet)
3490 if inner.haslayer(TCPerror):
3491 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3492 elif inner.haslayer(UDPerror):
3493 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3495 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3497 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3501 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3502 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3503 ICMP(type=3, code=13) /
3504 packet[IP] for packet in capture_ip4]
3505 self.pg1.add_stream(pkts)
3506 self.pg_enable_capture(self.pg_interfaces)
3508 capture = self.pg0.get_capture(len(pkts))
3509 for packet in capture:
3511 self.assertEqual(packet[IPv6].src, ip.src)
3512 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3513 icmp = packet[ICMPv6DestUnreach]
3514 self.assertEqual(icmp.code, 1)
3515 inner = icmp[IPerror6]
3516 self.assertEqual(inner.src, self.pg0.remote_ip6)
3517 self.assertEqual(inner.dst, ip.src)
3518 self.check_icmpv6_checksum(packet)
3519 if inner.haslayer(TCPerror):
3520 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3521 elif inner.haslayer(UDPerror):
3522 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3524 self.assertEqual(inner[ICMPv6EchoRequest].id,
3527 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3530 def test_hairpinning(self):
3531 """ NAT64 hairpinning """
3533 client = self.pg0.remote_hosts[0]
3534 server = self.pg0.remote_hosts[1]
3535 server_tcp_in_port = 22
3536 server_tcp_out_port = 4022
3537 server_udp_in_port = 23
3538 server_udp_out_port = 4023
3539 client_tcp_in_port = 1234
3540 client_udp_in_port = 1235
3541 client_tcp_out_port = 0
3542 client_udp_out_port = 0
3543 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3544 nat_addr_ip6 = ip.src
3546 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3548 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3549 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3551 self.vapi.nat64_add_del_static_bib(server.ip6n,
3554 server_tcp_out_port,
3556 self.vapi.nat64_add_del_static_bib(server.ip6n,
3559 server_udp_out_port,
3564 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3565 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3566 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3568 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3569 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3570 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3572 self.pg0.add_stream(pkts)
3573 self.pg_enable_capture(self.pg_interfaces)
3575 capture = self.pg0.get_capture(len(pkts))
3576 for packet in capture:
3578 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3579 self.assertEqual(packet[IPv6].dst, server.ip6)
3580 if packet.haslayer(TCP):
3581 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3582 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3583 self.check_tcp_checksum(packet)
3584 client_tcp_out_port = packet[TCP].sport
3586 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3587 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3588 self.check_udp_checksum(packet)
3589 client_udp_out_port = packet[UDP].sport
3591 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3596 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3597 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3598 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3600 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3601 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3602 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3604 self.pg0.add_stream(pkts)
3605 self.pg_enable_capture(self.pg_interfaces)
3607 capture = self.pg0.get_capture(len(pkts))
3608 for packet in capture:
3610 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3611 self.assertEqual(packet[IPv6].dst, client.ip6)
3612 if packet.haslayer(TCP):
3613 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3614 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3615 self.check_tcp_checksum(packet)
3617 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3618 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3619 self.check_udp_checksum(packet)
3621 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3626 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3627 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3628 ICMPv6DestUnreach(code=1) /
3629 packet[IPv6] for packet in capture]
3630 self.pg0.add_stream(pkts)
3631 self.pg_enable_capture(self.pg_interfaces)
3633 capture = self.pg0.get_capture(len(pkts))
3634 for packet in capture:
3636 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3637 self.assertEqual(packet[IPv6].dst, server.ip6)
3638 icmp = packet[ICMPv6DestUnreach]
3639 self.assertEqual(icmp.code, 1)
3640 inner = icmp[IPerror6]
3641 self.assertEqual(inner.src, server.ip6)
3642 self.assertEqual(inner.dst, nat_addr_ip6)
3643 self.check_icmpv6_checksum(packet)
3644 if inner.haslayer(TCPerror):
3645 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3646 self.assertEqual(inner[TCPerror].dport,
3647 client_tcp_out_port)
3649 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3650 self.assertEqual(inner[UDPerror].dport,
3651 client_udp_out_port)
3653 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3656 def test_prefix(self):
3657 """ NAT64 Network-Specific Prefix """
3659 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3661 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3662 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3663 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3664 self.vrf1_nat_addr_n,
3665 vrf_id=self.vrf1_id)
3666 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3669 global_pref64 = "2001:db8::"
3670 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3671 global_pref64_len = 32
3672 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3674 prefix = self.vapi.nat64_prefix_dump()
3675 self.assertEqual(len(prefix), 1)
3676 self.assertEqual(prefix[0].prefix, global_pref64_n)
3677 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3678 self.assertEqual(prefix[0].vrf_id, 0)
3680 # Add tenant specific prefix
3681 vrf1_pref64 = "2001:db8:122:300::"
3682 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3683 vrf1_pref64_len = 56
3684 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3686 vrf_id=self.vrf1_id)
3687 prefix = self.vapi.nat64_prefix_dump()
3688 self.assertEqual(len(prefix), 2)
3691 pkts = self.create_stream_in_ip6(self.pg0,
3694 plen=global_pref64_len)
3695 self.pg0.add_stream(pkts)
3696 self.pg_enable_capture(self.pg_interfaces)
3698 capture = self.pg1.get_capture(len(pkts))
3699 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3700 dst_ip=self.pg1.remote_ip4)
3702 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3703 self.pg1.add_stream(pkts)
3704 self.pg_enable_capture(self.pg_interfaces)
3706 capture = self.pg0.get_capture(len(pkts))
3707 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3710 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3712 # Tenant specific prefix
3713 pkts = self.create_stream_in_ip6(self.pg2,
3716 plen=vrf1_pref64_len)
3717 self.pg2.add_stream(pkts)
3718 self.pg_enable_capture(self.pg_interfaces)
3720 capture = self.pg1.get_capture(len(pkts))
3721 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3722 dst_ip=self.pg1.remote_ip4)
3724 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3725 self.pg1.add_stream(pkts)
3726 self.pg_enable_capture(self.pg_interfaces)
3728 capture = self.pg2.get_capture(len(pkts))
3729 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3732 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3734 def test_unknown_proto(self):
3735 """ NAT64 translate packet with unknown protocol """
3737 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3739 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3740 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3741 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3744 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3745 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3746 TCP(sport=self.tcp_port_in, dport=20))
3747 self.pg0.add_stream(p)
3748 self.pg_enable_capture(self.pg_interfaces)
3750 p = self.pg1.get_capture(1)
3752 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3753 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3755 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3756 TCP(sport=1234, dport=1234))
3757 self.pg0.add_stream(p)
3758 self.pg_enable_capture(self.pg_interfaces)
3760 p = self.pg1.get_capture(1)
3763 self.assertEqual(packet[IP].src, self.nat_addr)
3764 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3765 self.assertTrue(packet.haslayer(GRE))
3766 self.check_ip_checksum(packet)
3768 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3772 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3773 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3775 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3776 TCP(sport=1234, dport=1234))
3777 self.pg1.add_stream(p)
3778 self.pg_enable_capture(self.pg_interfaces)
3780 p = self.pg0.get_capture(1)
3783 self.assertEqual(packet[IPv6].src, remote_ip6)
3784 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3785 self.assertEqual(packet[IPv6].nh, 47)
3787 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3790 def test_hairpinning_unknown_proto(self):
3791 """ NAT64 translate packet with unknown protocol - hairpinning """
3793 client = self.pg0.remote_hosts[0]
3794 server = self.pg0.remote_hosts[1]
3795 server_tcp_in_port = 22
3796 server_tcp_out_port = 4022
3797 client_tcp_in_port = 1234
3798 client_tcp_out_port = 1235
3799 server_nat_ip = "10.0.0.100"
3800 client_nat_ip = "10.0.0.110"
3801 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3802 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3803 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3804 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3806 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3808 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3809 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3811 self.vapi.nat64_add_del_static_bib(server.ip6n,
3814 server_tcp_out_port,
3817 self.vapi.nat64_add_del_static_bib(server.ip6n,
3823 self.vapi.nat64_add_del_static_bib(client.ip6n,
3826 client_tcp_out_port,
3830 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3831 IPv6(src=client.ip6, dst=server_nat_ip6) /
3832 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3833 self.pg0.add_stream(p)
3834 self.pg_enable_capture(self.pg_interfaces)
3836 p = self.pg0.get_capture(1)
3838 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3839 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3841 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3842 TCP(sport=1234, dport=1234))
3843 self.pg0.add_stream(p)
3844 self.pg_enable_capture(self.pg_interfaces)
3846 p = self.pg0.get_capture(1)
3849 self.assertEqual(packet[IPv6].src, client_nat_ip6)
3850 self.assertEqual(packet[IPv6].dst, server.ip6)
3851 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3853 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3857 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3858 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3860 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3861 TCP(sport=1234, dport=1234))
3862 self.pg0.add_stream(p)
3863 self.pg_enable_capture(self.pg_interfaces)
3865 p = self.pg0.get_capture(1)
3868 self.assertEqual(packet[IPv6].src, server_nat_ip6)
3869 self.assertEqual(packet[IPv6].dst, client.ip6)
3870 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3872 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3875 def test_one_armed_nat64(self):
3876 """ One armed NAT64 """
3878 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
3882 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3884 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
3885 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
3888 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3889 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
3890 TCP(sport=12345, dport=80))
3891 self.pg3.add_stream(p)
3892 self.pg_enable_capture(self.pg_interfaces)
3894 capture = self.pg3.get_capture(1)
3899 self.assertEqual(ip.src, self.nat_addr)
3900 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3901 self.assertNotEqual(tcp.sport, 12345)
3902 external_port = tcp.sport
3903 self.assertEqual(tcp.dport, 80)
3904 self.check_tcp_checksum(p)
3905 self.check_ip_checksum(p)
3907 self.logger.error(ppp("Unexpected or invalid packet:", p))
3911 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3912 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
3913 TCP(sport=80, dport=external_port))
3914 self.pg3.add_stream(p)
3915 self.pg_enable_capture(self.pg_interfaces)
3917 capture = self.pg3.get_capture(1)
3922 self.assertEqual(ip.src, remote_host_ip6)
3923 self.assertEqual(ip.dst, self.pg3.remote_ip6)
3924 self.assertEqual(tcp.sport, 80)
3925 self.assertEqual(tcp.dport, 12345)
3926 self.check_tcp_checksum(p)
3928 self.logger.error(ppp("Unexpected or invalid packet:", p))
3931 def nat64_get_ses_num(self):
3933 Return number of active NAT64 sessions.
3935 st = self.vapi.nat64_st_dump()
3938 def clear_nat64(self):
3940 Clear NAT64 configuration.
3942 self.vapi.nat64_set_timeouts()
3944 interfaces = self.vapi.nat64_interface_dump()
3945 for intf in interfaces:
3946 if intf.is_inside > 1:
3947 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3950 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3954 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3957 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3965 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3968 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3976 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3979 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3987 adresses = self.vapi.nat64_pool_addr_dump()
3988 for addr in adresses:
3989 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3994 prefixes = self.vapi.nat64_prefix_dump()
3995 for prefix in prefixes:
3996 self.vapi.nat64_add_del_prefix(prefix.prefix,
3998 vrf_id=prefix.vrf_id,
4002 super(TestNAT64, self).tearDown()
4003 if not self.vpp_dead:
4004 self.logger.info(self.vapi.cli("show nat64 pool"))
4005 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4006 self.logger.info(self.vapi.cli("show nat64 prefix"))
4007 self.logger.info(self.vapi.cli("show nat64 bib all"))
4008 self.logger.info(self.vapi.cli("show nat64 session table all"))
4012 class TestDSlite(MethodHolder):
4013 """ DS-Lite Test Cases """
4016 def setUpClass(cls):
4017 super(TestDSlite, cls).setUpClass()
4020 cls.nat_addr = '10.0.0.3'
4021 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4023 cls.create_pg_interfaces(range(2))
4025 cls.pg0.config_ip4()
4026 cls.pg0.resolve_arp()
4028 cls.pg1.config_ip6()
4029 cls.pg1.generate_remote_hosts(2)
4030 cls.pg1.configure_ipv6_neighbors()
4033 super(TestDSlite, cls).tearDownClass()
4036 def test_dslite(self):
4037 """ Test DS-Lite """
4038 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4040 aftr_ip4 = '192.0.0.1'
4041 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4042 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4043 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4044 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4047 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4048 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4049 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4050 UDP(sport=20000, dport=10000))
4051 self.pg1.add_stream(p)
4052 self.pg_enable_capture(self.pg_interfaces)
4054 capture = self.pg0.get_capture(1)
4055 capture = capture[0]
4056 self.assertFalse(capture.haslayer(IPv6))
4057 self.assertEqual(capture[IP].src, self.nat_addr)
4058 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4059 self.assertNotEqual(capture[UDP].sport, 20000)
4060 self.assertEqual(capture[UDP].dport, 10000)
4061 self.check_ip_checksum(capture)
4062 out_port = capture[UDP].sport
4064 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4065 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4066 UDP(sport=10000, dport=out_port))
4067 self.pg0.add_stream(p)
4068 self.pg_enable_capture(self.pg_interfaces)
4070 capture = self.pg1.get_capture(1)
4071 capture = capture[0]
4072 self.assertEqual(capture[IPv6].src, aftr_ip6)
4073 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4074 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4075 self.assertEqual(capture[IP].dst, '192.168.1.1')
4076 self.assertEqual(capture[UDP].sport, 10000)
4077 self.assertEqual(capture[UDP].dport, 20000)
4078 self.check_ip_checksum(capture)
4081 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4082 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4083 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4084 TCP(sport=20001, dport=10001))
4085 self.pg1.add_stream(p)
4086 self.pg_enable_capture(self.pg_interfaces)
4088 capture = self.pg0.get_capture(1)
4089 capture = capture[0]
4090 self.assertFalse(capture.haslayer(IPv6))
4091 self.assertEqual(capture[IP].src, self.nat_addr)
4092 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4093 self.assertNotEqual(capture[TCP].sport, 20001)
4094 self.assertEqual(capture[TCP].dport, 10001)
4095 self.check_ip_checksum(capture)
4096 self.check_tcp_checksum(capture)
4097 out_port = capture[TCP].sport
4099 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4100 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4101 TCP(sport=10001, dport=out_port))
4102 self.pg0.add_stream(p)
4103 self.pg_enable_capture(self.pg_interfaces)
4105 capture = self.pg1.get_capture(1)
4106 capture = capture[0]
4107 self.assertEqual(capture[IPv6].src, aftr_ip6)
4108 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4109 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4110 self.assertEqual(capture[IP].dst, '192.168.1.1')
4111 self.assertEqual(capture[TCP].sport, 10001)
4112 self.assertEqual(capture[TCP].dport, 20001)
4113 self.check_ip_checksum(capture)
4114 self.check_tcp_checksum(capture)
4117 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4118 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4119 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4120 ICMP(id=4000, type='echo-request'))
4121 self.pg1.add_stream(p)
4122 self.pg_enable_capture(self.pg_interfaces)
4124 capture = self.pg0.get_capture(1)
4125 capture = capture[0]
4126 self.assertFalse(capture.haslayer(IPv6))
4127 self.assertEqual(capture[IP].src, self.nat_addr)
4128 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4129 self.assertNotEqual(capture[ICMP].id, 4000)
4130 self.check_ip_checksum(capture)
4131 self.check_icmp_checksum(capture)
4132 out_id = capture[ICMP].id
4134 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4135 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4136 ICMP(id=out_id, type='echo-reply'))
4137 self.pg0.add_stream(p)
4138 self.pg_enable_capture(self.pg_interfaces)
4140 capture = self.pg1.get_capture(1)
4141 capture = capture[0]
4142 self.assertEqual(capture[IPv6].src, aftr_ip6)
4143 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4144 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4145 self.assertEqual(capture[IP].dst, '192.168.1.1')
4146 self.assertEqual(capture[ICMP].id, 4000)
4147 self.check_ip_checksum(capture)
4148 self.check_icmp_checksum(capture)
4151 super(TestDSlite, self).tearDown()
4152 if not self.vpp_dead:
4153 self.logger.info(self.vapi.cli("show dslite pool"))
4155 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4156 self.logger.info(self.vapi.cli("show dslite sessions"))
# Script entry point: when this module is executed directly, run its test
# cases through unittest using the VPP-specific test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)