9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
69 def check_icmp_errror_embedded(self, pkt):
71 Check ICMP error embeded packet checksum
73 :param pkt: Packet to check ICMP error embeded packet checksum
75 if pkt.haslayer(IPerror):
76 new = pkt.__class__(str(pkt))
77 del new['IPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
81 if pkt.haslayer(TCPerror):
82 new = pkt.__class__(str(pkt))
83 del new['TCPerror'].chksum
84 new = new.__class__(str(new))
85 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
87 if pkt.haslayer(UDPerror):
88 if pkt['UDPerror'].chksum != 0:
89 new = pkt.__class__(str(pkt))
90 del new['UDPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['UDPerror'].chksum,
93 pkt['UDPerror'].chksum)
95 if pkt.haslayer(ICMPerror):
96 del new['ICMPerror'].chksum
97 new = new.__class__(str(new))
98 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
100 def check_icmp_checksum(self, pkt):
102 Check ICMP checksum in IPv4 packet
104 :param pkt: Packet to check ICMP checksum
106 new = pkt.__class__(str(pkt))
107 del new['ICMP'].chksum
108 new = new.__class__(str(new))
109 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110 if pkt.haslayer(IPerror):
111 self.check_icmp_errror_embedded(pkt)
113 def check_icmpv6_checksum(self, pkt):
115 Check ICMPv6 checksum in IPv4 packet
117 :param pkt: Packet to check ICMPv6 checksum
119 new = pkt.__class__(str(pkt))
120 if pkt.haslayer(ICMPv6DestUnreach):
121 del new['ICMPv6DestUnreach'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124 pkt['ICMPv6DestUnreach'].cksum)
125 self.check_icmp_errror_embedded(pkt)
126 if pkt.haslayer(ICMPv6EchoRequest):
127 del new['ICMPv6EchoRequest'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130 pkt['ICMPv6EchoRequest'].cksum)
131 if pkt.haslayer(ICMPv6EchoReply):
132 del new['ICMPv6EchoReply'].cksum
133 new = new.__class__(str(new))
134 self.assertEqual(new['ICMPv6EchoReply'].cksum,
135 pkt['ICMPv6EchoReply'].cksum)
137 def create_stream_in(self, in_if, out_if, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param ttl: TTL of generated packets
147 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149 TCP(sport=self.tcp_port_in, dport=20))
153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155 UDP(sport=self.udp_port_in, dport=20))
159 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161 ICMP(id=self.icmp_id_in, type='echo-request'))
166 def compose_ip6(self, ip4, pref, plen):
168 Compose IPv4-embedded IPv6 addresses
170 :param ip4: IPv4 address
171 :param pref: IPv6 prefix
172 :param plen: IPv6 prefix length
173 :returns: IPv4-embedded IPv6 addresses
175 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
191 pref_n[10] = ip4_n[3]
195 pref_n[10] = ip4_n[2]
196 pref_n[11] = ip4_n[3]
199 pref_n[10] = ip4_n[1]
200 pref_n[11] = ip4_n[2]
201 pref_n[12] = ip4_n[3]
203 pref_n[12] = ip4_n[0]
204 pref_n[13] = ip4_n[1]
205 pref_n[14] = ip4_n[2]
206 pref_n[15] = ip4_n[3]
207 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
209 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
211 Create IPv6 packet stream for inside network
213 :param in_if: Inside interface
214 :param out_if: Outside interface
215 :param ttl: Hop Limit of generated packets
216 :param pref: NAT64 prefix
217 :param plen: NAT64 prefix length
221 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
223 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228 TCP(sport=self.tcp_port_in, dport=20))
232 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234 UDP(sport=self.udp_port_in, dport=20))
238 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240 ICMPv6EchoRequest(id=self.icmp_id_in))
245 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
247 Create packet stream for outside network
249 :param out_if: Outside interface
250 :param dst_ip: Destination IP address (Default use global NAT address)
251 :param ttl: TTL of generated packets
254 dst_ip = self.nat_addr
257 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259 TCP(dport=self.tcp_port_out, sport=20))
263 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265 UDP(dport=self.udp_port_out, sport=20))
269 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
270 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
271 ICMP(id=self.icmp_id_out, type='echo-reply'))
276 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
277 packet_num=3, dst_ip=None):
279 Verify captured packets on outside network
281 :param capture: Captured packets
282 :param nat_ip: Translated IP address (Default use global NAT address)
283 :param same_port: Sorce port number is not translated (Default False)
284 :param packet_num: Expected number of packets (Default 3)
285 :param dst_ip: Destination IP address (Default do not verify)
288 nat_ip = self.nat_addr
289 self.assertEqual(packet_num, len(capture))
290 for packet in capture:
292 self.check_ip_checksum(packet)
293 self.assertEqual(packet[IP].src, nat_ip)
294 if dst_ip is not None:
295 self.assertEqual(packet[IP].dst, dst_ip)
296 if packet.haslayer(TCP):
298 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
301 packet[TCP].sport, self.tcp_port_in)
302 self.tcp_port_out = packet[TCP].sport
303 self.check_tcp_checksum(packet)
304 elif packet.haslayer(UDP):
306 self.assertEqual(packet[UDP].sport, self.udp_port_in)
309 packet[UDP].sport, self.udp_port_in)
310 self.udp_port_out = packet[UDP].sport
313 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
315 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
316 self.icmp_id_out = packet[ICMP].id
317 self.check_icmp_checksum(packet)
319 self.logger.error(ppp("Unexpected or invalid packet "
320 "(outside network):", packet))
323 def verify_capture_in(self, capture, in_if, packet_num=3):
325 Verify captured packets on inside network
327 :param capture: Captured packets
328 :param in_if: Inside interface
329 :param packet_num: Expected number of packets (Default 3)
331 self.assertEqual(packet_num, len(capture))
332 for packet in capture:
334 self.check_ip_checksum(packet)
335 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
336 if packet.haslayer(TCP):
337 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
338 self.check_tcp_checksum(packet)
339 elif packet.haslayer(UDP):
340 self.assertEqual(packet[UDP].dport, self.udp_port_in)
342 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
343 self.check_icmp_checksum(packet)
345 self.logger.error(ppp("Unexpected or invalid packet "
346 "(inside network):", packet))
349 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
351 Verify captured IPv6 packets on inside network
353 :param capture: Captured packets
354 :param src_ip: Source IP
355 :param dst_ip: Destination IP address
356 :param packet_num: Expected number of packets (Default 3)
358 self.assertEqual(packet_num, len(capture))
359 for packet in capture:
361 self.assertEqual(packet[IPv6].src, src_ip)
362 self.assertEqual(packet[IPv6].dst, dst_ip)
363 if packet.haslayer(TCP):
364 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
365 self.check_tcp_checksum(packet)
366 elif packet.haslayer(UDP):
367 self.assertEqual(packet[UDP].dport, self.udp_port_in)
368 self.check_udp_checksum(packet)
370 self.assertEqual(packet[ICMPv6EchoReply].id,
372 self.check_icmpv6_checksum(packet)
374 self.logger.error(ppp("Unexpected or invalid packet "
375 "(inside network):", packet))
378 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
380 Verify captured packet that don't have to be translated
382 :param capture: Captured packets
383 :param ingress_if: Ingress interface
384 :param egress_if: Egress interface
386 for packet in capture:
388 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
389 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
390 if packet.haslayer(TCP):
391 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
392 elif packet.haslayer(UDP):
393 self.assertEqual(packet[UDP].sport, self.udp_port_in)
395 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
397 self.logger.error(ppp("Unexpected or invalid packet "
398 "(inside network):", packet))
401 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
402 packet_num=3, icmp_type=11):
404 Verify captured packets with ICMP errors on outside network
406 :param capture: Captured packets
407 :param src_ip: Translated IP address or IP address of VPP
408 (Default use global NAT address)
409 :param packet_num: Expected number of packets (Default 3)
410 :param icmp_type: Type of error ICMP packet
411 we are expecting (Default 11)
414 src_ip = self.nat_addr
415 self.assertEqual(packet_num, len(capture))
416 for packet in capture:
418 self.assertEqual(packet[IP].src, src_ip)
419 self.assertTrue(packet.haslayer(ICMP))
421 self.assertEqual(icmp.type, icmp_type)
422 self.assertTrue(icmp.haslayer(IPerror))
423 inner_ip = icmp[IPerror]
424 if inner_ip.haslayer(TCPerror):
425 self.assertEqual(inner_ip[TCPerror].dport,
427 elif inner_ip.haslayer(UDPerror):
428 self.assertEqual(inner_ip[UDPerror].dport,
431 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
433 self.logger.error(ppp("Unexpected or invalid packet "
434 "(outside network):", packet))
437 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
440 Verify captured packets with ICMP errors on inside network
442 :param capture: Captured packets
443 :param in_if: Inside interface
444 :param packet_num: Expected number of packets (Default 3)
445 :param icmp_type: Type of error ICMP packet
446 we are expecting (Default 11)
448 self.assertEqual(packet_num, len(capture))
449 for packet in capture:
451 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
452 self.assertTrue(packet.haslayer(ICMP))
454 self.assertEqual(icmp.type, icmp_type)
455 self.assertTrue(icmp.haslayer(IPerror))
456 inner_ip = icmp[IPerror]
457 if inner_ip.haslayer(TCPerror):
458 self.assertEqual(inner_ip[TCPerror].sport,
460 elif inner_ip.haslayer(UDPerror):
461 self.assertEqual(inner_ip[UDPerror].sport,
464 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
466 self.logger.error(ppp("Unexpected or invalid packet "
467 "(inside network):", packet))
470 def create_stream_frag(self, src_if, dst, sport, dport, data):
472 Create fragmented packet stream
474 :param src_if: Source interface
475 :param dst: Destination IPv4 address
476 :param sport: Source TCP port
477 :param dport: Destination TCP port
478 :param data: Payload data
481 id = random.randint(0, 65535)
482 p = (IP(src=src_if.remote_ip4, dst=dst) /
483 TCP(sport=sport, dport=dport) /
485 p = p.__class__(str(p))
486 chksum = p['TCP'].chksum
488 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
490 TCP(sport=sport, dport=dport, chksum=chksum) /
493 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
495 proto=IP_PROTOS.tcp) /
498 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
499 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
505 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
506 pref=None, plen=0, frag_size=128):
508 Create fragmented packet stream
510 :param src_if: Source interface
511 :param dst: Destination IPv4 address
512 :param sport: Source TCP port
513 :param dport: Destination TCP port
514 :param data: Payload data
515 :param pref: NAT64 prefix
516 :param plen: NAT64 prefix length
517 :param fragsize: size of fragments
521 dst_ip6 = ''.join(['64:ff9b::', dst])
523 dst_ip6 = self.compose_ip6(dst, pref, plen)
525 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
526 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
527 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
528 TCP(sport=sport, dport=dport) /
531 return fragment6(p, frag_size)
533 def reass_frags_and_verify(self, frags, src, dst):
535 Reassemble and verify fragmented packet
537 :param frags: Captured fragments
538 :param src: Source IPv4 address to verify
539 :param dst: Destination IPv4 address to verify
541 :returns: Reassembled IPv4 packet
543 buffer = StringIO.StringIO()
545 self.assertEqual(p[IP].src, src)
546 self.assertEqual(p[IP].dst, dst)
547 self.check_ip_checksum(p)
548 buffer.seek(p[IP].frag * 8)
549 buffer.write(p[IP].payload)
550 ip = frags[0].getlayer(IP)
551 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
552 proto=frags[0][IP].proto)
553 if ip.proto == IP_PROTOS.tcp:
554 p = (ip / TCP(buffer.getvalue()))
555 self.check_tcp_checksum(p)
556 elif ip.proto == IP_PROTOS.udp:
557 p = (ip / UDP(buffer.getvalue()))
560 def reass_frags_and_verify_ip6(self, frags, src, dst):
562 Reassemble and verify fragmented packet
564 :param frags: Captured fragments
565 :param src: Source IPv6 address to verify
566 :param dst: Destination IPv6 address to verify
568 :returns: Reassembled IPv6 packet
570 buffer = StringIO.StringIO()
572 self.assertEqual(p[IPv6].src, src)
573 self.assertEqual(p[IPv6].dst, dst)
574 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
575 buffer.write(p[IPv6ExtHdrFragment].payload)
576 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
577 nh=frags[0][IPv6ExtHdrFragment].nh)
578 if ip.nh == IP_PROTOS.tcp:
579 p = (ip / TCP(buffer.getvalue()))
580 self.check_tcp_checksum(p)
581 elif ip.nh == IP_PROTOS.udp:
582 p = (ip / UDP(buffer.getvalue()))
585 def verify_ipfix_nat44_ses(self, data):
587 Verify IPFIX NAT44 session create/delete event
589 :param data: Decoded IPFIX data records
591 nat44_ses_create_num = 0
592 nat44_ses_delete_num = 0
593 self.assertEqual(6, len(data))
596 self.assertIn(ord(record[230]), [4, 5])
597 if ord(record[230]) == 4:
598 nat44_ses_create_num += 1
600 nat44_ses_delete_num += 1
602 self.assertEqual(self.pg0.remote_ip4n, record[8])
603 # postNATSourceIPv4Address
604 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
607 self.assertEqual(struct.pack("!I", 0), record[234])
608 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
609 if IP_PROTOS.icmp == ord(record[4]):
610 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611 self.assertEqual(struct.pack("!H", self.icmp_id_out),
613 elif IP_PROTOS.tcp == ord(record[4]):
614 self.assertEqual(struct.pack("!H", self.tcp_port_in),
616 self.assertEqual(struct.pack("!H", self.tcp_port_out),
618 elif IP_PROTOS.udp == ord(record[4]):
619 self.assertEqual(struct.pack("!H", self.udp_port_in),
621 self.assertEqual(struct.pack("!H", self.udp_port_out),
624 self.fail("Invalid protocol")
625 self.assertEqual(3, nat44_ses_create_num)
626 self.assertEqual(3, nat44_ses_delete_num)
628 def verify_ipfix_addr_exhausted(self, data):
630 Verify IPFIX NAT addresses event
632 :param data: Decoded IPFIX data records
634 self.assertEqual(1, len(data))
637 self.assertEqual(ord(record[230]), 3)
639 self.assertEqual(struct.pack("!I", 0), record[283])
642 class TestNAT44(MethodHolder):
643 """ NAT44 Test Cases """
647 super(TestNAT44, cls).setUpClass()
650 cls.tcp_port_in = 6303
651 cls.tcp_port_out = 6303
652 cls.udp_port_in = 6304
653 cls.udp_port_out = 6304
654 cls.icmp_id_in = 6305
655 cls.icmp_id_out = 6305
656 cls.nat_addr = '10.0.0.3'
657 cls.ipfix_src_port = 4739
658 cls.ipfix_domain_id = 1
660 cls.create_pg_interfaces(range(10))
661 cls.interfaces = list(cls.pg_interfaces[0:4])
663 for i in cls.interfaces:
668 cls.pg0.generate_remote_hosts(3)
669 cls.pg0.configure_ipv4_neighbors()
671 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672 cls.vapi.ip_table_add_del(10, is_add=1)
673 cls.vapi.ip_table_add_del(20, is_add=1)
675 cls.pg4._local_ip4 = "172.16.255.1"
676 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678 cls.pg4.set_table_ip4(10)
679 cls.pg5._local_ip4 = "172.17.255.3"
680 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682 cls.pg5.set_table_ip4(10)
683 cls.pg6._local_ip4 = "172.16.255.1"
684 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686 cls.pg6.set_table_ip4(20)
687 for i in cls.overlapping_interfaces:
695 cls.pg9.generate_remote_hosts(2)
697 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
702 cls.pg9.resolve_arp()
703 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705 cls.pg9.resolve_arp()
708 super(TestNAT44, cls).tearDownClass()
711 def clear_nat44(self):
713 Clear NAT44 configuration.
715 # I found no elegant way to do this
716 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
717 dst_address_length=32,
718 next_hop_address=self.pg7.remote_ip4n,
719 next_hop_sw_if_index=self.pg7.sw_if_index,
721 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
722 dst_address_length=32,
723 next_hop_address=self.pg8.remote_ip4n,
724 next_hop_sw_if_index=self.pg8.sw_if_index,
727 for intf in [self.pg7, self.pg8]:
728 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
730 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
735 if self.pg7.has_ip4_config:
736 self.pg7.unconfig_ip4()
738 interfaces = self.vapi.nat44_interface_addr_dump()
739 for intf in interfaces:
740 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
742 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
743 domain_id=self.ipfix_domain_id)
744 self.ipfix_src_port = 4739
745 self.ipfix_domain_id = 1
747 interfaces = self.vapi.nat44_interface_dump()
748 for intf in interfaces:
749 if intf.is_inside > 1:
750 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
753 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
757 interfaces = self.vapi.nat44_interface_output_feature_dump()
758 for intf in interfaces:
759 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
763 static_mappings = self.vapi.nat44_static_mapping_dump()
764 for sm in static_mappings:
765 self.vapi.nat44_add_del_static_mapping(
767 sm.external_ip_address,
768 local_port=sm.local_port,
769 external_port=sm.external_port,
770 addr_only=sm.addr_only,
772 protocol=sm.protocol,
775 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
776 for lb_sm in lb_static_mappings:
777 self.vapi.nat44_add_del_lb_static_mapping(
786 identity_mappings = self.vapi.nat44_identity_mapping_dump()
787 for id_m in identity_mappings:
788 self.vapi.nat44_add_del_identity_mapping(
789 addr_only=id_m.addr_only,
792 sw_if_index=id_m.sw_if_index,
794 protocol=id_m.protocol,
797 adresses = self.vapi.nat44_address_dump()
798 for addr in adresses:
799 self.vapi.nat44_add_del_address_range(addr.ip_address,
803 self.vapi.nat_set_reass()
804 self.vapi.nat_set_reass(is_ip6=1)
806 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
807 local_port=0, external_port=0, vrf_id=0,
808 is_add=1, external_sw_if_index=0xFFFFFFFF,
811 Add/delete NAT44 static mapping
813 :param local_ip: Local IP address
814 :param external_ip: External IP address
815 :param local_port: Local port number (Optional)
816 :param external_port: External port number (Optional)
817 :param vrf_id: VRF ID (Default 0)
818 :param is_add: 1 if add, 0 if delete (Default add)
819 :param external_sw_if_index: External interface instead of IP address
820 :param proto: IP protocol (Mandatory if port specified)
823 if local_port and external_port:
825 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
826 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
827 self.vapi.nat44_add_del_static_mapping(
830 external_sw_if_index,
838 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
840 Add/delete NAT44 address
842 :param ip: IP address
843 :param is_add: 1 if add, 0 if delete (Default add)
845 nat_addr = socket.inet_pton(socket.AF_INET, ip)
846 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
849 def test_dynamic(self):
850 """ NAT44 dynamic translation test """
852 self.nat44_add_address(self.nat_addr)
853 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
854 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
858 pkts = self.create_stream_in(self.pg0, self.pg1)
859 self.pg0.add_stream(pkts)
860 self.pg_enable_capture(self.pg_interfaces)
862 capture = self.pg1.get_capture(len(pkts))
863 self.verify_capture_out(capture)
866 pkts = self.create_stream_out(self.pg1)
867 self.pg1.add_stream(pkts)
868 self.pg_enable_capture(self.pg_interfaces)
870 capture = self.pg0.get_capture(len(pkts))
871 self.verify_capture_in(capture, self.pg0)
873 def test_dynamic_icmp_errors_in2out_ttl_1(self):
874 """ NAT44 handling of client packets with TTL=1 """
876 self.nat44_add_address(self.nat_addr)
877 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
878 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
881 # Client side - generate traffic
882 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
883 self.pg0.add_stream(pkts)
884 self.pg_enable_capture(self.pg_interfaces)
887 # Client side - verify ICMP type 11 packets
888 capture = self.pg0.get_capture(len(pkts))
889 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
891 def test_dynamic_icmp_errors_out2in_ttl_1(self):
892 """ NAT44 handling of server packets with TTL=1 """
894 self.nat44_add_address(self.nat_addr)
895 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
896 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
899 # Client side - create sessions
900 pkts = self.create_stream_in(self.pg0, self.pg1)
901 self.pg0.add_stream(pkts)
902 self.pg_enable_capture(self.pg_interfaces)
905 # Server side - generate traffic
906 capture = self.pg1.get_capture(len(pkts))
907 self.verify_capture_out(capture)
908 pkts = self.create_stream_out(self.pg1, ttl=1)
909 self.pg1.add_stream(pkts)
910 self.pg_enable_capture(self.pg_interfaces)
913 # Server side - verify ICMP type 11 packets
914 capture = self.pg1.get_capture(len(pkts))
915 self.verify_capture_out_with_icmp_errors(capture,
916 src_ip=self.pg1.local_ip4)
918 def test_dynamic_icmp_errors_in2out_ttl_2(self):
919 """ NAT44 handling of error responses to client packets with TTL=2 """
921 self.nat44_add_address(self.nat_addr)
922 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
923 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
926 # Client side - generate traffic
927 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
928 self.pg0.add_stream(pkts)
929 self.pg_enable_capture(self.pg_interfaces)
932 # Server side - simulate ICMP type 11 response
933 capture = self.pg1.get_capture(len(pkts))
934 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
935 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
936 ICMP(type=11) / packet[IP] for packet in capture]
937 self.pg1.add_stream(pkts)
938 self.pg_enable_capture(self.pg_interfaces)
941 # Client side - verify ICMP type 11 packets
942 capture = self.pg0.get_capture(len(pkts))
943 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
945 def test_dynamic_icmp_errors_out2in_ttl_2(self):
946 """ NAT44 handling of error responses to server packets with TTL=2 """
948 self.nat44_add_address(self.nat_addr)
949 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
950 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
953 # Client side - create sessions
954 pkts = self.create_stream_in(self.pg0, self.pg1)
955 self.pg0.add_stream(pkts)
956 self.pg_enable_capture(self.pg_interfaces)
959 # Server side - generate traffic
960 capture = self.pg1.get_capture(len(pkts))
961 self.verify_capture_out(capture)
962 pkts = self.create_stream_out(self.pg1, ttl=2)
963 self.pg1.add_stream(pkts)
964 self.pg_enable_capture(self.pg_interfaces)
967 # Client side - simulate ICMP type 11 response
968 capture = self.pg0.get_capture(len(pkts))
969 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
970 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
971 ICMP(type=11) / packet[IP] for packet in capture]
972 self.pg0.add_stream(pkts)
973 self.pg_enable_capture(self.pg_interfaces)
976 # Server side - verify ICMP type 11 packets
977 capture = self.pg1.get_capture(len(pkts))
978 self.verify_capture_out_with_icmp_errors(capture)
980 def test_ping_out_interface_from_outside(self):
981 """ Ping NAT44 out interface from outside network """
983 self.nat44_add_address(self.nat_addr)
984 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
985 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
988 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
989 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
990 ICMP(id=self.icmp_id_out, type='echo-request'))
992 self.pg1.add_stream(pkts)
993 self.pg_enable_capture(self.pg_interfaces)
995 capture = self.pg1.get_capture(len(pkts))
996 self.assertEqual(1, len(capture))
999 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1000 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1001 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1002 self.assertEqual(packet[ICMP].type, 0) # echo reply
1004 self.logger.error(ppp("Unexpected or invalid packet "
1005 "(outside network):", packet))
1008 def test_ping_internal_host_from_outside(self):
1009 """ Ping internal host from outside network """
1011 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1012 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1013 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1017 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1018 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1019 ICMP(id=self.icmp_id_out, type='echo-request'))
1020 self.pg1.add_stream(pkt)
1021 self.pg_enable_capture(self.pg_interfaces)
1023 capture = self.pg0.get_capture(1)
1024 self.verify_capture_in(capture, self.pg0, packet_num=1)
1025 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1028 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1029 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1030 ICMP(id=self.icmp_id_in, type='echo-reply'))
1031 self.pg0.add_stream(pkt)
1032 self.pg_enable_capture(self.pg_interfaces)
1034 capture = self.pg1.get_capture(1)
1035 self.verify_capture_out(capture, same_port=True, packet_num=1)
1036 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1038 def test_static_in(self):
1039 """ 1:1 NAT initialized from inside network """
1041 nat_ip = "10.0.0.10"
1042 self.tcp_port_out = 6303
1043 self.udp_port_out = 6304
1044 self.icmp_id_out = 6305
1046 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1047 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1048 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1052 pkts = self.create_stream_in(self.pg0, self.pg1)
1053 self.pg0.add_stream(pkts)
1054 self.pg_enable_capture(self.pg_interfaces)
1056 capture = self.pg1.get_capture(len(pkts))
1057 self.verify_capture_out(capture, nat_ip, True)
1060 pkts = self.create_stream_out(self.pg1, nat_ip)
1061 self.pg1.add_stream(pkts)
1062 self.pg_enable_capture(self.pg_interfaces)
1064 capture = self.pg0.get_capture(len(pkts))
1065 self.verify_capture_in(capture, self.pg0)
1067 def test_static_out(self):
1068 """ 1:1 NAT initialized from outside network """
1070 nat_ip = "10.0.0.20"
1071 self.tcp_port_out = 6303
1072 self.udp_port_out = 6304
1073 self.icmp_id_out = 6305
1075 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1076 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1077 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1081 pkts = self.create_stream_out(self.pg1, nat_ip)
1082 self.pg1.add_stream(pkts)
1083 self.pg_enable_capture(self.pg_interfaces)
1085 capture = self.pg0.get_capture(len(pkts))
1086 self.verify_capture_in(capture, self.pg0)
1089 pkts = self.create_stream_in(self.pg0, self.pg1)
1090 self.pg0.add_stream(pkts)
1091 self.pg_enable_capture(self.pg_interfaces)
1093 capture = self.pg1.get_capture(len(pkts))
1094 self.verify_capture_out(capture, nat_ip, True)
1096 def test_static_with_port_in(self):
1097 """ 1:1 NAPT initialized from inside network """
1099 self.tcp_port_out = 3606
1100 self.udp_port_out = 3607
1101 self.icmp_id_out = 3608
1103 self.nat44_add_address(self.nat_addr)
1104 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1105 self.tcp_port_in, self.tcp_port_out,
1106 proto=IP_PROTOS.tcp)
1107 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1108 self.udp_port_in, self.udp_port_out,
1109 proto=IP_PROTOS.udp)
1110 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1111 self.icmp_id_in, self.icmp_id_out,
1112 proto=IP_PROTOS.icmp)
1113 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1114 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1118 pkts = self.create_stream_in(self.pg0, self.pg1)
1119 self.pg0.add_stream(pkts)
1120 self.pg_enable_capture(self.pg_interfaces)
1122 capture = self.pg1.get_capture(len(pkts))
1123 self.verify_capture_out(capture)
1126 pkts = self.create_stream_out(self.pg1)
1127 self.pg1.add_stream(pkts)
1128 self.pg_enable_capture(self.pg_interfaces)
1130 capture = self.pg0.get_capture(len(pkts))
1131 self.verify_capture_in(capture, self.pg0)
1133 def test_static_with_port_out(self):
1134 """ 1:1 NAPT initialized from outside network """
1136 self.tcp_port_out = 30606
1137 self.udp_port_out = 30607
1138 self.icmp_id_out = 30608
1140 self.nat44_add_address(self.nat_addr)
1141 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1142 self.tcp_port_in, self.tcp_port_out,
1143 proto=IP_PROTOS.tcp)
1144 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1145 self.udp_port_in, self.udp_port_out,
1146 proto=IP_PROTOS.udp)
1147 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1148 self.icmp_id_in, self.icmp_id_out,
1149 proto=IP_PROTOS.icmp)
1150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1155 pkts = self.create_stream_out(self.pg1)
1156 self.pg1.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1159 capture = self.pg0.get_capture(len(pkts))
1160 self.verify_capture_in(capture, self.pg0)
1163 pkts = self.create_stream_in(self.pg0, self.pg1)
1164 self.pg0.add_stream(pkts)
1165 self.pg_enable_capture(self.pg_interfaces)
1167 capture = self.pg1.get_capture(len(pkts))
1168 self.verify_capture_out(capture)
1170 def test_static_vrf_aware(self):
1171 """ 1:1 NAT VRF awareness """
1173 nat_ip1 = "10.0.0.30"
1174 nat_ip2 = "10.0.0.40"
1175 self.tcp_port_out = 6303
1176 self.udp_port_out = 6304
1177 self.icmp_id_out = 6305
1179 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1181 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1183 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1185 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1186 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1188 # inside interface VRF match NAT44 static mapping VRF
1189 pkts = self.create_stream_in(self.pg4, self.pg3)
1190 self.pg4.add_stream(pkts)
1191 self.pg_enable_capture(self.pg_interfaces)
1193 capture = self.pg3.get_capture(len(pkts))
1194 self.verify_capture_out(capture, nat_ip1, True)
1196 # inside interface VRF don't match NAT44 static mapping VRF (packets
1198 pkts = self.create_stream_in(self.pg0, self.pg3)
1199 self.pg0.add_stream(pkts)
1200 self.pg_enable_capture(self.pg_interfaces)
1202 self.pg3.assert_nothing_captured()
1204 def test_identity_nat(self):
1205 """ Identity NAT """
1207 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1208 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1209 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1212 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1213 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1214 TCP(sport=12345, dport=56789))
1215 self.pg1.add_stream(p)
1216 self.pg_enable_capture(self.pg_interfaces)
1218 capture = self.pg0.get_capture(1)
1223 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1224 self.assertEqual(ip.src, self.pg1.remote_ip4)
1225 self.assertEqual(tcp.dport, 56789)
1226 self.assertEqual(tcp.sport, 12345)
1227 self.check_tcp_checksum(p)
1228 self.check_ip_checksum(p)
1230 self.logger.error(ppp("Unexpected or invalid packet:", p))
1233 def test_static_lb(self):
1234 """ NAT44 local service load balancing """
1235 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1238 server1 = self.pg0.remote_hosts[0]
1239 server2 = self.pg0.remote_hosts[1]
1241 locals = [{'addr': server1.ip4n,
1244 {'addr': server2.ip4n,
1248 self.nat44_add_address(self.nat_addr)
1249 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1252 local_num=len(locals),
1254 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1255 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1258 # from client to service
1259 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1260 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1261 TCP(sport=12345, dport=external_port))
1262 self.pg1.add_stream(p)
1263 self.pg_enable_capture(self.pg_interfaces)
1265 capture = self.pg0.get_capture(1)
1271 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1272 if ip.dst == server1.ip4:
1276 self.assertEqual(tcp.dport, local_port)
1277 self.check_tcp_checksum(p)
1278 self.check_ip_checksum(p)
1280 self.logger.error(ppp("Unexpected or invalid packet:", p))
1283 # from service back to client
1284 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1285 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1286 TCP(sport=local_port, dport=12345))
1287 self.pg0.add_stream(p)
1288 self.pg_enable_capture(self.pg_interfaces)
1290 capture = self.pg1.get_capture(1)
1295 self.assertEqual(ip.src, self.nat_addr)
1296 self.assertEqual(tcp.sport, external_port)
1297 self.check_tcp_checksum(p)
1298 self.check_ip_checksum(p)
1300 self.logger.error(ppp("Unexpected or invalid packet:", p))
1306 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1308 for client in clients:
1309 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1310 IP(src=client, dst=self.nat_addr) /
1311 TCP(sport=12345, dport=external_port))
1313 self.pg1.add_stream(pkts)
1314 self.pg_enable_capture(self.pg_interfaces)
1316 capture = self.pg0.get_capture(len(pkts))
1318 if p[IP].dst == server1.ip4:
1322 self.assertTrue(server1_n > server2_n)
1324 def test_multiple_inside_interfaces(self):
1325 """ NAT44 multiple non-overlapping address space inside interfaces """
1327 self.nat44_add_address(self.nat_addr)
1328 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1329 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1330 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1333 # between two NAT44 inside interfaces (no translation)
1334 pkts = self.create_stream_in(self.pg0, self.pg1)
1335 self.pg0.add_stream(pkts)
1336 self.pg_enable_capture(self.pg_interfaces)
1338 capture = self.pg1.get_capture(len(pkts))
1339 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1341 # from NAT44 inside to interface without NAT44 feature (no translation)
1342 pkts = self.create_stream_in(self.pg0, self.pg2)
1343 self.pg0.add_stream(pkts)
1344 self.pg_enable_capture(self.pg_interfaces)
1346 capture = self.pg2.get_capture(len(pkts))
1347 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1349 # in2out 1st interface
1350 pkts = self.create_stream_in(self.pg0, self.pg3)
1351 self.pg0.add_stream(pkts)
1352 self.pg_enable_capture(self.pg_interfaces)
1354 capture = self.pg3.get_capture(len(pkts))
1355 self.verify_capture_out(capture)
1357 # out2in 1st interface
1358 pkts = self.create_stream_out(self.pg3)
1359 self.pg3.add_stream(pkts)
1360 self.pg_enable_capture(self.pg_interfaces)
1362 capture = self.pg0.get_capture(len(pkts))
1363 self.verify_capture_in(capture, self.pg0)
1365 # in2out 2nd interface
1366 pkts = self.create_stream_in(self.pg1, self.pg3)
1367 self.pg1.add_stream(pkts)
1368 self.pg_enable_capture(self.pg_interfaces)
1370 capture = self.pg3.get_capture(len(pkts))
1371 self.verify_capture_out(capture)
1373 # out2in 2nd interface
1374 pkts = self.create_stream_out(self.pg3)
1375 self.pg3.add_stream(pkts)
1376 self.pg_enable_capture(self.pg_interfaces)
1378 capture = self.pg1.get_capture(len(pkts))
1379 self.verify_capture_in(capture, self.pg1)
1381 def test_inside_overlapping_interfaces(self):
1382 """ NAT44 multiple inside interfaces with overlapping address space """
1384 static_nat_ip = "10.0.0.10"
1385 self.nat44_add_address(self.nat_addr)
1386 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1388 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1389 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1390 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1391 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1394 # between NAT44 inside interfaces with same VRF (no translation)
1395 pkts = self.create_stream_in(self.pg4, self.pg5)
1396 self.pg4.add_stream(pkts)
1397 self.pg_enable_capture(self.pg_interfaces)
1399 capture = self.pg5.get_capture(len(pkts))
1400 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1402 # between NAT44 inside interfaces with different VRF (hairpinning)
1403 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1404 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1405 TCP(sport=1234, dport=5678))
1406 self.pg4.add_stream(p)
1407 self.pg_enable_capture(self.pg_interfaces)
1409 capture = self.pg6.get_capture(1)
1414 self.assertEqual(ip.src, self.nat_addr)
1415 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1416 self.assertNotEqual(tcp.sport, 1234)
1417 self.assertEqual(tcp.dport, 5678)
1419 self.logger.error(ppp("Unexpected or invalid packet:", p))
1422 # in2out 1st interface
1423 pkts = self.create_stream_in(self.pg4, self.pg3)
1424 self.pg4.add_stream(pkts)
1425 self.pg_enable_capture(self.pg_interfaces)
1427 capture = self.pg3.get_capture(len(pkts))
1428 self.verify_capture_out(capture)
1430 # out2in 1st interface
1431 pkts = self.create_stream_out(self.pg3)
1432 self.pg3.add_stream(pkts)
1433 self.pg_enable_capture(self.pg_interfaces)
1435 capture = self.pg4.get_capture(len(pkts))
1436 self.verify_capture_in(capture, self.pg4)
1438 # in2out 2nd interface
1439 pkts = self.create_stream_in(self.pg5, self.pg3)
1440 self.pg5.add_stream(pkts)
1441 self.pg_enable_capture(self.pg_interfaces)
1443 capture = self.pg3.get_capture(len(pkts))
1444 self.verify_capture_out(capture)
1446 # out2in 2nd interface
1447 pkts = self.create_stream_out(self.pg3)
1448 self.pg3.add_stream(pkts)
1449 self.pg_enable_capture(self.pg_interfaces)
1451 capture = self.pg5.get_capture(len(pkts))
1452 self.verify_capture_in(capture, self.pg5)
1455 addresses = self.vapi.nat44_address_dump()
1456 self.assertEqual(len(addresses), 1)
1457 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1458 self.assertEqual(len(sessions), 3)
1459 for session in sessions:
1460 self.assertFalse(session.is_static)
1461 self.assertEqual(session.inside_ip_address[0:4],
1462 self.pg5.remote_ip4n)
1463 self.assertEqual(session.outside_ip_address,
1464 addresses[0].ip_address)
1465 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1466 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1467 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1468 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1469 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1470 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1471 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1472 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1473 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1475 # in2out 3rd interface
1476 pkts = self.create_stream_in(self.pg6, self.pg3)
1477 self.pg6.add_stream(pkts)
1478 self.pg_enable_capture(self.pg_interfaces)
1480 capture = self.pg3.get_capture(len(pkts))
1481 self.verify_capture_out(capture, static_nat_ip, True)
1483 # out2in 3rd interface
1484 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1485 self.pg3.add_stream(pkts)
1486 self.pg_enable_capture(self.pg_interfaces)
1488 capture = self.pg6.get_capture(len(pkts))
1489 self.verify_capture_in(capture, self.pg6)
1491 # general user and session dump verifications
1492 users = self.vapi.nat44_user_dump()
1493 self.assertTrue(len(users) >= 3)
1494 addresses = self.vapi.nat44_address_dump()
1495 self.assertEqual(len(addresses), 1)
1497 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1499 for session in sessions:
1500 self.assertEqual(user.ip_address, session.inside_ip_address)
1501 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1502 self.assertTrue(session.protocol in
1503 [IP_PROTOS.tcp, IP_PROTOS.udp,
1507 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1508 self.assertTrue(len(sessions) >= 4)
1509 for session in sessions:
1510 self.assertFalse(session.is_static)
1511 self.assertEqual(session.inside_ip_address[0:4],
1512 self.pg4.remote_ip4n)
1513 self.assertEqual(session.outside_ip_address,
1514 addresses[0].ip_address)
1517 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1518 self.assertTrue(len(sessions) >= 3)
1519 for session in sessions:
1520 self.assertTrue(session.is_static)
1521 self.assertEqual(session.inside_ip_address[0:4],
1522 self.pg6.remote_ip4n)
1523 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1524 map(int, static_nat_ip.split('.')))
1525 self.assertTrue(session.inside_port in
1526 [self.tcp_port_in, self.udp_port_in,
1529 def test_hairpinning(self):
1530 """ NAT44 hairpinning - 1:1 NAPT """
1532 host = self.pg0.remote_hosts[0]
1533 server = self.pg0.remote_hosts[1]
1536 server_in_port = 5678
1537 server_out_port = 8765
1539 self.nat44_add_address(self.nat_addr)
1540 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1541 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1543 # add static mapping for server
1544 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1545 server_in_port, server_out_port,
1546 proto=IP_PROTOS.tcp)
1548 # send packet from host to server
1549 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1550 IP(src=host.ip4, dst=self.nat_addr) /
1551 TCP(sport=host_in_port, dport=server_out_port))
1552 self.pg0.add_stream(p)
1553 self.pg_enable_capture(self.pg_interfaces)
1555 capture = self.pg0.get_capture(1)
1560 self.assertEqual(ip.src, self.nat_addr)
1561 self.assertEqual(ip.dst, server.ip4)
1562 self.assertNotEqual(tcp.sport, host_in_port)
1563 self.assertEqual(tcp.dport, server_in_port)
1564 self.check_tcp_checksum(p)
1565 host_out_port = tcp.sport
1567 self.logger.error(ppp("Unexpected or invalid packet:", p))
1570 # send reply from server to host
1571 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1572 IP(src=server.ip4, dst=self.nat_addr) /
1573 TCP(sport=server_in_port, dport=host_out_port))
1574 self.pg0.add_stream(p)
1575 self.pg_enable_capture(self.pg_interfaces)
1577 capture = self.pg0.get_capture(1)
1582 self.assertEqual(ip.src, self.nat_addr)
1583 self.assertEqual(ip.dst, host.ip4)
1584 self.assertEqual(tcp.sport, server_out_port)
1585 self.assertEqual(tcp.dport, host_in_port)
1586 self.check_tcp_checksum(p)
1588 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1591 def test_hairpinning2(self):
1592 """ NAT44 hairpinning - 1:1 NAT"""
1594 server1_nat_ip = "10.0.0.10"
1595 server2_nat_ip = "10.0.0.11"
1596 host = self.pg0.remote_hosts[0]
1597 server1 = self.pg0.remote_hosts[1]
1598 server2 = self.pg0.remote_hosts[2]
1599 server_tcp_port = 22
1600 server_udp_port = 20
1602 self.nat44_add_address(self.nat_addr)
1603 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1604 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1607 # add static mapping for servers
1608 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1609 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1613 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1614 IP(src=host.ip4, dst=server1_nat_ip) /
1615 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1617 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1618 IP(src=host.ip4, dst=server1_nat_ip) /
1619 UDP(sport=self.udp_port_in, dport=server_udp_port))
1621 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1622 IP(src=host.ip4, dst=server1_nat_ip) /
1623 ICMP(id=self.icmp_id_in, type='echo-request'))
1625 self.pg0.add_stream(pkts)
1626 self.pg_enable_capture(self.pg_interfaces)
1628 capture = self.pg0.get_capture(len(pkts))
1629 for packet in capture:
1631 self.assertEqual(packet[IP].src, self.nat_addr)
1632 self.assertEqual(packet[IP].dst, server1.ip4)
1633 if packet.haslayer(TCP):
1634 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1635 self.assertEqual(packet[TCP].dport, server_tcp_port)
1636 self.tcp_port_out = packet[TCP].sport
1637 self.check_tcp_checksum(packet)
1638 elif packet.haslayer(UDP):
1639 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1640 self.assertEqual(packet[UDP].dport, server_udp_port)
1641 self.udp_port_out = packet[UDP].sport
1643 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1644 self.icmp_id_out = packet[ICMP].id
1646 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1651 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1652 IP(src=server1.ip4, dst=self.nat_addr) /
1653 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1655 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1656 IP(src=server1.ip4, dst=self.nat_addr) /
1657 UDP(sport=server_udp_port, dport=self.udp_port_out))
1659 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1660 IP(src=server1.ip4, dst=self.nat_addr) /
1661 ICMP(id=self.icmp_id_out, type='echo-reply'))
1663 self.pg0.add_stream(pkts)
1664 self.pg_enable_capture(self.pg_interfaces)
1666 capture = self.pg0.get_capture(len(pkts))
1667 for packet in capture:
1669 self.assertEqual(packet[IP].src, server1_nat_ip)
1670 self.assertEqual(packet[IP].dst, host.ip4)
1671 if packet.haslayer(TCP):
1672 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1673 self.assertEqual(packet[TCP].sport, server_tcp_port)
1674 self.check_tcp_checksum(packet)
1675 elif packet.haslayer(UDP):
1676 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1677 self.assertEqual(packet[UDP].sport, server_udp_port)
1679 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1681 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1684 # server2 to server1
1686 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1687 IP(src=server2.ip4, dst=server1_nat_ip) /
1688 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1690 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1691 IP(src=server2.ip4, dst=server1_nat_ip) /
1692 UDP(sport=self.udp_port_in, dport=server_udp_port))
1694 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1695 IP(src=server2.ip4, dst=server1_nat_ip) /
1696 ICMP(id=self.icmp_id_in, type='echo-request'))
1698 self.pg0.add_stream(pkts)
1699 self.pg_enable_capture(self.pg_interfaces)
1701 capture = self.pg0.get_capture(len(pkts))
1702 for packet in capture:
1704 self.assertEqual(packet[IP].src, server2_nat_ip)
1705 self.assertEqual(packet[IP].dst, server1.ip4)
1706 if packet.haslayer(TCP):
1707 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1708 self.assertEqual(packet[TCP].dport, server_tcp_port)
1709 self.tcp_port_out = packet[TCP].sport
1710 self.check_tcp_checksum(packet)
1711 elif packet.haslayer(UDP):
1712 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1713 self.assertEqual(packet[UDP].dport, server_udp_port)
1714 self.udp_port_out = packet[UDP].sport
1716 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1717 self.icmp_id_out = packet[ICMP].id
1719 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1722 # server1 to server2
1724 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1725 IP(src=server1.ip4, dst=server2_nat_ip) /
1726 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1728 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1729 IP(src=server1.ip4, dst=server2_nat_ip) /
1730 UDP(sport=server_udp_port, dport=self.udp_port_out))
1732 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1733 IP(src=server1.ip4, dst=server2_nat_ip) /
1734 ICMP(id=self.icmp_id_out, type='echo-reply'))
1736 self.pg0.add_stream(pkts)
1737 self.pg_enable_capture(self.pg_interfaces)
1739 capture = self.pg0.get_capture(len(pkts))
1740 for packet in capture:
1742 self.assertEqual(packet[IP].src, server1_nat_ip)
1743 self.assertEqual(packet[IP].dst, server2.ip4)
1744 if packet.haslayer(TCP):
1745 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1746 self.assertEqual(packet[TCP].sport, server_tcp_port)
1747 self.check_tcp_checksum(packet)
1748 elif packet.haslayer(UDP):
1749 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1750 self.assertEqual(packet[UDP].sport, server_udp_port)
1752 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1754 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1757 def test_max_translations_per_user(self):
1758 """ MAX translations per user - recycle the least recently used """
1760 self.nat44_add_address(self.nat_addr)
1761 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1762 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1765 # get maximum number of translations per user
1766 nat44_config = self.vapi.nat_show_config()
1768 # send more than maximum number of translations per user packets
1769 pkts_num = nat44_config.max_translations_per_user + 5
1771 for port in range(0, pkts_num):
1772 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1773 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1774 TCP(sport=1025 + port))
1776 self.pg0.add_stream(pkts)
1777 self.pg_enable_capture(self.pg_interfaces)
1780 # verify number of translated packet
1781 self.pg1.get_capture(pkts_num)
1783 def test_interface_addr(self):
1784 """ Acquire NAT44 addresses from interface """
1785 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1787 # no address in NAT pool
1788 adresses = self.vapi.nat44_address_dump()
1789 self.assertEqual(0, len(adresses))
1791 # configure interface address and check NAT address pool
1792 self.pg7.config_ip4()
1793 adresses = self.vapi.nat44_address_dump()
1794 self.assertEqual(1, len(adresses))
1795 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1797 # remove interface address and check NAT address pool
1798 self.pg7.unconfig_ip4()
1799 adresses = self.vapi.nat44_address_dump()
1800 self.assertEqual(0, len(adresses))
1802 def test_interface_addr_static_mapping(self):
1803 """ Static mapping with addresses from interface """
1804 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1805 self.nat44_add_static_mapping(
1807 external_sw_if_index=self.pg7.sw_if_index)
1809 # static mappings with external interface
1810 static_mappings = self.vapi.nat44_static_mapping_dump()
1811 self.assertEqual(1, len(static_mappings))
1812 self.assertEqual(self.pg7.sw_if_index,
1813 static_mappings[0].external_sw_if_index)
1815 # configure interface address and check static mappings
1816 self.pg7.config_ip4()
1817 static_mappings = self.vapi.nat44_static_mapping_dump()
1818 self.assertEqual(1, len(static_mappings))
1819 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1820 self.pg7.local_ip4n)
1821 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1823 # remove interface address and check static mappings
1824 self.pg7.unconfig_ip4()
1825 static_mappings = self.vapi.nat44_static_mapping_dump()
1826 self.assertEqual(0, len(static_mappings))
1828 def test_interface_addr_identity_nat(self):
1829 """ Identity NAT with addresses from interface """
1832 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1833 self.vapi.nat44_add_del_identity_mapping(
1834 sw_if_index=self.pg7.sw_if_index,
1836 protocol=IP_PROTOS.tcp,
1839 # identity mappings with external interface
1840 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1841 self.assertEqual(1, len(identity_mappings))
1842 self.assertEqual(self.pg7.sw_if_index,
1843 identity_mappings[0].sw_if_index)
1845 # configure interface address and check identity mappings
1846 self.pg7.config_ip4()
1847 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1848 self.assertEqual(1, len(identity_mappings))
1849 self.assertEqual(identity_mappings[0].ip_address,
1850 self.pg7.local_ip4n)
1851 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
1852 self.assertEqual(port, identity_mappings[0].port)
1853 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
1855 # remove interface address and check identity mappings
1856 self.pg7.unconfig_ip4()
1857 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1858 self.assertEqual(0, len(identity_mappings))
1860 def test_ipfix_nat44_sess(self):
1861 """ IPFIX logging NAT44 session created/delted """
1862 self.ipfix_domain_id = 10
1863 self.ipfix_src_port = 20202
1864 colector_port = 30303
1865 bind_layers(UDP, IPFIX, dport=30303)
1866 self.nat44_add_address(self.nat_addr)
1867 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1868 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1870 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1871 src_address=self.pg3.local_ip4n,
1873 template_interval=10,
1874 collector_port=colector_port)
1875 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1876 src_port=self.ipfix_src_port)
1878 pkts = self.create_stream_in(self.pg0, self.pg1)
1879 self.pg0.add_stream(pkts)
1880 self.pg_enable_capture(self.pg_interfaces)
1882 capture = self.pg1.get_capture(len(pkts))
1883 self.verify_capture_out(capture)
1884 self.nat44_add_address(self.nat_addr, is_add=0)
1885 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1886 capture = self.pg3.get_capture(3)
1887 ipfix = IPFIXDecoder()
1888 # first load template
1890 self.assertTrue(p.haslayer(IPFIX))
1891 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1892 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1893 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1894 self.assertEqual(p[UDP].dport, colector_port)
1895 self.assertEqual(p[IPFIX].observationDomainID,
1896 self.ipfix_domain_id)
1897 if p.haslayer(Template):
1898 ipfix.add_template(p.getlayer(Template))
1899 # verify events in data set
1901 if p.haslayer(Data):
1902 data = ipfix.decode_data_set(p.getlayer(Set))
1903 self.verify_ipfix_nat44_ses(data)
1905 def test_ipfix_addr_exhausted(self):
1906 """ IPFIX logging NAT addresses exhausted """
1907 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1908 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1910 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1911 src_address=self.pg3.local_ip4n,
1913 template_interval=10)
1914 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1915 src_port=self.ipfix_src_port)
1917 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1918 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1920 self.pg0.add_stream(p)
1921 self.pg_enable_capture(self.pg_interfaces)
1923 capture = self.pg1.get_capture(0)
1924 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1925 capture = self.pg3.get_capture(3)
1926 ipfix = IPFIXDecoder()
1927 # first load template
1929 self.assertTrue(p.haslayer(IPFIX))
1930 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1931 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1932 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1933 self.assertEqual(p[UDP].dport, 4739)
1934 self.assertEqual(p[IPFIX].observationDomainID,
1935 self.ipfix_domain_id)
1936 if p.haslayer(Template):
1937 ipfix.add_template(p.getlayer(Template))
1938 # verify events in data set
1940 if p.haslayer(Data):
1941 data = ipfix.decode_data_set(p.getlayer(Set))
1942 self.verify_ipfix_addr_exhausted(data)
1944 def test_pool_addr_fib(self):
1945 """ NAT44 add pool addresses to FIB """
1946 static_addr = '10.0.0.10'
1947 self.nat44_add_address(self.nat_addr)
1948 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1949 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1951 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1954 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1955 ARP(op=ARP.who_has, pdst=self.nat_addr,
1956 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1957 self.pg1.add_stream(p)
1958 self.pg_enable_capture(self.pg_interfaces)
1960 capture = self.pg1.get_capture(1)
1961 self.assertTrue(capture[0].haslayer(ARP))
1962 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1965 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1966 ARP(op=ARP.who_has, pdst=static_addr,
1967 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1968 self.pg1.add_stream(p)
1969 self.pg_enable_capture(self.pg_interfaces)
1971 capture = self.pg1.get_capture(1)
1972 self.assertTrue(capture[0].haslayer(ARP))
1973 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1975 # send ARP to non-NAT44 interface
1976 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1977 ARP(op=ARP.who_has, pdst=self.nat_addr,
1978 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1979 self.pg2.add_stream(p)
1980 self.pg_enable_capture(self.pg_interfaces)
1982 capture = self.pg1.get_capture(0)
1984 # remove addresses and verify
1985 self.nat44_add_address(self.nat_addr, is_add=0)
1986 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1989 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1990 ARP(op=ARP.who_has, pdst=self.nat_addr,
1991 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1992 self.pg1.add_stream(p)
1993 self.pg_enable_capture(self.pg_interfaces)
1995 capture = self.pg1.get_capture(0)
1997 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1998 ARP(op=ARP.who_has, pdst=static_addr,
1999 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2000 self.pg1.add_stream(p)
2001 self.pg_enable_capture(self.pg_interfaces)
2003 capture = self.pg1.get_capture(0)
2005 def test_vrf_mode(self):
2006 """ NAT44 tenant VRF aware address pool mode """
2010 nat_ip1 = "10.0.0.10"
2011 nat_ip2 = "10.0.0.11"
2013 self.pg0.unconfig_ip4()
2014 self.pg1.unconfig_ip4()
2015 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2016 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2017 self.pg0.set_table_ip4(vrf_id1)
2018 self.pg1.set_table_ip4(vrf_id2)
2019 self.pg0.config_ip4()
2020 self.pg1.config_ip4()
2022 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2023 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2024 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2025 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2026 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2030 pkts = self.create_stream_in(self.pg0, self.pg2)
2031 self.pg0.add_stream(pkts)
2032 self.pg_enable_capture(self.pg_interfaces)
2034 capture = self.pg2.get_capture(len(pkts))
2035 self.verify_capture_out(capture, nat_ip1)
2038 pkts = self.create_stream_in(self.pg1, self.pg2)
2039 self.pg1.add_stream(pkts)
2040 self.pg_enable_capture(self.pg_interfaces)
2042 capture = self.pg2.get_capture(len(pkts))
2043 self.verify_capture_out(capture, nat_ip2)
2045 self.pg0.unconfig_ip4()
2046 self.pg1.unconfig_ip4()
2047 self.pg0.set_table_ip4(0)
2048 self.pg1.set_table_ip4(0)
2049 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2050 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2052 def test_vrf_feature_independent(self):
2053 """ NAT44 tenant VRF independent address pool mode """
2055 nat_ip1 = "10.0.0.10"
2056 nat_ip2 = "10.0.0.11"
2058 self.nat44_add_address(nat_ip1)
2059 self.nat44_add_address(nat_ip2)
2060 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2061 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2062 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2066 pkts = self.create_stream_in(self.pg0, self.pg2)
2067 self.pg0.add_stream(pkts)
2068 self.pg_enable_capture(self.pg_interfaces)
2070 capture = self.pg2.get_capture(len(pkts))
2071 self.verify_capture_out(capture, nat_ip1)
2074 pkts = self.create_stream_in(self.pg1, self.pg2)
2075 self.pg1.add_stream(pkts)
2076 self.pg_enable_capture(self.pg_interfaces)
2078 capture = self.pg2.get_capture(len(pkts))
2079 self.verify_capture_out(capture, nat_ip1)
2081 def test_dynamic_ipless_interfaces(self):
2082 """ NAT44 interfaces without configured IP address """
2084 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2085 mactobinary(self.pg7.remote_mac),
2086 self.pg7.remote_ip4n,
2088 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2089 mactobinary(self.pg8.remote_mac),
2090 self.pg8.remote_ip4n,
2093 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2094 dst_address_length=32,
2095 next_hop_address=self.pg7.remote_ip4n,
2096 next_hop_sw_if_index=self.pg7.sw_if_index)
2097 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2098 dst_address_length=32,
2099 next_hop_address=self.pg8.remote_ip4n,
2100 next_hop_sw_if_index=self.pg8.sw_if_index)
2102 self.nat44_add_address(self.nat_addr)
2103 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2104 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2108 pkts = self.create_stream_in(self.pg7, self.pg8)
2109 self.pg7.add_stream(pkts)
2110 self.pg_enable_capture(self.pg_interfaces)
2112 capture = self.pg8.get_capture(len(pkts))
2113 self.verify_capture_out(capture)
2116 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2117 self.pg8.add_stream(pkts)
2118 self.pg_enable_capture(self.pg_interfaces)
2120 capture = self.pg7.get_capture(len(pkts))
2121 self.verify_capture_in(capture, self.pg7)
2123 def test_static_ipless_interfaces(self):
2124 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2126 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2127 mactobinary(self.pg7.remote_mac),
2128 self.pg7.remote_ip4n,
2130 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2131 mactobinary(self.pg8.remote_mac),
2132 self.pg8.remote_ip4n,
2135 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2136 dst_address_length=32,
2137 next_hop_address=self.pg7.remote_ip4n,
2138 next_hop_sw_if_index=self.pg7.sw_if_index)
2139 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2140 dst_address_length=32,
2141 next_hop_address=self.pg8.remote_ip4n,
2142 next_hop_sw_if_index=self.pg8.sw_if_index)
2144 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2145 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2146 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2150 pkts = self.create_stream_out(self.pg8)
2151 self.pg8.add_stream(pkts)
2152 self.pg_enable_capture(self.pg_interfaces)
2154 capture = self.pg7.get_capture(len(pkts))
2155 self.verify_capture_in(capture, self.pg7)
2158 pkts = self.create_stream_in(self.pg7, self.pg8)
2159 self.pg7.add_stream(pkts)
2160 self.pg_enable_capture(self.pg_interfaces)
2162 capture = self.pg8.get_capture(len(pkts))
2163 self.verify_capture_out(capture, self.nat_addr, True)
2165 def test_static_with_port_ipless_interfaces(self):
2166 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2168 self.tcp_port_out = 30606
2169 self.udp_port_out = 30607
2170 self.icmp_id_out = 30608
2172 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2173 mactobinary(self.pg7.remote_mac),
2174 self.pg7.remote_ip4n,
2176 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2177 mactobinary(self.pg8.remote_mac),
2178 self.pg8.remote_ip4n,
2181 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2182 dst_address_length=32,
2183 next_hop_address=self.pg7.remote_ip4n,
2184 next_hop_sw_if_index=self.pg7.sw_if_index)
2185 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2186 dst_address_length=32,
2187 next_hop_address=self.pg8.remote_ip4n,
2188 next_hop_sw_if_index=self.pg8.sw_if_index)
2190 self.nat44_add_address(self.nat_addr)
2191 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2192 self.tcp_port_in, self.tcp_port_out,
2193 proto=IP_PROTOS.tcp)
2194 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2195 self.udp_port_in, self.udp_port_out,
2196 proto=IP_PROTOS.udp)
2197 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2198 self.icmp_id_in, self.icmp_id_out,
2199 proto=IP_PROTOS.icmp)
2200 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2201 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2205 pkts = self.create_stream_out(self.pg8)
2206 self.pg8.add_stream(pkts)
2207 self.pg_enable_capture(self.pg_interfaces)
2209 capture = self.pg7.get_capture(len(pkts))
2210 self.verify_capture_in(capture, self.pg7)
2213 pkts = self.create_stream_in(self.pg7, self.pg8)
2214 self.pg7.add_stream(pkts)
2215 self.pg_enable_capture(self.pg_interfaces)
2217 capture = self.pg8.get_capture(len(pkts))
2218 self.verify_capture_out(capture)
2220 def test_static_unknown_proto(self):
2221 """ 1:1 NAT translate packet with unknown protocol """
2222 nat_ip = "10.0.0.10"
2223 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2224 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2225 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2229 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2230 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2232 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2233 TCP(sport=1234, dport=1234))
2234 self.pg0.add_stream(p)
2235 self.pg_enable_capture(self.pg_interfaces)
2237 p = self.pg1.get_capture(1)
2240 self.assertEqual(packet[IP].src, nat_ip)
2241 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2242 self.assertTrue(packet.haslayer(GRE))
2243 self.check_ip_checksum(packet)
2245 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2249 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2250 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2252 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2253 TCP(sport=1234, dport=1234))
2254 self.pg1.add_stream(p)
2255 self.pg_enable_capture(self.pg_interfaces)
2257 p = self.pg0.get_capture(1)
2260 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2261 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2262 self.assertTrue(packet.haslayer(GRE))
2263 self.check_ip_checksum(packet)
2265 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2268 def test_hairpinning_static_unknown_proto(self):
2269 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2271 host = self.pg0.remote_hosts[0]
2272 server = self.pg0.remote_hosts[1]
2274 host_nat_ip = "10.0.0.10"
2275 server_nat_ip = "10.0.0.11"
2277 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2278 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2279 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2280 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2284 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2285 IP(src=host.ip4, dst=server_nat_ip) /
2287 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2288 TCP(sport=1234, dport=1234))
2289 self.pg0.add_stream(p)
2290 self.pg_enable_capture(self.pg_interfaces)
2292 p = self.pg0.get_capture(1)
2295 self.assertEqual(packet[IP].src, host_nat_ip)
2296 self.assertEqual(packet[IP].dst, server.ip4)
2297 self.assertTrue(packet.haslayer(GRE))
2298 self.check_ip_checksum(packet)
2300 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2304 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2305 IP(src=server.ip4, dst=host_nat_ip) /
2307 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2308 TCP(sport=1234, dport=1234))
2309 self.pg0.add_stream(p)
2310 self.pg_enable_capture(self.pg_interfaces)
2312 p = self.pg0.get_capture(1)
2315 self.assertEqual(packet[IP].src, server_nat_ip)
2316 self.assertEqual(packet[IP].dst, host.ip4)
2317 self.assertTrue(packet.haslayer(GRE))
2318 self.check_ip_checksum(packet)
2320 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2323 def test_unknown_proto(self):
2324 """ NAT44 translate packet with unknown protocol """
2325 self.nat44_add_address(self.nat_addr)
2326 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2327 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2331 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2332 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2333 TCP(sport=self.tcp_port_in, dport=20))
2334 self.pg0.add_stream(p)
2335 self.pg_enable_capture(self.pg_interfaces)
2337 p = self.pg1.get_capture(1)
2339 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2340 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2342 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2343 TCP(sport=1234, dport=1234))
2344 self.pg0.add_stream(p)
2345 self.pg_enable_capture(self.pg_interfaces)
2347 p = self.pg1.get_capture(1)
2350 self.assertEqual(packet[IP].src, self.nat_addr)
2351 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2352 self.assertTrue(packet.haslayer(GRE))
2353 self.check_ip_checksum(packet)
2355 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2359 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2360 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2362 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2363 TCP(sport=1234, dport=1234))
2364 self.pg1.add_stream(p)
2365 self.pg_enable_capture(self.pg_interfaces)
2367 p = self.pg0.get_capture(1)
2370 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2371 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2372 self.assertTrue(packet.haslayer(GRE))
2373 self.check_ip_checksum(packet)
2375 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2378 def test_hairpinning_unknown_proto(self):
2379 """ NAT44 translate packet with unknown protocol - hairpinning """
2380 host = self.pg0.remote_hosts[0]
2381 server = self.pg0.remote_hosts[1]
2384 server_in_port = 5678
2385 server_out_port = 8765
2386 server_nat_ip = "10.0.0.11"
2388 self.nat44_add_address(self.nat_addr)
2389 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2390 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2393 # add static mapping for server
2394 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2397 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2398 IP(src=host.ip4, dst=server_nat_ip) /
2399 TCP(sport=host_in_port, dport=server_out_port))
2400 self.pg0.add_stream(p)
2401 self.pg_enable_capture(self.pg_interfaces)
2403 capture = self.pg0.get_capture(1)
2405 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2406 IP(src=host.ip4, dst=server_nat_ip) /
2408 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2409 TCP(sport=1234, dport=1234))
2410 self.pg0.add_stream(p)
2411 self.pg_enable_capture(self.pg_interfaces)
2413 p = self.pg0.get_capture(1)
2416 self.assertEqual(packet[IP].src, self.nat_addr)
2417 self.assertEqual(packet[IP].dst, server.ip4)
2418 self.assertTrue(packet.haslayer(GRE))
2419 self.check_ip_checksum(packet)
2421 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2425 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2426 IP(src=server.ip4, dst=self.nat_addr) /
2428 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2429 TCP(sport=1234, dport=1234))
2430 self.pg0.add_stream(p)
2431 self.pg_enable_capture(self.pg_interfaces)
2433 p = self.pg0.get_capture(1)
2436 self.assertEqual(packet[IP].src, server_nat_ip)
2437 self.assertEqual(packet[IP].dst, host.ip4)
2438 self.assertTrue(packet.haslayer(GRE))
2439 self.check_ip_checksum(packet)
2441 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2444 def test_output_feature(self):
2445 """ NAT44 interface output feature (in2out postrouting) """
2446 self.nat44_add_address(self.nat_addr)
2447 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2448 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2449 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2453 pkts = self.create_stream_in(self.pg0, self.pg3)
2454 self.pg0.add_stream(pkts)
2455 self.pg_enable_capture(self.pg_interfaces)
2457 capture = self.pg3.get_capture(len(pkts))
2458 self.verify_capture_out(capture)
2461 pkts = self.create_stream_out(self.pg3)
2462 self.pg3.add_stream(pkts)
2463 self.pg_enable_capture(self.pg_interfaces)
2465 capture = self.pg0.get_capture(len(pkts))
2466 self.verify_capture_in(capture, self.pg0)
2468 # from non-NAT interface to NAT inside interface
2469 pkts = self.create_stream_in(self.pg2, self.pg0)
2470 self.pg2.add_stream(pkts)
2471 self.pg_enable_capture(self.pg_interfaces)
2473 capture = self.pg0.get_capture(len(pkts))
2474 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2476 def test_output_feature_vrf_aware(self):
2477 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2478 nat_ip_vrf10 = "10.0.0.10"
2479 nat_ip_vrf20 = "10.0.0.20"
2481 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2482 dst_address_length=32,
2483 next_hop_address=self.pg3.remote_ip4n,
2484 next_hop_sw_if_index=self.pg3.sw_if_index,
2486 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2487 dst_address_length=32,
2488 next_hop_address=self.pg3.remote_ip4n,
2489 next_hop_sw_if_index=self.pg3.sw_if_index,
2492 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2493 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2494 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2495 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2496 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2500 pkts = self.create_stream_in(self.pg4, self.pg3)
2501 self.pg4.add_stream(pkts)
2502 self.pg_enable_capture(self.pg_interfaces)
2504 capture = self.pg3.get_capture(len(pkts))
2505 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2508 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2509 self.pg3.add_stream(pkts)
2510 self.pg_enable_capture(self.pg_interfaces)
2512 capture = self.pg4.get_capture(len(pkts))
2513 self.verify_capture_in(capture, self.pg4)
2516 pkts = self.create_stream_in(self.pg6, self.pg3)
2517 self.pg6.add_stream(pkts)
2518 self.pg_enable_capture(self.pg_interfaces)
2520 capture = self.pg3.get_capture(len(pkts))
2521 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2524 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2525 self.pg3.add_stream(pkts)
2526 self.pg_enable_capture(self.pg_interfaces)
2528 capture = self.pg6.get_capture(len(pkts))
2529 self.verify_capture_in(capture, self.pg6)
2531 def test_output_feature_hairpinning(self):
2532 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2533 host = self.pg0.remote_hosts[0]
2534 server = self.pg0.remote_hosts[1]
2537 server_in_port = 5678
2538 server_out_port = 8765
2540 self.nat44_add_address(self.nat_addr)
2541 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2542 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2545 # add static mapping for server
2546 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2547 server_in_port, server_out_port,
2548 proto=IP_PROTOS.tcp)
2550 # send packet from host to server
2551 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2552 IP(src=host.ip4, dst=self.nat_addr) /
2553 TCP(sport=host_in_port, dport=server_out_port))
2554 self.pg0.add_stream(p)
2555 self.pg_enable_capture(self.pg_interfaces)
2557 capture = self.pg0.get_capture(1)
2562 self.assertEqual(ip.src, self.nat_addr)
2563 self.assertEqual(ip.dst, server.ip4)
2564 self.assertNotEqual(tcp.sport, host_in_port)
2565 self.assertEqual(tcp.dport, server_in_port)
2566 self.check_tcp_checksum(p)
2567 host_out_port = tcp.sport
2569 self.logger.error(ppp("Unexpected or invalid packet:", p))
2572 # send reply from server to host
2573 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2574 IP(src=server.ip4, dst=self.nat_addr) /
2575 TCP(sport=server_in_port, dport=host_out_port))
2576 self.pg0.add_stream(p)
2577 self.pg_enable_capture(self.pg_interfaces)
2579 capture = self.pg0.get_capture(1)
2584 self.assertEqual(ip.src, self.nat_addr)
2585 self.assertEqual(ip.dst, host.ip4)
2586 self.assertEqual(tcp.sport, server_out_port)
2587 self.assertEqual(tcp.dport, host_in_port)
2588 self.check_tcp_checksum(p)
2590 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2593 def test_one_armed_nat44(self):
2594 """ One armed NAT44 """
2595 remote_host = self.pg9.remote_hosts[0]
2596 local_host = self.pg9.remote_hosts[1]
2599 self.nat44_add_address(self.nat_addr)
2600 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2601 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2605 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2606 IP(src=local_host.ip4, dst=remote_host.ip4) /
2607 TCP(sport=12345, dport=80))
2608 self.pg9.add_stream(p)
2609 self.pg_enable_capture(self.pg_interfaces)
2611 capture = self.pg9.get_capture(1)
2616 self.assertEqual(ip.src, self.nat_addr)
2617 self.assertEqual(ip.dst, remote_host.ip4)
2618 self.assertNotEqual(tcp.sport, 12345)
2619 external_port = tcp.sport
2620 self.assertEqual(tcp.dport, 80)
2621 self.check_tcp_checksum(p)
2622 self.check_ip_checksum(p)
2624 self.logger.error(ppp("Unexpected or invalid packet:", p))
2628 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2629 IP(src=remote_host.ip4, dst=self.nat_addr) /
2630 TCP(sport=80, dport=external_port))
2631 self.pg9.add_stream(p)
2632 self.pg_enable_capture(self.pg_interfaces)
2634 capture = self.pg9.get_capture(1)
2639 self.assertEqual(ip.src, remote_host.ip4)
2640 self.assertEqual(ip.dst, local_host.ip4)
2641 self.assertEqual(tcp.sport, 80)
2642 self.assertEqual(tcp.dport, 12345)
2643 self.check_tcp_checksum(p)
2644 self.check_ip_checksum(p)
2646 self.logger.error(ppp("Unexpected or invalid packet:", p))
2649 def test_del_session(self):
2650 """ Delete NAT44 session """
2651 self.nat44_add_address(self.nat_addr)
2652 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2653 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2656 pkts = self.create_stream_in(self.pg0, self.pg1)
2657 self.pg0.add_stream(pkts)
2658 self.pg_enable_capture(self.pg_interfaces)
2660 capture = self.pg1.get_capture(len(pkts))
2662 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2663 nsessions = len(sessions)
2665 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2666 sessions[0].inside_port,
2667 sessions[0].protocol)
2668 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2669 sessions[1].outside_port,
2670 sessions[1].protocol,
2673 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2674 self.assertEqual(nsessions - len(sessions), 2)
2676 def test_set_get_reass(self):
2677 """ NAT44 set/get virtual fragmentation reassembly """
2678 reas_cfg1 = self.vapi.nat_get_reass()
2680 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2681 max_reass=reas_cfg1.ip4_max_reass * 2,
2682 max_frag=reas_cfg1.ip4_max_frag * 2)
2684 reas_cfg2 = self.vapi.nat_get_reass()
2686 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2687 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2688 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2690 self.vapi.nat_set_reass(drop_frag=1)
2691 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2693 def test_frag_in_order(self):
2694 """ NAT44 translate fragments arriving in order """
2695 self.nat44_add_address(self.nat_addr)
2696 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2697 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2700 data = "A" * 4 + "B" * 16 + "C" * 3
2701 self.tcp_port_in = random.randint(1025, 65535)
2703 reass = self.vapi.nat_reass_dump()
2704 reass_n_start = len(reass)
2707 pkts = self.create_stream_frag(self.pg0,
2708 self.pg1.remote_ip4,
2712 self.pg0.add_stream(pkts)
2713 self.pg_enable_capture(self.pg_interfaces)
2715 frags = self.pg1.get_capture(len(pkts))
2716 p = self.reass_frags_and_verify(frags,
2718 self.pg1.remote_ip4)
2719 self.assertEqual(p[TCP].dport, 20)
2720 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2721 self.tcp_port_out = p[TCP].sport
2722 self.assertEqual(data, p[Raw].load)
2725 pkts = self.create_stream_frag(self.pg1,
2730 self.pg1.add_stream(pkts)
2731 self.pg_enable_capture(self.pg_interfaces)
2733 frags = self.pg0.get_capture(len(pkts))
2734 p = self.reass_frags_and_verify(frags,
2735 self.pg1.remote_ip4,
2736 self.pg0.remote_ip4)
2737 self.assertEqual(p[TCP].sport, 20)
2738 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2739 self.assertEqual(data, p[Raw].load)
2741 reass = self.vapi.nat_reass_dump()
2742 reass_n_end = len(reass)
2744 self.assertEqual(reass_n_end - reass_n_start, 2)
2746 def test_reass_hairpinning(self):
2747 """ NAT44 fragments hairpinning """
2748 host = self.pg0.remote_hosts[0]
2749 server = self.pg0.remote_hosts[1]
2750 host_in_port = random.randint(1025, 65535)
2752 server_in_port = random.randint(1025, 65535)
2753 server_out_port = random.randint(1025, 65535)
2754 data = "A" * 4 + "B" * 16 + "C" * 3
2756 self.nat44_add_address(self.nat_addr)
2757 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2758 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2760 # add static mapping for server
2761 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2762 server_in_port, server_out_port,
2763 proto=IP_PROTOS.tcp)
2765 # send packet from host to server
2766 pkts = self.create_stream_frag(self.pg0,
2771 self.pg0.add_stream(pkts)
2772 self.pg_enable_capture(self.pg_interfaces)
2774 frags = self.pg0.get_capture(len(pkts))
2775 p = self.reass_frags_and_verify(frags,
2778 self.assertNotEqual(p[TCP].sport, host_in_port)
2779 self.assertEqual(p[TCP].dport, server_in_port)
2780 self.assertEqual(data, p[Raw].load)
2782 def test_frag_out_of_order(self):
2783 """ NAT44 translate fragments arriving out of order """
2784 self.nat44_add_address(self.nat_addr)
2785 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2786 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2789 data = "A" * 4 + "B" * 16 + "C" * 3
2790 random.randint(1025, 65535)
2793 pkts = self.create_stream_frag(self.pg0,
2794 self.pg1.remote_ip4,
2799 self.pg0.add_stream(pkts)
2800 self.pg_enable_capture(self.pg_interfaces)
2802 frags = self.pg1.get_capture(len(pkts))
2803 p = self.reass_frags_and_verify(frags,
2805 self.pg1.remote_ip4)
2806 self.assertEqual(p[TCP].dport, 20)
2807 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2808 self.tcp_port_out = p[TCP].sport
2809 self.assertEqual(data, p[Raw].load)
2812 pkts = self.create_stream_frag(self.pg1,
2818 self.pg1.add_stream(pkts)
2819 self.pg_enable_capture(self.pg_interfaces)
2821 frags = self.pg0.get_capture(len(pkts))
2822 p = self.reass_frags_and_verify(frags,
2823 self.pg1.remote_ip4,
2824 self.pg0.remote_ip4)
2825 self.assertEqual(p[TCP].sport, 20)
2826 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2827 self.assertEqual(data, p[Raw].load)
2829 def test_port_restricted(self):
2830 """ Port restricted NAT44 (MAP-E CE) """
2831 self.nat44_add_address(self.nat_addr)
2832 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2833 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2835 self.vapi.cli("nat44 addr-port-assignment-alg map-e psid 10 "
2836 "psid-offset 6 psid-len 6")
2838 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2839 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2840 TCP(sport=4567, dport=22))
2841 self.pg0.add_stream(p)
2842 self.pg_enable_capture(self.pg_interfaces)
2844 capture = self.pg1.get_capture(1)
2849 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2850 self.assertEqual(ip.src, self.nat_addr)
2851 self.assertEqual(tcp.dport, 22)
2852 self.assertNotEqual(tcp.sport, 4567)
2853 self.assertEqual((tcp.sport >> 6) & 63, 10)
2854 self.check_tcp_checksum(p)
2855 self.check_ip_checksum(p)
2857 self.logger.error(ppp("Unexpected or invalid packet:", p))
2861 super(TestNAT44, self).tearDown()
2862 if not self.vpp_dead:
2863 self.logger.info(self.vapi.cli("show nat44 verbose"))
2864 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
2865 self.vapi.cli("nat44 addr-port-assignment-alg default")
2869 class TestDeterministicNAT(MethodHolder):
2870 """ Deterministic NAT Test Cases """
2873 def setUpConstants(cls):
2874 super(TestDeterministicNAT, cls).setUpConstants()
2875 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2878 def setUpClass(cls):
2879 super(TestDeterministicNAT, cls).setUpClass()
2882 cls.tcp_port_in = 6303
2883 cls.tcp_external_port = 6303
2884 cls.udp_port_in = 6304
2885 cls.udp_external_port = 6304
2886 cls.icmp_id_in = 6305
2887 cls.nat_addr = '10.0.0.3'
2889 cls.create_pg_interfaces(range(3))
2890 cls.interfaces = list(cls.pg_interfaces)
2892 for i in cls.interfaces:
2897 cls.pg0.generate_remote_hosts(2)
2898 cls.pg0.configure_ipv4_neighbors()
2901 super(TestDeterministicNAT, cls).tearDownClass()
2904 def create_stream_in(self, in_if, out_if, ttl=64):
2906 Create packet stream for inside network
2908 :param in_if: Inside interface
2909 :param out_if: Outside interface
2910 :param ttl: TTL of generated packets
2914 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2915 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2916 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2920 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2921 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2922 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2926 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2927 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2928 ICMP(id=self.icmp_id_in, type='echo-request'))
2933 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2935 Create packet stream for outside network
2937 :param out_if: Outside interface
2938 :param dst_ip: Destination IP address (Default use global NAT address)
2939 :param ttl: TTL of generated packets
2942 dst_ip = self.nat_addr
2945 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2946 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2947 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2951 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2952 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2953 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2957 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2958 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2959 ICMP(id=self.icmp_external_id, type='echo-reply'))
2964 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2966 Verify captured packets on outside network
2968 :param capture: Captured packets
2969 :param nat_ip: Translated IP address (Default use global NAT address)
2970 :param same_port: Sorce port number is not translated (Default False)
2971 :param packet_num: Expected number of packets (Default 3)
2974 nat_ip = self.nat_addr
2975 self.assertEqual(packet_num, len(capture))
2976 for packet in capture:
2978 self.assertEqual(packet[IP].src, nat_ip)
2979 if packet.haslayer(TCP):
2980 self.tcp_port_out = packet[TCP].sport
2981 elif packet.haslayer(UDP):
2982 self.udp_port_out = packet[UDP].sport
2984 self.icmp_external_id = packet[ICMP].id
2986 self.logger.error(ppp("Unexpected or invalid packet "
2987 "(outside network):", packet))
2990 def initiate_tcp_session(self, in_if, out_if):
2992 Initiates TCP session
2994 :param in_if: Inside interface
2995 :param out_if: Outside interface
2998 # SYN packet in->out
2999 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3000 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3001 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3004 self.pg_enable_capture(self.pg_interfaces)
3006 capture = out_if.get_capture(1)
3008 self.tcp_port_out = p[TCP].sport
3010 # SYN + ACK packet out->in
3011 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3012 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3013 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3015 out_if.add_stream(p)
3016 self.pg_enable_capture(self.pg_interfaces)
3018 in_if.get_capture(1)
3020 # ACK packet in->out
3021 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3022 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3023 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3026 self.pg_enable_capture(self.pg_interfaces)
3028 out_if.get_capture(1)
3031 self.logger.error("TCP 3 way handshake failed")
3034 def verify_ipfix_max_entries_per_user(self, data):
3036 Verify IPFIX maximum entries per user exceeded event
3038 :param data: Decoded IPFIX data records
3040 self.assertEqual(1, len(data))
3043 self.assertEqual(ord(record[230]), 13)
3044 # natQuotaExceededEvent
3045 self.assertEqual('\x03\x00\x00\x00', record[466])
3047 self.assertEqual(self.pg0.remote_ip4n, record[8])
3049 def test_deterministic_mode(self):
3050 """ NAT plugin run deterministic mode """
3051 in_addr = '172.16.255.0'
3052 out_addr = '172.17.255.50'
3053 in_addr_t = '172.16.255.20'
3054 in_addr_n = socket.inet_aton(in_addr)
3055 out_addr_n = socket.inet_aton(out_addr)
3056 in_addr_t_n = socket.inet_aton(in_addr_t)
3060 nat_config = self.vapi.nat_show_config()
3061 self.assertEqual(1, nat_config.deterministic)
3063 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3065 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3066 self.assertEqual(rep1.out_addr[:4], out_addr_n)
3067 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3068 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3070 deterministic_mappings = self.vapi.nat_det_map_dump()
3071 self.assertEqual(len(deterministic_mappings), 1)
3072 dsm = deterministic_mappings[0]
3073 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3074 self.assertEqual(in_plen, dsm.in_plen)
3075 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3076 self.assertEqual(out_plen, dsm.out_plen)
3078 self.clear_nat_det()
3079 deterministic_mappings = self.vapi.nat_det_map_dump()
3080 self.assertEqual(len(deterministic_mappings), 0)
3082 def test_set_timeouts(self):
3083 """ Set deterministic NAT timeouts """
3084 timeouts_before = self.vapi.nat_det_get_timeouts()
3086 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3087 timeouts_before.tcp_established + 10,
3088 timeouts_before.tcp_transitory + 10,
3089 timeouts_before.icmp + 10)
3091 timeouts_after = self.vapi.nat_det_get_timeouts()
3093 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3094 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3095 self.assertNotEqual(timeouts_before.tcp_established,
3096 timeouts_after.tcp_established)
3097 self.assertNotEqual(timeouts_before.tcp_transitory,
3098 timeouts_after.tcp_transitory)
3100 def test_det_in(self):
3101 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3103 nat_ip = "10.0.0.10"
3105 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3107 socket.inet_aton(nat_ip),
3109 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3110 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3114 pkts = self.create_stream_in(self.pg0, self.pg1)
3115 self.pg0.add_stream(pkts)
3116 self.pg_enable_capture(self.pg_interfaces)
3118 capture = self.pg1.get_capture(len(pkts))
3119 self.verify_capture_out(capture, nat_ip)
3122 pkts = self.create_stream_out(self.pg1, nat_ip)
3123 self.pg1.add_stream(pkts)
3124 self.pg_enable_capture(self.pg_interfaces)
3126 capture = self.pg0.get_capture(len(pkts))
3127 self.verify_capture_in(capture, self.pg0)
3130 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3131 self.assertEqual(len(sessions), 3)
3135 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3136 self.assertEqual(s.in_port, self.tcp_port_in)
3137 self.assertEqual(s.out_port, self.tcp_port_out)
3138 self.assertEqual(s.ext_port, self.tcp_external_port)
3142 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3143 self.assertEqual(s.in_port, self.udp_port_in)
3144 self.assertEqual(s.out_port, self.udp_port_out)
3145 self.assertEqual(s.ext_port, self.udp_external_port)
3149 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3150 self.assertEqual(s.in_port, self.icmp_id_in)
3151 self.assertEqual(s.out_port, self.icmp_external_id)
3153 def test_multiple_users(self):
3154 """ Deterministic NAT multiple users """
3156 nat_ip = "10.0.0.10"
3158 external_port = 6303
3160 host0 = self.pg0.remote_hosts[0]
3161 host1 = self.pg0.remote_hosts[1]
3163 self.vapi.nat_det_add_del_map(host0.ip4n,
3165 socket.inet_aton(nat_ip),
3167 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3168 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3172 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3173 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3174 TCP(sport=port_in, dport=external_port))
3175 self.pg0.add_stream(p)
3176 self.pg_enable_capture(self.pg_interfaces)
3178 capture = self.pg1.get_capture(1)
3183 self.assertEqual(ip.src, nat_ip)
3184 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3185 self.assertEqual(tcp.dport, external_port)
3186 port_out0 = tcp.sport
3188 self.logger.error(ppp("Unexpected or invalid packet:", p))
3192 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3193 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3194 TCP(sport=port_in, dport=external_port))
3195 self.pg0.add_stream(p)
3196 self.pg_enable_capture(self.pg_interfaces)
3198 capture = self.pg1.get_capture(1)
3203 self.assertEqual(ip.src, nat_ip)
3204 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3205 self.assertEqual(tcp.dport, external_port)
3206 port_out1 = tcp.sport
3208 self.logger.error(ppp("Unexpected or invalid packet:", p))
3211 dms = self.vapi.nat_det_map_dump()
3212 self.assertEqual(1, len(dms))
3213 self.assertEqual(2, dms[0].ses_num)
3216 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3217 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3218 TCP(sport=external_port, dport=port_out0))
3219 self.pg1.add_stream(p)
3220 self.pg_enable_capture(self.pg_interfaces)
3222 capture = self.pg0.get_capture(1)
3227 self.assertEqual(ip.src, self.pg1.remote_ip4)
3228 self.assertEqual(ip.dst, host0.ip4)
3229 self.assertEqual(tcp.dport, port_in)
3230 self.assertEqual(tcp.sport, external_port)
3232 self.logger.error(ppp("Unexpected or invalid packet:", p))
3236 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3237 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3238 TCP(sport=external_port, dport=port_out1))
3239 self.pg1.add_stream(p)
3240 self.pg_enable_capture(self.pg_interfaces)
3242 capture = self.pg0.get_capture(1)
3247 self.assertEqual(ip.src, self.pg1.remote_ip4)
3248 self.assertEqual(ip.dst, host1.ip4)
3249 self.assertEqual(tcp.dport, port_in)
3250 self.assertEqual(tcp.sport, external_port)
3252 self.logger.error(ppp("Unexpected or invalid packet", p))
3255 # session close api test
3256 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3258 self.pg1.remote_ip4n,
3260 dms = self.vapi.nat_det_map_dump()
3261 self.assertEqual(dms[0].ses_num, 1)
3263 self.vapi.nat_det_close_session_in(host0.ip4n,
3265 self.pg1.remote_ip4n,
3267 dms = self.vapi.nat_det_map_dump()
3268 self.assertEqual(dms[0].ses_num, 0)
3270 def test_tcp_session_close_detection_in(self):
3271 """ Deterministic NAT TCP session close from inside network """
3272 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3274 socket.inet_aton(self.nat_addr),
3276 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3277 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3280 self.initiate_tcp_session(self.pg0, self.pg1)
3282 # close the session from inside
3284 # FIN packet in -> out
3285 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3286 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3287 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3289 self.pg0.add_stream(p)
3290 self.pg_enable_capture(self.pg_interfaces)
3292 self.pg1.get_capture(1)
3296 # ACK packet out -> in
3297 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3298 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3299 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3303 # FIN packet out -> in
3304 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3305 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3306 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3310 self.pg1.add_stream(pkts)
3311 self.pg_enable_capture(self.pg_interfaces)
3313 self.pg0.get_capture(2)
3315 # ACK packet in -> out
3316 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3317 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3318 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3320 self.pg0.add_stream(p)
3321 self.pg_enable_capture(self.pg_interfaces)
3323 self.pg1.get_capture(1)
3325 # Check if deterministic NAT44 closed the session
3326 dms = self.vapi.nat_det_map_dump()
3327 self.assertEqual(0, dms[0].ses_num)
3329 self.logger.error("TCP session termination failed")
3332 def test_tcp_session_close_detection_out(self):
3333 """ Deterministic NAT TCP session close from outside network """
3334 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3336 socket.inet_aton(self.nat_addr),
3338 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3339 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3342 self.initiate_tcp_session(self.pg0, self.pg1)
3344 # close the session from outside
3346 # FIN packet out -> in
3347 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3348 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3349 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3351 self.pg1.add_stream(p)
3352 self.pg_enable_capture(self.pg_interfaces)
3354 self.pg0.get_capture(1)
3358 # ACK packet in -> out
3359 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3360 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3361 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3365 # ACK packet in -> out
3366 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3367 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3368 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3372 self.pg0.add_stream(pkts)
3373 self.pg_enable_capture(self.pg_interfaces)
3375 self.pg1.get_capture(2)
3377 # ACK packet out -> in
3378 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3379 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3380 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3382 self.pg1.add_stream(p)
3383 self.pg_enable_capture(self.pg_interfaces)
3385 self.pg0.get_capture(1)
3387 # Check if deterministic NAT44 closed the session
3388 dms = self.vapi.nat_det_map_dump()
3389 self.assertEqual(0, dms[0].ses_num)
3391 self.logger.error("TCP session termination failed")
3394 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3395 def test_session_timeout(self):
3396 """ Deterministic NAT session timeouts """
3397 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3399 socket.inet_aton(self.nat_addr),
3401 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3402 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3405 self.initiate_tcp_session(self.pg0, self.pg1)
3406 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3407 pkts = self.create_stream_in(self.pg0, self.pg1)
3408 self.pg0.add_stream(pkts)
3409 self.pg_enable_capture(self.pg_interfaces)
3411 capture = self.pg1.get_capture(len(pkts))
3414 dms = self.vapi.nat_det_map_dump()
3415 self.assertEqual(0, dms[0].ses_num)
3417 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3418 def test_session_limit_per_user(self):
3419 """ Deterministic NAT maximum sessions per user limit """
3420 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3422 socket.inet_aton(self.nat_addr),
3424 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3425 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3427 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3428 src_address=self.pg2.local_ip4n,
3430 template_interval=10)
3431 self.vapi.nat_ipfix()
3434 for port in range(1025, 2025):
3435 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3436 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3437 UDP(sport=port, dport=port))
3440 self.pg0.add_stream(pkts)
3441 self.pg_enable_capture(self.pg_interfaces)
3443 capture = self.pg1.get_capture(len(pkts))
3445 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3446 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3447 UDP(sport=3001, dport=3002))
3448 self.pg0.add_stream(p)
3449 self.pg_enable_capture(self.pg_interfaces)
3451 capture = self.pg1.assert_nothing_captured()
3453 # verify ICMP error packet
3454 capture = self.pg0.get_capture(1)
3456 self.assertTrue(p.haslayer(ICMP))
3458 self.assertEqual(icmp.type, 3)
3459 self.assertEqual(icmp.code, 1)
3460 self.assertTrue(icmp.haslayer(IPerror))
3461 inner_ip = icmp[IPerror]
3462 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3463 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3465 dms = self.vapi.nat_det_map_dump()
3467 self.assertEqual(1000, dms[0].ses_num)
3469 # verify IPFIX logging
3470 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3472 capture = self.pg2.get_capture(2)
3473 ipfix = IPFIXDecoder()
3474 # first load template
3476 self.assertTrue(p.haslayer(IPFIX))
3477 if p.haslayer(Template):
3478 ipfix.add_template(p.getlayer(Template))
3479 # verify events in data set
3481 if p.haslayer(Data):
3482 data = ipfix.decode_data_set(p.getlayer(Set))
3483 self.verify_ipfix_max_entries_per_user(data)
3485 def clear_nat_det(self):
3487 Clear deterministic NAT configuration.
3489 self.vapi.nat_ipfix(enable=0)
3490 self.vapi.nat_det_set_timeouts()
3491 deterministic_mappings = self.vapi.nat_det_map_dump()
3492 for dsm in deterministic_mappings:
3493 self.vapi.nat_det_add_del_map(dsm.in_addr,
3499 interfaces = self.vapi.nat44_interface_dump()
3500 for intf in interfaces:
3501 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3506 super(TestDeterministicNAT, self).tearDown()
3507 if not self.vpp_dead:
3508 self.logger.info(self.vapi.cli("show nat44 detail"))
3509 self.clear_nat_det()
3512 class TestNAT64(MethodHolder):
3513 """ NAT64 Test Cases """
3516 def setUpClass(cls):
3517 super(TestNAT64, cls).setUpClass()
3520 cls.tcp_port_in = 6303
3521 cls.tcp_port_out = 6303
3522 cls.udp_port_in = 6304
3523 cls.udp_port_out = 6304
3524 cls.icmp_id_in = 6305
3525 cls.icmp_id_out = 6305
3526 cls.nat_addr = '10.0.0.3'
3527 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3529 cls.vrf1_nat_addr = '10.0.10.3'
3530 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3533 cls.create_pg_interfaces(range(5))
3534 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3535 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3536 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3538 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3540 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3542 cls.pg0.generate_remote_hosts(2)
3544 for i in cls.ip6_interfaces:
3547 i.configure_ipv6_neighbors()
3549 for i in cls.ip4_interfaces:
3555 cls.pg3.config_ip4()
3556 cls.pg3.resolve_arp()
3557 cls.pg3.config_ip6()
3558 cls.pg3.configure_ipv6_neighbors()
3561 super(TestNAT64, cls).tearDownClass()
3564 def test_pool(self):
3565 """ Add/delete address to NAT64 pool """
3566 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3568 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3570 addresses = self.vapi.nat64_pool_addr_dump()
3571 self.assertEqual(len(addresses), 1)
3572 self.assertEqual(addresses[0].address, nat_addr)
3574 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3576 addresses = self.vapi.nat64_pool_addr_dump()
3577 self.assertEqual(len(addresses), 0)
3579 def test_interface(self):
3580 """ Enable/disable NAT64 feature on the interface """
3581 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3582 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3584 interfaces = self.vapi.nat64_interface_dump()
3585 self.assertEqual(len(interfaces), 2)
3588 for intf in interfaces:
3589 if intf.sw_if_index == self.pg0.sw_if_index:
3590 self.assertEqual(intf.is_inside, 1)
3592 elif intf.sw_if_index == self.pg1.sw_if_index:
3593 self.assertEqual(intf.is_inside, 0)
3595 self.assertTrue(pg0_found)
3596 self.assertTrue(pg1_found)
3598 features = self.vapi.cli("show interface features pg0")
3599 self.assertNotEqual(features.find('nat64-in2out'), -1)
3600 features = self.vapi.cli("show interface features pg1")
3601 self.assertNotEqual(features.find('nat64-out2in'), -1)
3603 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3604 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3606 interfaces = self.vapi.nat64_interface_dump()
3607 self.assertEqual(len(interfaces), 0)
3609 def test_static_bib(self):
3610 """ Add/delete static BIB entry """
3611 in_addr = socket.inet_pton(socket.AF_INET6,
3612 '2001:db8:85a3::8a2e:370:7334')
3613 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3616 proto = IP_PROTOS.tcp
3618 self.vapi.nat64_add_del_static_bib(in_addr,
3623 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3628 self.assertEqual(bibe.i_addr, in_addr)
3629 self.assertEqual(bibe.o_addr, out_addr)
3630 self.assertEqual(bibe.i_port, in_port)
3631 self.assertEqual(bibe.o_port, out_port)
3632 self.assertEqual(static_bib_num, 1)
3634 self.vapi.nat64_add_del_static_bib(in_addr,
3640 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3645 self.assertEqual(static_bib_num, 0)
3647 def test_set_timeouts(self):
3648 """ Set NAT64 timeouts """
3649 # verify default values
3650 timeouts = self.vapi.nat64_get_timeouts()
3651 self.assertEqual(timeouts.udp, 300)
3652 self.assertEqual(timeouts.icmp, 60)
3653 self.assertEqual(timeouts.tcp_trans, 240)
3654 self.assertEqual(timeouts.tcp_est, 7440)
3655 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3657 # set and verify custom values
3658 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3659 tcp_est=7450, tcp_incoming_syn=10)
3660 timeouts = self.vapi.nat64_get_timeouts()
3661 self.assertEqual(timeouts.udp, 200)
3662 self.assertEqual(timeouts.icmp, 30)
3663 self.assertEqual(timeouts.tcp_trans, 250)
3664 self.assertEqual(timeouts.tcp_est, 7450)
3665 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3667 def test_dynamic(self):
3668 """ NAT64 dynamic translation test """
3669 self.tcp_port_in = 6303
3670 self.udp_port_in = 6304
3671 self.icmp_id_in = 6305
3673 ses_num_start = self.nat64_get_ses_num()
3675 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3677 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3678 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3681 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3682 self.pg0.add_stream(pkts)
3683 self.pg_enable_capture(self.pg_interfaces)
3685 capture = self.pg1.get_capture(len(pkts))
3686 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3687 dst_ip=self.pg1.remote_ip4)
3690 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3691 self.pg1.add_stream(pkts)
3692 self.pg_enable_capture(self.pg_interfaces)
3694 capture = self.pg0.get_capture(len(pkts))
3695 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3696 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3699 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3700 self.pg0.add_stream(pkts)
3701 self.pg_enable_capture(self.pg_interfaces)
3703 capture = self.pg1.get_capture(len(pkts))
3704 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3705 dst_ip=self.pg1.remote_ip4)
3708 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3709 self.pg1.add_stream(pkts)
3710 self.pg_enable_capture(self.pg_interfaces)
3712 capture = self.pg0.get_capture(len(pkts))
3713 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3715 ses_num_end = self.nat64_get_ses_num()
3717 self.assertEqual(ses_num_end - ses_num_start, 3)
3719 # tenant with specific VRF
3720 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3721 self.vrf1_nat_addr_n,
3722 vrf_id=self.vrf1_id)
3723 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3725 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3726 self.pg2.add_stream(pkts)
3727 self.pg_enable_capture(self.pg_interfaces)
3729 capture = self.pg1.get_capture(len(pkts))
3730 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3731 dst_ip=self.pg1.remote_ip4)
3733 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3734 self.pg1.add_stream(pkts)
3735 self.pg_enable_capture(self.pg_interfaces)
3737 capture = self.pg2.get_capture(len(pkts))
3738 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3740 def test_static(self):
3741 """ NAT64 static translation test """
3742 self.tcp_port_in = 60303
3743 self.udp_port_in = 60304
3744 self.icmp_id_in = 60305
3745 self.tcp_port_out = 60303
3746 self.udp_port_out = 60304
3747 self.icmp_id_out = 60305
3749 ses_num_start = self.nat64_get_ses_num()
3751 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3753 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3754 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3756 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3761 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3766 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3773 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3774 self.pg0.add_stream(pkts)
3775 self.pg_enable_capture(self.pg_interfaces)
3777 capture = self.pg1.get_capture(len(pkts))
3778 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3779 dst_ip=self.pg1.remote_ip4, same_port=True)
3782 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3783 self.pg1.add_stream(pkts)
3784 self.pg_enable_capture(self.pg_interfaces)
3786 capture = self.pg0.get_capture(len(pkts))
3787 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3788 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3790 ses_num_end = self.nat64_get_ses_num()
3792 self.assertEqual(ses_num_end - ses_num_start, 3)
3794 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3795 def test_session_timeout(self):
3796 """ NAT64 session timeout """
3797 self.icmp_id_in = 1234
3798 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3800 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3801 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3802 self.vapi.nat64_set_timeouts(icmp=5)
3804 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3805 self.pg0.add_stream(pkts)
3806 self.pg_enable_capture(self.pg_interfaces)
3808 capture = self.pg1.get_capture(len(pkts))
3810 ses_num_before_timeout = self.nat64_get_ses_num()
3814 # ICMP session after timeout
3815 ses_num_after_timeout = self.nat64_get_ses_num()
3816 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3818 def test_icmp_error(self):
3819 """ NAT64 ICMP Error message translation """
3820 self.tcp_port_in = 6303
3821 self.udp_port_in = 6304
3822 self.icmp_id_in = 6305
3824 ses_num_start = self.nat64_get_ses_num()
3826 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3828 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3829 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3831 # send some packets to create sessions
3832 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3833 self.pg0.add_stream(pkts)
3834 self.pg_enable_capture(self.pg_interfaces)
3836 capture_ip4 = self.pg1.get_capture(len(pkts))
3837 self.verify_capture_out(capture_ip4,
3838 nat_ip=self.nat_addr,
3839 dst_ip=self.pg1.remote_ip4)
3841 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3842 self.pg1.add_stream(pkts)
3843 self.pg_enable_capture(self.pg_interfaces)
3845 capture_ip6 = self.pg0.get_capture(len(pkts))
3846 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3847 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3848 self.pg0.remote_ip6)
3851 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3852 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3853 ICMPv6DestUnreach(code=1) /
3854 packet[IPv6] for packet in capture_ip6]
3855 self.pg0.add_stream(pkts)
3856 self.pg_enable_capture(self.pg_interfaces)
3858 capture = self.pg1.get_capture(len(pkts))
3859 for packet in capture:
3861 self.assertEqual(packet[IP].src, self.nat_addr)
3862 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3863 self.assertEqual(packet[ICMP].type, 3)
3864 self.assertEqual(packet[ICMP].code, 13)
3865 inner = packet[IPerror]
3866 self.assertEqual(inner.src, self.pg1.remote_ip4)
3867 self.assertEqual(inner.dst, self.nat_addr)
3868 self.check_icmp_checksum(packet)
3869 if inner.haslayer(TCPerror):
3870 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3871 elif inner.haslayer(UDPerror):
3872 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3874 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3876 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3880 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3881 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3882 ICMP(type=3, code=13) /
3883 packet[IP] for packet in capture_ip4]
3884 self.pg1.add_stream(pkts)
3885 self.pg_enable_capture(self.pg_interfaces)
3887 capture = self.pg0.get_capture(len(pkts))
3888 for packet in capture:
3890 self.assertEqual(packet[IPv6].src, ip.src)
3891 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3892 icmp = packet[ICMPv6DestUnreach]
3893 self.assertEqual(icmp.code, 1)
3894 inner = icmp[IPerror6]
3895 self.assertEqual(inner.src, self.pg0.remote_ip6)
3896 self.assertEqual(inner.dst, ip.src)
3897 self.check_icmpv6_checksum(packet)
3898 if inner.haslayer(TCPerror):
3899 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3900 elif inner.haslayer(UDPerror):
3901 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3903 self.assertEqual(inner[ICMPv6EchoRequest].id,
3906 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3909 def test_hairpinning(self):
3910 """ NAT64 hairpinning """
3912 client = self.pg0.remote_hosts[0]
3913 server = self.pg0.remote_hosts[1]
3914 server_tcp_in_port = 22
3915 server_tcp_out_port = 4022
3916 server_udp_in_port = 23
3917 server_udp_out_port = 4023
3918 client_tcp_in_port = 1234
3919 client_udp_in_port = 1235
3920 client_tcp_out_port = 0
3921 client_udp_out_port = 0
3922 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3923 nat_addr_ip6 = ip.src
3925 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3927 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3928 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3930 self.vapi.nat64_add_del_static_bib(server.ip6n,
3933 server_tcp_out_port,
3935 self.vapi.nat64_add_del_static_bib(server.ip6n,
3938 server_udp_out_port,
3943 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3944 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3945 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3947 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3948 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3949 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3951 self.pg0.add_stream(pkts)
3952 self.pg_enable_capture(self.pg_interfaces)
3954 capture = self.pg0.get_capture(len(pkts))
3955 for packet in capture:
3957 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3958 self.assertEqual(packet[IPv6].dst, server.ip6)
3959 if packet.haslayer(TCP):
3960 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3961 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3962 self.check_tcp_checksum(packet)
3963 client_tcp_out_port = packet[TCP].sport
3965 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3966 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3967 self.check_udp_checksum(packet)
3968 client_udp_out_port = packet[UDP].sport
3970 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3975 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3976 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3977 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3979 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3980 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3981 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3983 self.pg0.add_stream(pkts)
3984 self.pg_enable_capture(self.pg_interfaces)
3986 capture = self.pg0.get_capture(len(pkts))
3987 for packet in capture:
3989 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3990 self.assertEqual(packet[IPv6].dst, client.ip6)
3991 if packet.haslayer(TCP):
3992 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3993 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3994 self.check_tcp_checksum(packet)
3996 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3997 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3998 self.check_udp_checksum(packet)
4000 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4005 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4006 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4007 ICMPv6DestUnreach(code=1) /
4008 packet[IPv6] for packet in capture]
4009 self.pg0.add_stream(pkts)
4010 self.pg_enable_capture(self.pg_interfaces)
4012 capture = self.pg0.get_capture(len(pkts))
4013 for packet in capture:
4015 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4016 self.assertEqual(packet[IPv6].dst, server.ip6)
4017 icmp = packet[ICMPv6DestUnreach]
4018 self.assertEqual(icmp.code, 1)
4019 inner = icmp[IPerror6]
4020 self.assertEqual(inner.src, server.ip6)
4021 self.assertEqual(inner.dst, nat_addr_ip6)
4022 self.check_icmpv6_checksum(packet)
4023 if inner.haslayer(TCPerror):
4024 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4025 self.assertEqual(inner[TCPerror].dport,
4026 client_tcp_out_port)
4028 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4029 self.assertEqual(inner[UDPerror].dport,
4030 client_udp_out_port)
4032 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4035 def test_prefix(self):
4036 """ NAT64 Network-Specific Prefix """
4038 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4040 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4041 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4042 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4043 self.vrf1_nat_addr_n,
4044 vrf_id=self.vrf1_id)
4045 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4048 global_pref64 = "2001:db8::"
4049 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4050 global_pref64_len = 32
4051 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4053 prefix = self.vapi.nat64_prefix_dump()
4054 self.assertEqual(len(prefix), 1)
4055 self.assertEqual(prefix[0].prefix, global_pref64_n)
4056 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4057 self.assertEqual(prefix[0].vrf_id, 0)
4059 # Add tenant specific prefix
4060 vrf1_pref64 = "2001:db8:122:300::"
4061 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4062 vrf1_pref64_len = 56
4063 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4065 vrf_id=self.vrf1_id)
4066 prefix = self.vapi.nat64_prefix_dump()
4067 self.assertEqual(len(prefix), 2)
4070 pkts = self.create_stream_in_ip6(self.pg0,
4073 plen=global_pref64_len)
4074 self.pg0.add_stream(pkts)
4075 self.pg_enable_capture(self.pg_interfaces)
4077 capture = self.pg1.get_capture(len(pkts))
4078 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4079 dst_ip=self.pg1.remote_ip4)
4081 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4082 self.pg1.add_stream(pkts)
4083 self.pg_enable_capture(self.pg_interfaces)
4085 capture = self.pg0.get_capture(len(pkts))
4086 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4089 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4091 # Tenant specific prefix
4092 pkts = self.create_stream_in_ip6(self.pg2,
4095 plen=vrf1_pref64_len)
4096 self.pg2.add_stream(pkts)
4097 self.pg_enable_capture(self.pg_interfaces)
4099 capture = self.pg1.get_capture(len(pkts))
4100 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4101 dst_ip=self.pg1.remote_ip4)
4103 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4104 self.pg1.add_stream(pkts)
4105 self.pg_enable_capture(self.pg_interfaces)
4107 capture = self.pg2.get_capture(len(pkts))
4108 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4111 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4113 def test_unknown_proto(self):
4114 """ NAT64 translate packet with unknown protocol """
4116 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4118 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4119 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4120 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4123 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4124 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4125 TCP(sport=self.tcp_port_in, dport=20))
4126 self.pg0.add_stream(p)
4127 self.pg_enable_capture(self.pg_interfaces)
4129 p = self.pg1.get_capture(1)
4131 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4132 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4134 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4135 TCP(sport=1234, dport=1234))
4136 self.pg0.add_stream(p)
4137 self.pg_enable_capture(self.pg_interfaces)
4139 p = self.pg1.get_capture(1)
4142 self.assertEqual(packet[IP].src, self.nat_addr)
4143 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4144 self.assertTrue(packet.haslayer(GRE))
4145 self.check_ip_checksum(packet)
4147 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4151 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4152 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4154 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4155 TCP(sport=1234, dport=1234))
4156 self.pg1.add_stream(p)
4157 self.pg_enable_capture(self.pg_interfaces)
4159 p = self.pg0.get_capture(1)
4162 self.assertEqual(packet[IPv6].src, remote_ip6)
4163 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4164 self.assertEqual(packet[IPv6].nh, 47)
4166 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4169 def test_hairpinning_unknown_proto(self):
4170 """ NAT64 translate packet with unknown protocol - hairpinning """
4172 client = self.pg0.remote_hosts[0]
4173 server = self.pg0.remote_hosts[1]
4174 server_tcp_in_port = 22
4175 server_tcp_out_port = 4022
4176 client_tcp_in_port = 1234
4177 client_tcp_out_port = 1235
4178 server_nat_ip = "10.0.0.100"
4179 client_nat_ip = "10.0.0.110"
4180 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4181 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4182 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4183 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4185 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4187 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4188 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4190 self.vapi.nat64_add_del_static_bib(server.ip6n,
4193 server_tcp_out_port,
4196 self.vapi.nat64_add_del_static_bib(server.ip6n,
4202 self.vapi.nat64_add_del_static_bib(client.ip6n,
4205 client_tcp_out_port,
4209 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4210 IPv6(src=client.ip6, dst=server_nat_ip6) /
4211 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4212 self.pg0.add_stream(p)
4213 self.pg_enable_capture(self.pg_interfaces)
4215 p = self.pg0.get_capture(1)
4217 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4218 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4220 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4221 TCP(sport=1234, dport=1234))
4222 self.pg0.add_stream(p)
4223 self.pg_enable_capture(self.pg_interfaces)
4225 p = self.pg0.get_capture(1)
4228 self.assertEqual(packet[IPv6].src, client_nat_ip6)
4229 self.assertEqual(packet[IPv6].dst, server.ip6)
4230 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4232 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4236 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4237 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4239 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4240 TCP(sport=1234, dport=1234))
4241 self.pg0.add_stream(p)
4242 self.pg_enable_capture(self.pg_interfaces)
4244 p = self.pg0.get_capture(1)
4247 self.assertEqual(packet[IPv6].src, server_nat_ip6)
4248 self.assertEqual(packet[IPv6].dst, client.ip6)
4249 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4251 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4254 def test_one_armed_nat64(self):
4255 """ One armed NAT64 """
4257 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4261 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4263 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4264 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4267 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4268 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4269 TCP(sport=12345, dport=80))
4270 self.pg3.add_stream(p)
4271 self.pg_enable_capture(self.pg_interfaces)
4273 capture = self.pg3.get_capture(1)
4278 self.assertEqual(ip.src, self.nat_addr)
4279 self.assertEqual(ip.dst, self.pg3.remote_ip4)
4280 self.assertNotEqual(tcp.sport, 12345)
4281 external_port = tcp.sport
4282 self.assertEqual(tcp.dport, 80)
4283 self.check_tcp_checksum(p)
4284 self.check_ip_checksum(p)
4286 self.logger.error(ppp("Unexpected or invalid packet:", p))
4290 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4291 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4292 TCP(sport=80, dport=external_port))
4293 self.pg3.add_stream(p)
4294 self.pg_enable_capture(self.pg_interfaces)
4296 capture = self.pg3.get_capture(1)
4301 self.assertEqual(ip.src, remote_host_ip6)
4302 self.assertEqual(ip.dst, self.pg3.remote_ip6)
4303 self.assertEqual(tcp.sport, 80)
4304 self.assertEqual(tcp.dport, 12345)
4305 self.check_tcp_checksum(p)
4307 self.logger.error(ppp("Unexpected or invalid packet:", p))
4310 def test_frag_in_order(self):
4311 """ NAT64 translate fragments arriving in order """
4312 self.tcp_port_in = random.randint(1025, 65535)
4314 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4316 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4317 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4319 reass = self.vapi.nat_reass_dump()
4320 reass_n_start = len(reass)
4324 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4325 self.tcp_port_in, 20, data)
4326 self.pg0.add_stream(pkts)
4327 self.pg_enable_capture(self.pg_interfaces)
4329 frags = self.pg1.get_capture(len(pkts))
4330 p = self.reass_frags_and_verify(frags,
4332 self.pg1.remote_ip4)
4333 self.assertEqual(p[TCP].dport, 20)
4334 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4335 self.tcp_port_out = p[TCP].sport
4336 self.assertEqual(data, p[Raw].load)
4339 data = "A" * 4 + "b" * 16 + "C" * 3
4340 pkts = self.create_stream_frag(self.pg1,
4345 self.pg1.add_stream(pkts)
4346 self.pg_enable_capture(self.pg_interfaces)
4348 frags = self.pg0.get_capture(len(pkts))
4349 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4350 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4351 self.assertEqual(p[TCP].sport, 20)
4352 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4353 self.assertEqual(data, p[Raw].load)
4355 reass = self.vapi.nat_reass_dump()
4356 reass_n_end = len(reass)
4358 self.assertEqual(reass_n_end - reass_n_start, 2)
4360 def test_reass_hairpinning(self):
4361 """ NAT64 fragments hairpinning """
4363 client = self.pg0.remote_hosts[0]
4364 server = self.pg0.remote_hosts[1]
4365 server_in_port = random.randint(1025, 65535)
4366 server_out_port = random.randint(1025, 65535)
4367 client_in_port = random.randint(1025, 65535)
4368 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4369 nat_addr_ip6 = ip.src
4371 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4373 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4374 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4376 # add static BIB entry for server
4377 self.vapi.nat64_add_del_static_bib(server.ip6n,
4383 # send packet from host to server
4384 pkts = self.create_stream_frag_ip6(self.pg0,
4389 self.pg0.add_stream(pkts)
4390 self.pg_enable_capture(self.pg_interfaces)
4392 frags = self.pg0.get_capture(len(pkts))
4393 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4394 self.assertNotEqual(p[TCP].sport, client_in_port)
4395 self.assertEqual(p[TCP].dport, server_in_port)
4396 self.assertEqual(data, p[Raw].load)
4398 def test_frag_out_of_order(self):
4399 """ NAT64 translate fragments arriving out of order """
4400 self.tcp_port_in = random.randint(1025, 65535)
4402 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4404 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4405 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4409 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4410 self.tcp_port_in, 20, data)
4412 self.pg0.add_stream(pkts)
4413 self.pg_enable_capture(self.pg_interfaces)
4415 frags = self.pg1.get_capture(len(pkts))
4416 p = self.reass_frags_and_verify(frags,
4418 self.pg1.remote_ip4)
4419 self.assertEqual(p[TCP].dport, 20)
4420 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4421 self.tcp_port_out = p[TCP].sport
4422 self.assertEqual(data, p[Raw].load)
4425 data = "A" * 4 + "B" * 16 + "C" * 3
4426 pkts = self.create_stream_frag(self.pg1,
4432 self.pg1.add_stream(pkts)
4433 self.pg_enable_capture(self.pg_interfaces)
4435 frags = self.pg0.get_capture(len(pkts))
4436 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4437 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4438 self.assertEqual(p[TCP].sport, 20)
4439 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4440 self.assertEqual(data, p[Raw].load)
4442 def test_interface_addr(self):
4443 """ Acquire NAT64 pool addresses from interface """
4444 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4446 # no address in NAT64 pool
4447 adresses = self.vapi.nat44_address_dump()
4448 self.assertEqual(0, len(adresses))
4450 # configure interface address and check NAT64 address pool
4451 self.pg4.config_ip4()
4452 addresses = self.vapi.nat64_pool_addr_dump()
4453 self.assertEqual(len(addresses), 1)
4454 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4456 # remove interface address and check NAT64 address pool
4457 self.pg4.unconfig_ip4()
4458 addresses = self.vapi.nat64_pool_addr_dump()
4459 self.assertEqual(0, len(adresses))
4461 def nat64_get_ses_num(self):
4463 Return number of active NAT64 sessions.
4465 st = self.vapi.nat64_st_dump()
4468 def clear_nat64(self):
4470 Clear NAT64 configuration.
4472 self.vapi.nat64_set_timeouts()
4474 interfaces = self.vapi.nat64_interface_dump()
4475 for intf in interfaces:
4476 if intf.is_inside > 1:
4477 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4480 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4484 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4487 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4495 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4498 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4506 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4509 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4517 adresses = self.vapi.nat64_pool_addr_dump()
4518 for addr in adresses:
4519 self.vapi.nat64_add_del_pool_addr_range(addr.address,
4524 prefixes = self.vapi.nat64_prefix_dump()
4525 for prefix in prefixes:
4526 self.vapi.nat64_add_del_prefix(prefix.prefix,
4528 vrf_id=prefix.vrf_id,
4532 super(TestNAT64, self).tearDown()
4533 if not self.vpp_dead:
4534 self.logger.info(self.vapi.cli("show nat64 pool"))
4535 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4536 self.logger.info(self.vapi.cli("show nat64 prefix"))
4537 self.logger.info(self.vapi.cli("show nat64 bib all"))
4538 self.logger.info(self.vapi.cli("show nat64 session table all"))
4539 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4543 class TestDSlite(MethodHolder):
4544 """ DS-Lite Test Cases """
4547 def setUpClass(cls):
4548 super(TestDSlite, cls).setUpClass()
4551 cls.nat_addr = '10.0.0.3'
4552 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4554 cls.create_pg_interfaces(range(2))
4556 cls.pg0.config_ip4()
4557 cls.pg0.resolve_arp()
4559 cls.pg1.config_ip6()
4560 cls.pg1.generate_remote_hosts(2)
4561 cls.pg1.configure_ipv6_neighbors()
4564 super(TestDSlite, cls).tearDownClass()
4567 def test_dslite(self):
4568 """ Test DS-Lite """
4569 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4571 aftr_ip4 = '192.0.0.1'
4572 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4573 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4574 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4575 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4578 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4579 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4580 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4581 UDP(sport=20000, dport=10000))
4582 self.pg1.add_stream(p)
4583 self.pg_enable_capture(self.pg_interfaces)
4585 capture = self.pg0.get_capture(1)
4586 capture = capture[0]
4587 self.assertFalse(capture.haslayer(IPv6))
4588 self.assertEqual(capture[IP].src, self.nat_addr)
4589 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4590 self.assertNotEqual(capture[UDP].sport, 20000)
4591 self.assertEqual(capture[UDP].dport, 10000)
4592 self.check_ip_checksum(capture)
4593 out_port = capture[UDP].sport
4595 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4596 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4597 UDP(sport=10000, dport=out_port))
4598 self.pg0.add_stream(p)
4599 self.pg_enable_capture(self.pg_interfaces)
4601 capture = self.pg1.get_capture(1)
4602 capture = capture[0]
4603 self.assertEqual(capture[IPv6].src, aftr_ip6)
4604 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4605 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4606 self.assertEqual(capture[IP].dst, '192.168.1.1')
4607 self.assertEqual(capture[UDP].sport, 10000)
4608 self.assertEqual(capture[UDP].dport, 20000)
4609 self.check_ip_checksum(capture)
4612 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4613 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4614 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4615 TCP(sport=20001, dport=10001))
4616 self.pg1.add_stream(p)
4617 self.pg_enable_capture(self.pg_interfaces)
4619 capture = self.pg0.get_capture(1)
4620 capture = capture[0]
4621 self.assertFalse(capture.haslayer(IPv6))
4622 self.assertEqual(capture[IP].src, self.nat_addr)
4623 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4624 self.assertNotEqual(capture[TCP].sport, 20001)
4625 self.assertEqual(capture[TCP].dport, 10001)
4626 self.check_ip_checksum(capture)
4627 self.check_tcp_checksum(capture)
4628 out_port = capture[TCP].sport
4630 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4631 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4632 TCP(sport=10001, dport=out_port))
4633 self.pg0.add_stream(p)
4634 self.pg_enable_capture(self.pg_interfaces)
4636 capture = self.pg1.get_capture(1)
4637 capture = capture[0]
4638 self.assertEqual(capture[IPv6].src, aftr_ip6)
4639 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4640 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4641 self.assertEqual(capture[IP].dst, '192.168.1.1')
4642 self.assertEqual(capture[TCP].sport, 10001)
4643 self.assertEqual(capture[TCP].dport, 20001)
4644 self.check_ip_checksum(capture)
4645 self.check_tcp_checksum(capture)
4648 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4649 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4650 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4651 ICMP(id=4000, type='echo-request'))
4652 self.pg1.add_stream(p)
4653 self.pg_enable_capture(self.pg_interfaces)
4655 capture = self.pg0.get_capture(1)
4656 capture = capture[0]
4657 self.assertFalse(capture.haslayer(IPv6))
4658 self.assertEqual(capture[IP].src, self.nat_addr)
4659 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4660 self.assertNotEqual(capture[ICMP].id, 4000)
4661 self.check_ip_checksum(capture)
4662 self.check_icmp_checksum(capture)
4663 out_id = capture[ICMP].id
4665 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4666 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4667 ICMP(id=out_id, type='echo-reply'))
4668 self.pg0.add_stream(p)
4669 self.pg_enable_capture(self.pg_interfaces)
4671 capture = self.pg1.get_capture(1)
4672 capture = capture[0]
4673 self.assertEqual(capture[IPv6].src, aftr_ip6)
4674 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4675 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4676 self.assertEqual(capture[IP].dst, '192.168.1.1')
4677 self.assertEqual(capture[ICMP].id, 4000)
4678 self.check_ip_checksum(capture)
4679 self.check_icmp_checksum(capture)
4682 super(TestDSlite, self).tearDown()
4683 if not self.vpp_dead:
4684 self.logger.info(self.vapi.cli("show dslite pool"))
4686 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4687 self.logger.info(self.vapi.cli("show dslite sessions"))
4689 if __name__ == '__main__':
4690 unittest.main(testRunner=VppTestRunner)