9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-dissected from its string form, the stored checksum
    is removed so that the rebuild has to compute a fresh one, and the
    recomputed value is compared with the value carried by the packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without dropping the stored value the rebuild would simply echo it
    # back and the assertion below would compare a value with itself.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-dissected from its string form, the stored TCP
    checksum is removed so that the rebuild has to compute a fresh one,
    and the recomputed value is compared with the value carried by the
    packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without dropping the stored value the rebuild would simply echo it
    # back and the assertion below would compare a value with itself.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-dissected from its string form, the stored UDP
    checksum is removed so that the rebuild has to compute a fresh one,
    and the recomputed value is compared with the value carried by the
    packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without dropping the stored value the rebuild would simply echo it
    # back and the assertion below would compare a value with itself.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded (``*error``) layer present in the packet, a fresh
    copy of the packet is re-dissected, that layer's stored checksum is
    removed, the copy is rebuilt and the recomputed checksum is compared
    with the one carried by the packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = ((IPerror, 'IPerror'),
                       (TCPerror, 'TCPerror'),
                       (UDPerror, 'UDPerror'),
                       (ICMPerror, 'ICMPerror'))
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP checksum of 0 is not verified (in UDP over IPv4
        # a zero checksum means that none was computed).
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Always start from a fresh copy of the captured packet so that
        # no branch depends on a copy left behind by an earlier one.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Verify the ICMP checksum of an IPv4 packet

    A copy of the packet is rebuilt with its ICMP checksum removed and the
    recomputed value is compared with the one carried by the packet. When
    the packet embeds an original datagram (ICMP error), the embedded
    checksums are verified as well.

    :param pkt: Packet whose ICMP checksum is verified
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)

    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Verify the ICMPv6 checksum of an IPv6 packet

    Handles Destination Unreachable, Echo Request and Echo Reply messages.
    For each message type present, the stored checksum is removed from a
    working copy, the copy is rebuilt and the recomputed checksum is
    compared with the one carried by the packet. For Destination
    Unreachable the embedded packet checksums are verified as well.

    :param pkt: Packet whose ICMPv6 checksum is verified
    """
    # (layer class, layer name, verify embedded packet afterwards)
    checks = ((ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
              (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
              (ICMPv6EchoReply, 'ICMPv6EchoReply', False))

    rebuilt = pkt.__class__(str(pkt))
    for layer, name, has_embedded in checks:
        if not pkt.haslayer(layer):
            continue
        del rebuilt[name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[name].cksum, pkt[name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    Builds one TCP, one UDP and one ICMP echo-request packet sourced from
    the inside interface's remote host.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use remote host of the
                   outside interface)
    :param ttl: TTL of generated packets
    :returns: List of packets in the order TCP, UDP, ICMP
    """
    # Only fall back to the outside host when no destination was given.
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    l4_layers = (TCP(sport=self.tcp_port_in, dport=20),
                 UDP(sport=self.udp_port_in, dport=20),
                 ICMP(id=self.icmp_id_in, type='echo-request'))
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the positions
    defined for the given prefix length. A prefix length other than
    32, 40, 48, 56, 64 or 96 leaves the prefix unmodified.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # Byte offsets (within the 16-byte IPv6 address) that receive IPv4
    # octets 0..3 for each prefix length of RFC 6052 section 2.2.
    # Offset 8 is skipped whenever the IPv4 address straddles it.
    octet_offsets = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray gives mutable integer octets on both Python 2 and 3.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for offset, octet in zip(octet_offsets.get(plen, ()), ip4_n):
        pref_n[offset] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not one of 32, 40, 48, 56, 64 or 96
    """
    # Byte offsets (within the 16-byte IPv6 address) that hold IPv4
    # octets 0..3 for each prefix length of RFC 6052 section 2.2.
    # Offset 8 is skipped whenever the IPv4 address straddles it.
    octet_offsets = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in octet_offsets:
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray gives integer octets on both Python 2 and 3.
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[offset] for offset in octet_offsets[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    Builds one TCP, one UDP and one ICMPv6 echo-request packet whose
    destination is the outside host's IPv4 address embedded in an IPv6
    address.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :returns: List of packets in the order TCP, UDP, ICMPv6
    """
    # Without an explicit prefix the 64:ff9b:: prefix is used; otherwise
    # the address is composed from the given prefix and length.
    if pref is None:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    l4_layers = (TCP(sport=self.tcp_port_in, dport=20),
                 UDP(sport=self.udp_port_in, dport=20),
                 ICMPv6EchoRequest(id=self.icmp_id_in))
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
            l4
            for l4 in l4_layers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    Builds one TCP, one UDP and one ICMP echo-reply packet sourced from
    the outside interface's remote host.

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List of packets in the order TCP, UDP, ICMP
    """
    if dst_ip is None:
        dst_ip = self.nat_addr

    if not use_inside_ports:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out
    else:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in

    l4_layers = (TCP(dport=tcp_port, sport=20),
                 UDP(dport=udp_port, sport=20),
                 ICMP(id=icmp_id, type='echo-reply'))
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create IPv6 packet stream for outside network

    Builds one TCP, one UDP and one ICMPv6 echo-reply packet addressed to
    the translated (outside) ports / identifier.

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List of packets in the order TCP, UDP, ICMPv6
    """
    l4_layers = (TCP(dport=self.tcp_port_out, sport=20),
                 UDP(dport=self.udp_port_out, sport=20),
                 ICMPv6EchoReply(id=self.icmp_id_out))
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
            l4
            for l4 in l4_layers]
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Sorce port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured IPv6 packets on outside network

    Thin wrapper that delegates to verify_capture_out with is_ip6 set.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :returns: Whatever verify_capture_out returns
    """
    return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
                                   dst_ip, is_ip6=True)
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param fragsize: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses exhausted event

    Exactly one record is expected. Records are indexed by IPFIX
    information element number.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (3 = NAT addresses exhausted)
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Exactly one record is expected. Records are indexed by IPFIX
    information element number.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (13 = quota exceeded)
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent (1 = maximum session entries)
    # NOTE(review): "I" packs in native byte order, unlike the "!I" used
    # for other fields in this file - confirm this matches the exporter.
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Exactly one record is expected. Records are indexed by IPFIX
    information element number.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (13 = quota exceeded)
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent (2 = maximum BIB entries)
    # NOTE(review): "I" packs in native byte order, unlike the "!I" used
    # for other fields in this file - confirm this matches the exporter.
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Exactly one record is expected. Records are indexed by IPFIX
    information element number.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (13 = quota exceeded)
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent (5 = maximum fragments pending reassembly)
    # NOTE(review): "I" packs in native byte order, unlike the "!I" used
    # for other fields in this file - confirm this matches the exporter.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Exactly one record is expected. Records are indexed by IPFIX
    information element number.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (13 = quota exceeded)
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent (5 = maximum fragments pending reassembly)
    # NOTE(review): "I" packs in native byte order, unlike the "!I" used
    # for other fields in this file - confirm this matches the exporter.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.pg1.generate_remote_hosts(1)
926 cls.pg1.configure_ipv4_neighbors()
928 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
929 cls.vapi.ip_table_add_del(10, is_add=1)
930 cls.vapi.ip_table_add_del(20, is_add=1)
932 cls.pg4._local_ip4 = "172.16.255.1"
933 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
934 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
935 cls.pg4.set_table_ip4(10)
936 cls.pg5._local_ip4 = "172.17.255.3"
937 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
938 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
939 cls.pg5.set_table_ip4(10)
940 cls.pg6._local_ip4 = "172.16.255.1"
941 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
942 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
943 cls.pg6.set_table_ip4(20)
944 for i in cls.overlapping_interfaces:
952 cls.pg9.generate_remote_hosts(2)
954 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
955 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
959 cls.pg9.resolve_arp()
960 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
961 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
962 cls.pg9.resolve_arp()
965 super(TestNAT44, cls).tearDownClass()
968 def clear_nat44(self):
970 Clear NAT44 configuration.
972 # I found no elegant way to do this
973 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
974 dst_address_length=32,
975 next_hop_address=self.pg7.remote_ip4n,
976 next_hop_sw_if_index=self.pg7.sw_if_index,
978 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
979 dst_address_length=32,
980 next_hop_address=self.pg8.remote_ip4n,
981 next_hop_sw_if_index=self.pg8.sw_if_index,
984 for intf in [self.pg7, self.pg8]:
985 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
987 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
992 if self.pg7.has_ip4_config:
993 self.pg7.unconfig_ip4()
995 self.vapi.nat44_forwarding_enable_disable(0)
997 interfaces = self.vapi.nat44_interface_addr_dump()
998 for intf in interfaces:
999 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
1000 twice_nat=intf.twice_nat,
1003 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1004 domain_id=self.ipfix_domain_id)
1005 self.ipfix_src_port = 4739
1006 self.ipfix_domain_id = 1
1008 interfaces = self.vapi.nat44_interface_dump()
1009 for intf in interfaces:
1010 if intf.is_inside > 1:
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1014 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1018 interfaces = self.vapi.nat44_interface_output_feature_dump()
1019 for intf in interfaces:
1020 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1024 static_mappings = self.vapi.nat44_static_mapping_dump()
1025 for sm in static_mappings:
1026 self.vapi.nat44_add_del_static_mapping(
1027 sm.local_ip_address,
1028 sm.external_ip_address,
1029 local_port=sm.local_port,
1030 external_port=sm.external_port,
1031 addr_only=sm.addr_only,
1033 protocol=sm.protocol,
1034 twice_nat=sm.twice_nat,
1035 self_twice_nat=sm.self_twice_nat,
1036 out2in_only=sm.out2in_only,
1038 external_sw_if_index=sm.external_sw_if_index,
1041 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1042 for lb_sm in lb_static_mappings:
1043 self.vapi.nat44_add_del_lb_static_mapping(
1044 lb_sm.external_addr,
1045 lb_sm.external_port,
1047 vrf_id=lb_sm.vrf_id,
1048 twice_nat=lb_sm.twice_nat,
1049 self_twice_nat=lb_sm.self_twice_nat,
1050 out2in_only=lb_sm.out2in_only,
1056 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1057 for id_m in identity_mappings:
1058 self.vapi.nat44_add_del_identity_mapping(
1059 addr_only=id_m.addr_only,
1062 sw_if_index=id_m.sw_if_index,
1064 protocol=id_m.protocol,
1067 adresses = self.vapi.nat44_address_dump()
1068 for addr in adresses:
1069 self.vapi.nat44_add_del_address_range(addr.ip_address,
1071 twice_nat=addr.twice_nat,
1074 self.vapi.nat_set_reass()
1075 self.vapi.nat_set_reass(is_ip6=1)
1077 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1078 local_port=0, external_port=0, vrf_id=0,
1079 is_add=1, external_sw_if_index=0xFFFFFFFF,
1080 proto=0, twice_nat=0, self_twice_nat=0,
1081 out2in_only=0, tag=""):
1083 Add/delete NAT44 static mapping
1085 :param local_ip: Local IP address
1086 :param external_ip: External IP address
1087 :param local_port: Local port number (Optional)
1088 :param external_port: External port number (Optional)
1089 :param vrf_id: VRF ID (Default 0)
1090 :param is_add: 1 if add, 0 if delete (Default add)
1091 :param external_sw_if_index: External interface instead of IP address
1092 :param proto: IP protocol (Mandatory if port specified)
1093 :param twice_nat: 1 if translate external host address and port
1094 :param self_twice_nat: 1 if translate external host address and port
1095 whenever external host address equals
1096 local address of internal host
1097 :param out2in_only: if 1 rule is matching only out2in direction
1098 :param tag: Opaque string tag
1101 if local_port and external_port:
1103 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1104 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1105 self.vapi.nat44_add_del_static_mapping(
1108 external_sw_if_index,
1120 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1122 Add/delete NAT44 address
1124 :param ip: IP address
1125 :param is_add: 1 if add, 0 if delete (Default add)
1126 :param twice_nat: twice NAT address for extenal hosts
1128 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1129 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1131 twice_nat=twice_nat)
1133 def test_dynamic(self):
1134 """ NAT44 dynamic translation test """
1136 self.nat44_add_address(self.nat_addr)
1137 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1138 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1142 pkts = self.create_stream_in(self.pg0, self.pg1)
1143 self.pg0.add_stream(pkts)
1144 self.pg_enable_capture(self.pg_interfaces)
1146 capture = self.pg1.get_capture(len(pkts))
1147 self.verify_capture_out(capture)
1150 pkts = self.create_stream_out(self.pg1)
1151 self.pg1.add_stream(pkts)
1152 self.pg_enable_capture(self.pg_interfaces)
1154 capture = self.pg0.get_capture(len(pkts))
1155 self.verify_capture_in(capture, self.pg0)
1157 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1158 """ NAT44 handling of client packets with TTL=1 """
1160 self.nat44_add_address(self.nat_addr)
1161 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1162 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1165 # Client side - generate traffic
1166 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1167 self.pg0.add_stream(pkts)
1168 self.pg_enable_capture(self.pg_interfaces)
1171 # Client side - verify ICMP type 11 packets
1172 capture = self.pg0.get_capture(len(pkts))
1173 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1175 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1176 """ NAT44 handling of server packets with TTL=1 """
# Two phases: first an ordinary inside stream (pg0 -> pg1) is sent and
# verified on pg1, then the outside stream is sent from pg1 with ttl=1 and
# the resulting packets are expected back on pg1.
1178 self.nat44_add_address(self.nat_addr)
1179 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1180 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1183 # Client side - create sessions
1184 pkts = self.create_stream_in(self.pg0, self.pg1)
1185 self.pg0.add_stream(pkts)
1186 self.pg_enable_capture(self.pg_interfaces)
1189 # Server side - generate traffic
1190 capture = self.pg1.get_capture(len(pkts))
1191 self.verify_capture_out(capture)
1192 pkts = self.create_stream_out(self.pg1, ttl=1)
1193 self.pg1.add_stream(pkts)
1194 self.pg_enable_capture(self.pg_interfaces)
1197 # Server side - verify ICMP type 11 packets
# src_ip is overridden with pg1's local address for this verification.
1198 capture = self.pg1.get_capture(len(pkts))
1199 self.verify_capture_out_with_icmp_errors(capture,
1200 src_ip=self.pg1.local_ip4)
1202 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1203 """ NAT44 handling of error responses to client packets with TTL=2 """
# The inside stream is sent with ttl=2 and captured on pg1. Each captured
# packet is then wrapped in an ICMP type 11 message addressed to nat_addr
# and injected on pg1; the results are verified on pg0.
1205 self.nat44_add_address(self.nat_addr)
1206 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1207 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1210 # Client side - generate traffic
1211 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1212 self.pg0.add_stream(pkts)
1213 self.pg_enable_capture(self.pg_interfaces)
1216 # Server side - simulate ICMP type 11 response
# One error packet per captured packet; the captured IP layer (and
# everything above it) becomes the ICMP payload.
1217 capture = self.pg1.get_capture(len(pkts))
1218 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1219 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1220 ICMP(type=11) / packet[IP] for packet in capture]
1221 self.pg1.add_stream(pkts)
1222 self.pg_enable_capture(self.pg_interfaces)
1225 # Client side - verify ICMP type 11 packets
1226 capture = self.pg0.get_capture(len(pkts))
1227 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1229 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1230 """ NAT44 handling of error responses to server packets with TTL=2 """
# Mirror of the in2out TTL=2 case: after an ordinary inside stream has been
# sent and verified, the outside stream is sent with ttl=2 and captured on
# pg0; each captured packet is wrapped in an ICMP type 11 message sent from
# pg0 towards pg1's remote address, and the results are verified on pg1.
1232 self.nat44_add_address(self.nat_addr)
1233 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1234 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1237 # Client side - create sessions
1238 pkts = self.create_stream_in(self.pg0, self.pg1)
1239 self.pg0.add_stream(pkts)
1240 self.pg_enable_capture(self.pg_interfaces)
1243 # Server side - generate traffic
1244 capture = self.pg1.get_capture(len(pkts))
1245 self.verify_capture_out(capture)
1246 pkts = self.create_stream_out(self.pg1, ttl=2)
1247 self.pg1.add_stream(pkts)
1248 self.pg_enable_capture(self.pg_interfaces)
1251 # Client side - simulate ICMP type 11 response
# One error packet per captured packet, carrying the captured IP layer.
1252 capture = self.pg0.get_capture(len(pkts))
1253 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1254 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1255 ICMP(type=11) / packet[IP] for packet in capture]
1256 self.pg0.add_stream(pkts)
1257 self.pg_enable_capture(self.pg_interfaces)
1260 # Server side - verify ICMP type 11 packets
1261 capture = self.pg1.get_capture(len(pkts))
1262 self.verify_capture_out_with_icmp_errors(capture)
1264 def test_ping_out_interface_from_outside(self):
1265 """ Ping NAT44 out interface from outside network """
# An ICMP echo-request is built from pg1's remote host to pg1's own local
# address; the reply is expected on pg1 with the addresses swapped.
1267 self.nat44_add_address(self.nat_addr)
1268 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1269 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1272 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1273 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1274 ICMP(id=self.icmp_id_out, type='echo-request'))
# NOTE(review): `pkts` is not assigned in the lines shown here; presumably
# it wraps `p` - confirm.
1276 self.pg1.add_stream(pkts)
1277 self.pg_enable_capture(self.pg_interfaces)
1279 capture = self.pg1.get_capture(len(pkts))
1280 self.assertEqual(1, len(capture))
# The request uses icmp_id_out, but the reply is asserted against
# icmp_id_in.
1283 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1284 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1285 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1286 self.assertEqual(packet[ICMP].type, 0) # echo reply
1288 self.logger.error(ppp("Unexpected or invalid packet "
1289 "(outside network):", packet))
1292 def test_ping_internal_host_from_outside(self):
1293 """ Ping internal host from outside network """
# A static mapping ties pg0's remote host to nat_addr. An echo-request sent
# from outside to nat_addr must show up on pg0, and the echo-reply sent by
# the inside host must show up on pg1.
1295 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1296 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1297 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside: echo-request addressed to the mapped external address.
1301 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1302 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1303 ICMP(id=self.icmp_id_out, type='echo-request'))
1304 self.pg1.add_stream(pkt)
1305 self.pg_enable_capture(self.pg_interfaces)
1307 capture = self.pg0.get_capture(1)
1308 self.verify_capture_in(capture, self.pg0, packet_num=1)
1309 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Inside -> outside: echo-reply from the inside host, using icmp_id_in.
1312 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1313 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1314 ICMP(id=self.icmp_id_in, type='echo-reply'))
1315 self.pg0.add_stream(pkt)
1316 self.pg_enable_capture(self.pg_interfaces)
1318 capture = self.pg1.get_capture(1)
1319 self.verify_capture_out(capture, same_port=True, packet_num=1)
1320 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1322 def test_forwarding(self):
1323 """ NAT44 forwarding test """
# Forwarding is switched on (argument 1) and a static mapping from pg0's
# remote address to nat_addr_n is installed. Traffic is then exercised in
# both directions, first for the mapped host and then with
# pg0.remote_hosts[0] temporarily replaced by remote_hosts[1].
1325 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1326 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1328 self.vapi.nat44_forwarding_enable_disable(1)
1330 real_ip = self.pg0.remote_ip4n
1331 alias_ip = self.nat_addr_n
1332 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1333 external_ip=alias_ip)
1336 # in2out - static mapping match
1338 pkts = self.create_stream_out(self.pg1)
1339 self.pg1.add_stream(pkts)
1340 self.pg_enable_capture(self.pg_interfaces)
1342 capture = self.pg0.get_capture(len(pkts))
1343 self.verify_capture_in(capture, self.pg0)
1345 pkts = self.create_stream_in(self.pg0, self.pg1)
1346 self.pg0.add_stream(pkts)
1347 self.pg_enable_capture(self.pg_interfaces)
1349 capture = self.pg1.get_capture(len(pkts))
1350 self.verify_capture_out(capture, same_port=True)
1352 # in2out - no static mapping match
# Swap in another remote host; the original is kept in host0 and put back
# further down.
1354 host0 = self.pg0.remote_hosts[0]
1355 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1357 pkts = self.create_stream_out(self.pg1,
1358 dst_ip=self.pg0.remote_ip4,
1359 use_inside_ports=True)
1360 self.pg1.add_stream(pkts)
1361 self.pg_enable_capture(self.pg_interfaces)
1363 capture = self.pg0.get_capture(len(pkts))
1364 self.verify_capture_in(capture, self.pg0)
1366 pkts = self.create_stream_in(self.pg0, self.pg1)
1367 self.pg0.add_stream(pkts)
1368 self.pg_enable_capture(self.pg_interfaces)
1370 capture = self.pg1.get_capture(len(pkts))
1371 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1374 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is switched off again (argument 0).
# NOTE(review): the final static-mapping call presumably removes the
# mapping added above; its last argument is not visible here - confirm.
1377 self.vapi.nat44_forwarding_enable_disable(0)
1378 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1379 external_ip=alias_ip,
1382 def test_static_in(self):
1383 """ 1:1 NAT initialized from inside network """
# A static mapping pg0.remote_ip4 <-> 10.0.0.10 is added, the mapping dump
# is checked, and traffic is sent inside->outside first, then
# outside->inside.
1385 nat_ip = "10.0.0.10"
1386 self.tcp_port_out = 6303
1387 self.udp_port_out = 6304
1388 self.icmp_id_out = 6305
1390 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1391 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1392 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected, with an empty tag (text before the first
# NUL byte) and protocol/ports all zero.
1394 sm = self.vapi.nat44_static_mapping_dump()
1395 self.assertEqual(len(sm), 1)
1396 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1397 self.assertEqual(sm[0].protocol, 0)
1398 self.assertEqual(sm[0].local_port, 0)
1399 self.assertEqual(sm[0].external_port, 0)
1402 pkts = self.create_stream_in(self.pg0, self.pg1)
1403 self.pg0.add_stream(pkts)
1404 self.pg_enable_capture(self.pg_interfaces)
1406 capture = self.pg1.get_capture(len(pkts))
1407 self.verify_capture_out(capture, nat_ip, True)
1410 pkts = self.create_stream_out(self.pg1, nat_ip)
1411 self.pg1.add_stream(pkts)
1412 self.pg_enable_capture(self.pg_interfaces)
1414 capture = self.pg0.get_capture(len(pkts))
1415 self.verify_capture_in(capture, self.pg0)
1417 def test_static_out(self):
1418 """ 1:1 NAT initialized from outside network """
# Same mapping style as the inside-initiated 1:1 test, but using 10.0.0.20
# and a tag, and with the outside->inside stream sent first.
1420 nat_ip = "10.0.0.20"
1421 self.tcp_port_out = 6303
1422 self.udp_port_out = 6304
1423 self.icmp_id_out = 6305
# NOTE(review): `tag` is not assigned in the lines shown here - confirm it
# is bound before this call.
1426 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1427 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1428 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The dumped tag is compared after cutting it at the first NUL byte.
1430 sm = self.vapi.nat44_static_mapping_dump()
1431 self.assertEqual(len(sm), 1)
1432 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1435 pkts = self.create_stream_out(self.pg1, nat_ip)
1436 self.pg1.add_stream(pkts)
1437 self.pg_enable_capture(self.pg_interfaces)
1439 capture = self.pg0.get_capture(len(pkts))
1440 self.verify_capture_in(capture, self.pg0)
1443 pkts = self.create_stream_in(self.pg0, self.pg1)
1444 self.pg0.add_stream(pkts)
1445 self.pg_enable_capture(self.pg_interfaces)
1447 capture = self.pg1.get_capture(len(pkts))
1448 self.verify_capture_out(capture, nat_ip, True)
1450 def test_static_with_port_in(self):
1451 """ 1:1 NAPT initialized from inside network """
# Three per-protocol static mappings (TCP, UDP, ICMP) are installed on
# nat_addr, pairing each *_in port/id with the *_out value set below.
# Traffic is sent inside->outside first, then outside->inside.
1453 self.tcp_port_out = 3606
1454 self.udp_port_out = 3607
1455 self.icmp_id_out = 3608
1457 self.nat44_add_address(self.nat_addr)
1458 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1459 self.tcp_port_in, self.tcp_port_out,
1460 proto=IP_PROTOS.tcp)
1461 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1462 self.udp_port_in, self.udp_port_out,
1463 proto=IP_PROTOS.udp)
1464 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1465 self.icmp_id_in, self.icmp_id_out,
1466 proto=IP_PROTOS.icmp)
1467 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1468 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1472 pkts = self.create_stream_in(self.pg0, self.pg1)
1473 self.pg0.add_stream(pkts)
1474 self.pg_enable_capture(self.pg_interfaces)
1476 capture = self.pg1.get_capture(len(pkts))
1477 self.verify_capture_out(capture)
1480 pkts = self.create_stream_out(self.pg1)
1481 self.pg1.add_stream(pkts)
1482 self.pg_enable_capture(self.pg_interfaces)
1484 capture = self.pg0.get_capture(len(pkts))
1485 self.verify_capture_in(capture, self.pg0)
1487 def test_static_with_port_out(self):
1488 """ 1:1 NAPT initialized from outside network """
# Same three per-protocol static mappings as the inside-initiated NAPT
# test, with different *_out values; here the outside->inside stream is
# sent first.
1490 self.tcp_port_out = 30606
1491 self.udp_port_out = 30607
1492 self.icmp_id_out = 30608
1494 self.nat44_add_address(self.nat_addr)
1495 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1496 self.tcp_port_in, self.tcp_port_out,
1497 proto=IP_PROTOS.tcp)
1498 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1499 self.udp_port_in, self.udp_port_out,
1500 proto=IP_PROTOS.udp)
1501 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1502 self.icmp_id_in, self.icmp_id_out,
1503 proto=IP_PROTOS.icmp)
1504 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1505 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1509 pkts = self.create_stream_out(self.pg1)
1510 self.pg1.add_stream(pkts)
1511 self.pg_enable_capture(self.pg_interfaces)
1513 capture = self.pg0.get_capture(len(pkts))
1514 self.verify_capture_in(capture, self.pg0)
1517 pkts = self.create_stream_in(self.pg0, self.pg1)
1518 self.pg0.add_stream(pkts)
1519 self.pg_enable_capture(self.pg_interfaces)
1521 capture = self.pg1.get_capture(len(pkts))
1522 self.verify_capture_out(capture)
1524 def test_static_with_port_out2(self):
1525 """ 1:1 NAPT symmetrical rule """
# With forwarding switched on and a TCP static mapping created with
# out2in_only=1, this test sends six single packets: client->service,
# an ICMP type 11 about that packet, service->client, an ICMP type 11 about
# that packet, and finally an untranslated exchange in each direction.
# NOTE(review): `local_port`, `external_port`, `ip`, `tcp` and `inner` are
# not assigned in the lines shown here - confirm where they are bound.
1530 self.vapi.nat44_forwarding_enable_disable(1)
1531 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1532 local_port, external_port,
1533 proto=IP_PROTOS.tcp, out2in_only=1)
1534 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1535 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1538 # from client to service
1539 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1540 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1541 TCP(sport=12345, dport=external_port))
1542 self.pg1.add_stream(p)
1543 self.pg_enable_capture(self.pg_interfaces)
1545 capture = self.pg0.get_capture(1)
# Expected rewrite: destination becomes the inside host and local_port.
1551 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1552 self.assertEqual(tcp.dport, local_port)
1553 self.check_tcp_checksum(p)
1554 self.check_ip_checksum(p)
1556 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 sent from the inside host, carrying the packet just
# captured on pg0; it is expected on pg1 with the outer source and the
# embedded destination/port rewritten to nat_addr/external_port.
1560 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1561 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1562 ICMP(type=11) / capture[0][IP])
1563 self.pg0.add_stream(p)
1564 self.pg_enable_capture(self.pg_interfaces)
1566 capture = self.pg1.get_capture(1)
1569 self.assertEqual(p[IP].src, self.nat_addr)
1571 self.assertEqual(inner.dst, self.nat_addr)
1572 self.assertEqual(inner[TCPerror].dport, external_port)
1574 self.logger.error(ppp("Unexpected or invalid packet:", p))
1577 # from service back to client
1578 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1579 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1580 TCP(sport=local_port, dport=12345))
1581 self.pg0.add_stream(p)
1582 self.pg_enable_capture(self.pg_interfaces)
1584 capture = self.pg1.get_capture(1)
# Expected rewrite: source becomes nat_addr and external_port.
1589 self.assertEqual(ip.src, self.nat_addr)
1590 self.assertEqual(tcp.sport, external_port)
1591 self.check_tcp_checksum(p)
1592 self.check_ip_checksum(p)
1594 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 sent from outside to nat_addr, carrying the packet just
# captured on pg1; it is expected on pg0 with the outer destination and
# the embedded source/port rewritten to the inside host/local_port.
1598 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1599 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1600 ICMP(type=11) / capture[0][IP])
1601 self.pg1.add_stream(p)
1602 self.pg_enable_capture(self.pg_interfaces)
1604 capture = self.pg0.get_capture(1)
1607 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1609 self.assertEqual(inner.src, self.pg0.remote_ip4)
1610 self.assertEqual(inner[TCPerror].sport, local_port)
1612 self.logger.error(ppp("Unexpected or invalid packet:", p))
1615 # from client to server (no translation)
# Addressed directly to the inside host rather than to nat_addr.
1616 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1617 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1618 TCP(sport=12346, dport=local_port))
1619 self.pg1.add_stream(p)
1620 self.pg_enable_capture(self.pg_interfaces)
1622 capture = self.pg0.get_capture(1)
1628 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1629 self.assertEqual(tcp.dport, local_port)
1630 self.check_tcp_checksum(p)
1631 self.check_ip_checksum(p)
1633 self.logger.error(ppp("Unexpected or invalid packet:", p))
1636 # from service back to client (no translation)
# The source address and port are expected to stay those of the inside
# host.
1637 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1638 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1639 TCP(sport=local_port, dport=12346))
1640 self.pg0.add_stream(p)
1641 self.pg_enable_capture(self.pg_interfaces)
1643 capture = self.pg1.get_capture(1)
1648 self.assertEqual(ip.src, self.pg0.remote_ip4)
1649 self.assertEqual(tcp.sport, local_port)
1650 self.check_tcp_checksum(p)
1651 self.check_ip_checksum(p)
1653 self.logger.error(ppp("Unexpected or invalid packet:", p))
1656 def test_static_vrf_aware(self):
1657 """ 1:1 NAT VRF awareness """
# Two static mappings are installed (pg4's host -> 10.0.0.30, pg0's host ->
# 10.0.0.40). Traffic from pg4 must reach pg3 translated to nat_ip1, while
# traffic from pg0 must produce nothing on pg3.
# NOTE(review): the trailing arguments of both mapping calls are not
# visible here; presumably they select the VRF - confirm.
1659 nat_ip1 = "10.0.0.30"
1660 nat_ip2 = "10.0.0.40"
1661 self.tcp_port_out = 6303
1662 self.udp_port_out = 6304
1663 self.icmp_id_out = 6305
1665 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1667 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1669 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1671 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1672 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1674 # inside interface VRF match NAT44 static mapping VRF
1675 pkts = self.create_stream_in(self.pg4, self.pg3)
1676 self.pg4.add_stream(pkts)
1677 self.pg_enable_capture(self.pg_interfaces)
1679 capture = self.pg3.get_capture(len(pkts))
1680 self.verify_capture_out(capture, nat_ip1, True)
1682 # inside interface VRF don't match NAT44 static mapping VRF (packets
1684 pkts = self.create_stream_in(self.pg0, self.pg3)
1685 self.pg0.add_stream(pkts)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 self.pg3.assert_nothing_captured()
1690 def test_dynamic_to_static(self):
1691 """ Switch from dynamic translation to 1:1NAT """
# Phase 1: with only the pool address configured, inside traffic is
# verified with the default expectations of verify_capture_out.
# Phase 2: a static mapping to 10.0.0.10 is added for the same host; the
# host's session dump must then be empty and new traffic must be
# translated to nat_ip.
1692 nat_ip = "10.0.0.10"
1693 self.tcp_port_out = 6303
1694 self.udp_port_out = 6304
1695 self.icmp_id_out = 6305
1697 self.nat44_add_address(self.nat_addr)
1698 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1699 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1703 pkts = self.create_stream_in(self.pg0, self.pg1)
1704 self.pg0.add_stream(pkts)
1705 self.pg_enable_capture(self.pg_interfaces)
1707 capture = self.pg1.get_capture(len(pkts))
1708 self.verify_capture_out(capture)
# Adding the static mapping is expected to leave no sessions for this host.
1711 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1712 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1713 self.assertEqual(len(sessions), 0)
1714 pkts = self.create_stream_in(self.pg0, self.pg1)
1715 self.pg0.add_stream(pkts)
1716 self.pg_enable_capture(self.pg_interfaces)
1718 capture = self.pg1.get_capture(len(pkts))
1719 self.verify_capture_out(capture, nat_ip, True)
1721 def test_identity_nat(self):
1722 """ Identity NAT """
# An identity mapping is added for pg0's remote host. A TCP packet sent
# from outside straight to that host must arrive on pg0 with addresses and
# ports unchanged.
1724 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1725 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1726 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1729 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1730 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1731 TCP(sport=12345, dport=56789))
1732 self.pg1.add_stream(p)
1733 self.pg_enable_capture(self.pg_interfaces)
1735 capture = self.pg0.get_capture(1)
# NOTE(review): `ip` and `tcp` are not assigned in the lines shown here;
# presumably they are the IP/TCP layers of the captured packet - confirm.
1740 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1741 self.assertEqual(ip.src, self.pg1.remote_ip4)
1742 self.assertEqual(tcp.dport, 56789)
1743 self.assertEqual(tcp.sport, 12345)
1744 self.check_tcp_checksum(p)
1745 self.check_ip_checksum(p)
1747 self.logger.error(ppp("Unexpected or invalid packet:", p))
1750 def test_static_lb(self):
1751 """ NAT44 local service load balancing """
# A load-balancing static mapping is created on nat_addr with two local
# servers (pg0.remote_hosts[0] and [1]). One TCP packet is sent from
# outside to the service and one reply is sent back from a server.
# NOTE(review): `external_port`, `local_port`, `server`, `ip` and `tcp`
# are not assigned in the lines shown here - confirm where they are bound.
1752 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1755 server1 = self.pg0.remote_hosts[0]
1756 server2 = self.pg0.remote_hosts[1]
1758 locals = [{'addr': server1.ip4n,
1761 {'addr': server2.ip4n,
1765 self.nat44_add_address(self.nat_addr)
1766 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1769 local_num=len(locals),
1771 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1772 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1775 # from client to service
1776 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1777 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1778 TCP(sport=12345, dport=external_port))
1779 self.pg1.add_stream(p)
1780 self.pg_enable_capture(self.pg_interfaces)
1782 capture = self.pg0.get_capture(1)
# Either server is an acceptable destination for the translated packet.
1788 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1789 if ip.dst == server1.ip4:
1793 self.assertEqual(tcp.dport, local_port)
1794 self.check_tcp_checksum(p)
1795 self.check_ip_checksum(p)
1797 self.logger.error(ppp("Unexpected or invalid packet:", p))
1800 # from service back to client
# The reply must leave with nat_addr/external_port as its source.
1801 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1802 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1803 TCP(sport=local_port, dport=12345))
1804 self.pg0.add_stream(p)
1805 self.pg_enable_capture(self.pg_interfaces)
1807 capture = self.pg1.get_capture(1)
1812 self.assertEqual(ip.src, self.nat_addr)
1813 self.assertEqual(tcp.sport, external_port)
1814 self.check_tcp_checksum(p)
1815 self.check_ip_checksum(p)
1817 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Runs only when running_extended_tests() is true; otherwise skipped.
1820 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1821 def test_static_lb_multi_clients(self):
1822 """ NAT44 local service load balancing - multiple clients"""
# Same load-balancing mapping as the single-client test, but one TCP packet
# is built for every client address produced by ip4_range(); the test ends
# by asserting that server1 was counted more often than server2.
# NOTE(review): `external_port`, `pkts`, `server1_n` and `server2_n` are not
# assigned in the lines shown here - confirm where they are bound.
1824 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1827 server1 = self.pg0.remote_hosts[0]
1828 server2 = self.pg0.remote_hosts[1]
1830 locals = [{'addr': server1.ip4n,
1833 {'addr': server2.ip4n,
1837 self.nat44_add_address(self.nat_addr)
1838 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1841 local_num=len(locals),
1843 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1844 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Client source addresses are derived from pg1's remote address.
1849 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1851 for client in clients:
1852 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1853 IP(src=client, dst=self.nat_addr) /
1854 TCP(sport=12345, dport=external_port))
1856 self.pg1.add_stream(pkts)
1857 self.pg_enable_capture(self.pg_interfaces)
1859 capture = self.pg0.get_capture(len(pkts))
1861 if p[IP].dst == server1.ip4:
# presumably server1 carries the larger weight in `locals` - TODO confirm
1865 self.assertTrue(server1_n > server2_n)
1867 def test_static_lb_2(self):
1868 """ NAT44 local service load balancing (asymmetrical rule) """
# Forwarding is switched on and a load-balancing static mapping with two
# local servers is created. Four single TCP packets are exchanged: to the
# service and back (translated), then directly to server1 and back
# (expected untranslated).
# NOTE(review): `external_port`, `local_port`, `server`, `ip` and `tcp`
# are not assigned in the lines shown here - confirm where they are bound.
1869 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1872 server1 = self.pg0.remote_hosts[0]
1873 server2 = self.pg0.remote_hosts[1]
1875 locals = [{'addr': server1.ip4n,
1878 {'addr': server2.ip4n,
1882 self.vapi.nat44_forwarding_enable_disable(1)
1883 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1887 local_num=len(locals),
1889 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1890 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1893 # from client to service
1894 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1895 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1896 TCP(sport=12345, dport=external_port))
1897 self.pg1.add_stream(p)
1898 self.pg_enable_capture(self.pg_interfaces)
1900 capture = self.pg0.get_capture(1)
# Either server is an acceptable destination for the translated packet.
1906 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1907 if ip.dst == server1.ip4:
1911 self.assertEqual(tcp.dport, local_port)
1912 self.check_tcp_checksum(p)
1913 self.check_ip_checksum(p)
1915 self.logger.error(ppp("Unexpected or invalid packet:", p))
1918 # from service back to client
1919 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1920 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1921 TCP(sport=local_port, dport=12345))
1922 self.pg0.add_stream(p)
1923 self.pg_enable_capture(self.pg_interfaces)
1925 capture = self.pg1.get_capture(1)
1930 self.assertEqual(ip.src, self.nat_addr)
1931 self.assertEqual(tcp.sport, external_port)
1932 self.check_tcp_checksum(p)
1933 self.check_ip_checksum(p)
1935 self.logger.error(ppp("Unexpected or invalid packet:", p))
1938 # from client to server (no translation)
# Addressed directly to server1 instead of nat_addr, using another client
# port (12346).
1939 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1940 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1941 TCP(sport=12346, dport=local_port))
1942 self.pg1.add_stream(p)
1943 self.pg_enable_capture(self.pg_interfaces)
1945 capture = self.pg0.get_capture(1)
1951 self.assertEqual(ip.dst, server1.ip4)
1952 self.assertEqual(tcp.dport, local_port)
1953 self.check_tcp_checksum(p)
1954 self.check_ip_checksum(p)
1956 self.logger.error(ppp("Unexpected or invalid packet:", p))
1959 # from service back to client (no translation)
# The source is expected to remain server1's own address and local_port.
1960 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1961 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1962 TCP(sport=local_port, dport=12346))
1963 self.pg0.add_stream(p)
1964 self.pg_enable_capture(self.pg_interfaces)
1966 capture = self.pg1.get_capture(1)
1971 self.assertEqual(ip.src, server1.ip4)
1972 self.assertEqual(tcp.sport, local_port)
1973 self.check_tcp_checksum(p)
1974 self.check_ip_checksum(p)
1976 self.logger.error(ppp("Unexpected or invalid packet:", p))
1979 def test_multiple_inside_interfaces(self):
1980 """ NAT44 multiple non-overlapping address space inside interfaces """
# The NAT44 feature is configured on pg0, pg1 and pg3; pg2 gets no feature
# call in this method. Six stream/capture rounds follow, each described by
# the comment directly above it.
1982 self.nat44_add_address(self.nat_addr)
1983 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1984 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1985 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1988 # between two NAT44 inside interfaces (no translation)
1989 pkts = self.create_stream_in(self.pg0, self.pg1)
1990 self.pg0.add_stream(pkts)
1991 self.pg_enable_capture(self.pg_interfaces)
1993 capture = self.pg1.get_capture(len(pkts))
1994 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1996 # from NAT44 inside to interface without NAT44 feature (no translation)
1997 pkts = self.create_stream_in(self.pg0, self.pg2)
1998 self.pg0.add_stream(pkts)
1999 self.pg_enable_capture(self.pg_interfaces)
2001 capture = self.pg2.get_capture(len(pkts))
2002 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
2004 # in2out 1st interface
2005 pkts = self.create_stream_in(self.pg0, self.pg3)
2006 self.pg0.add_stream(pkts)
2007 self.pg_enable_capture(self.pg_interfaces)
2009 capture = self.pg3.get_capture(len(pkts))
2010 self.verify_capture_out(capture)
2012 # out2in 1st interface
2013 pkts = self.create_stream_out(self.pg3)
2014 self.pg3.add_stream(pkts)
2015 self.pg_enable_capture(self.pg_interfaces)
2017 capture = self.pg0.get_capture(len(pkts))
2018 self.verify_capture_in(capture, self.pg0)
2020 # in2out 2nd interface
2021 pkts = self.create_stream_in(self.pg1, self.pg3)
2022 self.pg1.add_stream(pkts)
2023 self.pg_enable_capture(self.pg_interfaces)
2025 capture = self.pg3.get_capture(len(pkts))
2026 self.verify_capture_out(capture)
2028 # out2in 2nd interface
2029 pkts = self.create_stream_out(self.pg3)
2030 self.pg3.add_stream(pkts)
2031 self.pg_enable_capture(self.pg_interfaces)
2033 capture = self.pg1.get_capture(len(pkts))
2034 self.verify_capture_in(capture, self.pg1)
2036 def test_inside_overlapping_interfaces(self):
2037 """ NAT44 multiple inside interfaces with overlapping address space """
# The NAT44 feature is configured on pg3, pg4, pg5 and pg6, and pg6's host
# gets a static mapping to 10.0.0.10. The test covers same-VRF traffic,
# hairpinning via the static address, in2out/out2in rounds for pg4, pg5 and
# pg6 against pg3, and checks of the user and session dumps.
# NOTE(review): the second argument of nat44_user_session_dump (10 / 20) is
# presumably a VRF id - confirm against the API definition.
2039 static_nat_ip = "10.0.0.10"
2040 self.nat44_add_address(self.nat_addr)
2041 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2043 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
2044 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
2045 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
2046 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2049 # between NAT44 inside interfaces with same VRF (no translation)
2050 pkts = self.create_stream_in(self.pg4, self.pg5)
2051 self.pg4.add_stream(pkts)
2052 self.pg_enable_capture(self.pg_interfaces)
2054 capture = self.pg5.get_capture(len(pkts))
2055 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2057 # between NAT44 inside interfaces with different VRF (hairpinning)
# The packet targets the static address and must reach pg6 with nat_addr as
# source, pg6's host as destination, and a source port other than 1234.
2058 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2059 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2060 TCP(sport=1234, dport=5678))
2061 self.pg4.add_stream(p)
2062 self.pg_enable_capture(self.pg_interfaces)
2064 capture = self.pg6.get_capture(1)
2069 self.assertEqual(ip.src, self.nat_addr)
2070 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2071 self.assertNotEqual(tcp.sport, 1234)
2072 self.assertEqual(tcp.dport, 5678)
2074 self.logger.error(ppp("Unexpected or invalid packet:", p))
2077 # in2out 1st interface
2078 pkts = self.create_stream_in(self.pg4, self.pg3)
2079 self.pg4.add_stream(pkts)
2080 self.pg_enable_capture(self.pg_interfaces)
2082 capture = self.pg3.get_capture(len(pkts))
2083 self.verify_capture_out(capture)
2085 # out2in 1st interface
2086 pkts = self.create_stream_out(self.pg3)
2087 self.pg3.add_stream(pkts)
2088 self.pg_enable_capture(self.pg_interfaces)
2090 capture = self.pg4.get_capture(len(pkts))
2091 self.verify_capture_in(capture, self.pg4)
2093 # in2out 2nd interface
2094 pkts = self.create_stream_in(self.pg5, self.pg3)
2095 self.pg5.add_stream(pkts)
2096 self.pg_enable_capture(self.pg_interfaces)
2098 capture = self.pg3.get_capture(len(pkts))
2099 self.verify_capture_out(capture)
2101 # out2in 2nd interface
2102 pkts = self.create_stream_out(self.pg3)
2103 self.pg3.add_stream(pkts)
2104 self.pg_enable_capture(self.pg_interfaces)
2106 capture = self.pg5.get_capture(len(pkts))
2107 self.verify_capture_in(capture, self.pg5)
# pg5's user must have exactly three non-static sessions. The index-based
# asserts below expect them in the order TCP, UDP, ICMP.
2110 addresses = self.vapi.nat44_address_dump()
2111 self.assertEqual(len(addresses), 1)
2112 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2113 self.assertEqual(len(sessions), 3)
2114 for session in sessions:
2115 self.assertFalse(session.is_static)
2116 self.assertEqual(session.inside_ip_address[0:4],
2117 self.pg5.remote_ip4n)
2118 self.assertEqual(session.outside_ip_address,
2119 addresses[0].ip_address)
2120 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2121 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2122 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2123 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2124 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2125 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2126 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2127 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2128 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2130 # in2out 3rd interface
2131 pkts = self.create_stream_in(self.pg6, self.pg3)
2132 self.pg6.add_stream(pkts)
2133 self.pg_enable_capture(self.pg_interfaces)
2135 capture = self.pg3.get_capture(len(pkts))
2136 self.verify_capture_out(capture, static_nat_ip, True)
2138 # out2in 3rd interface
2139 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2140 self.pg3.add_stream(pkts)
2141 self.pg_enable_capture(self.pg_interfaces)
2143 capture = self.pg6.get_capture(len(pkts))
2144 self.verify_capture_in(capture, self.pg6)
2146 # general user and session dump verifications
2147 users = self.vapi.nat44_user_dump()
2148 self.assertTrue(len(users) >= 3)
2149 addresses = self.vapi.nat44_address_dump()
2150 self.assertEqual(len(addresses), 1)
# NOTE(review): `user` is not assigned in the lines shown here; presumably
# it iterates over `users` - confirm.
2152 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2154 for session in sessions:
2155 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_bytes > total_pkts and total_pkts > 0.
2156 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2157 self.assertTrue(session.protocol in
2158 [IP_PROTOS.tcp, IP_PROTOS.udp,
# pg4's user: at least four sessions, all non-static, all on the single
# pool address.
2162 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2163 self.assertTrue(len(sessions) >= 4)
2164 for session in sessions:
2165 self.assertFalse(session.is_static)
2166 self.assertEqual(session.inside_ip_address[0:4],
2167 self.pg4.remote_ip4n)
2168 self.assertEqual(session.outside_ip_address,
2169 addresses[0].ip_address)
# pg6's user: at least three sessions, all static, with the static NAT
# address as outside address.
2172 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2173 self.assertTrue(len(sessions) >= 3)
2174 for session in sessions:
2175 self.assertTrue(session.is_static)
2176 self.assertEqual(session.inside_ip_address[0:4],
2177 self.pg6.remote_ip4n)
# NOTE(review): this compares two map() results, which only works where
# map() returns lists (Python 2); on Python 3 both sides are distinct
# iterator objects and the assertion would fail - confirm before porting.
2178 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2179 map(int, static_nat_ip.split('.')))
2180 self.assertTrue(session.inside_port in
2181 [self.tcp_port_in, self.udp_port_in,
2184 def test_hairpinning(self):
2185 """ NAT44 hairpinning - 1:1 NAPT """
# Host and server are both remote hosts on pg0. The server gets a TCP
# static mapping nat_addr:8765 -> server:5678. The host reaches the server
# through nat_addr and the server replies through nat_addr; both packets
# are sent on pg0 and captured on pg0.
# NOTE(review): `host_in_port`, `ip` and `tcp` are not assigned in the lines
# shown here - confirm where they are bound.
2187 host = self.pg0.remote_hosts[0]
2188 server = self.pg0.remote_hosts[1]
2191 server_in_port = 5678
2192 server_out_port = 8765
2194 self.nat44_add_address(self.nat_addr)
2195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2198 # add static mapping for server
2199 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2200 server_in_port, server_out_port,
2201 proto=IP_PROTOS.tcp)
2203 # send packet from host to server
2204 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2205 IP(src=host.ip4, dst=self.nat_addr) /
2206 TCP(sport=host_in_port, dport=server_out_port))
2207 self.pg0.add_stream(p)
2208 self.pg_enable_capture(self.pg_interfaces)
2210 capture = self.pg0.get_capture(1)
# Source must become nat_addr with a port different from host_in_port;
# destination must become the server's real address and port.
2215 self.assertEqual(ip.src, self.nat_addr)
2216 self.assertEqual(ip.dst, server.ip4)
2217 self.assertNotEqual(tcp.sport, host_in_port)
2218 self.assertEqual(tcp.dport, server_in_port)
2219 self.check_tcp_checksum(p)
# The translated source port is kept as the reply's destination port.
2220 host_out_port = tcp.sport
2222 self.logger.error(ppp("Unexpected or invalid packet:", p))
2225 # send reply from server to host
2226 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2227 IP(src=server.ip4, dst=self.nat_addr) /
2228 TCP(sport=server_in_port, dport=host_out_port))
2229 self.pg0.add_stream(p)
2230 self.pg_enable_capture(self.pg_interfaces)
2232 capture = self.pg0.get_capture(1)
# The reply must reach the host from nat_addr:server_out_port, with the
# host's original port restored as destination.
2237 self.assertEqual(ip.src, self.nat_addr)
2238 self.assertEqual(ip.dst, host.ip4)
2239 self.assertEqual(tcp.sport, server_out_port)
2240 self.assertEqual(tcp.dport, host_in_port)
2241 self.check_tcp_checksum(p)
2243 self.logger.error(ppp("Unexpected or invalid packet:", p))
2246 def test_hairpinning2(self):
2247 """ NAT44 hairpinning - 1:1 NAT"""
# Three remote hosts on pg0 are used: a plain host and two servers that
# each get an address-only static mapping. Four exchanges are checked,
# each with TCP, UDP and ICMP: host -> server1, server1 -> host,
# server2 -> server1 and server1 -> server2. All captures are on pg0.
# NOTE(review): the list `pkts` handed to add_stream() is not built in
# the visible lines of this listing.
2249 server1_nat_ip = "10.0.0.10"
2250 server2_nat_ip = "10.0.0.11"
2251 host = self.pg0.remote_hosts[0]
2252 server1 = self.pg0.remote_hosts[1]
2253 server2 = self.pg0.remote_hosts[2]
2254 server_tcp_port = 22
2255 server_udp_port = 20
2257 self.nat44_add_address(self.nat_addr)
2258 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
2259 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2262 # add static mapping for servers
2263 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2264 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Requests from the host to server1's NAT address: TCP, UDP, ICMP echo.
2268 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2269 IP(src=host.ip4, dst=server1_nat_ip) /
2270 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2272 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2273 IP(src=host.ip4, dst=server1_nat_ip) /
2274 UDP(sport=self.udp_port_in, dport=server_udp_port))
2276 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2277 IP(src=host.ip4, dst=server1_nat_ip) /
2278 ICMP(id=self.icmp_id_in, type='echo-request'))
2280 self.pg0.add_stream(pkts)
2281 self.pg_enable_capture(self.pg_interfaces)
2283 capture = self.pg0.get_capture(len(pkts))
# The host has no static mapping, so its packets must leave with the pool
# address as source and a changed port / ICMP id, which is stored in
# self.*_out for building the replies.
2284 for packet in capture:
2286 self.assertEqual(packet[IP].src, self.nat_addr)
2287 self.assertEqual(packet[IP].dst, server1.ip4)
2288 if packet.haslayer(TCP):
2289 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2290 self.assertEqual(packet[TCP].dport, server_tcp_port)
2291 self.tcp_port_out = packet[TCP].sport
2292 self.check_tcp_checksum(packet)
2293 elif packet.haslayer(UDP):
2294 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2295 self.assertEqual(packet[UDP].dport, server_udp_port)
2296 self.udp_port_out = packet[UDP].sport
2298 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2299 self.icmp_id_out = packet[ICMP].id
2301 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Replies from server1 to the pool address, using the translated
# ports / ICMP id recorded above.
2306 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2307 IP(src=server1.ip4, dst=self.nat_addr) /
2308 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2310 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2311 IP(src=server1.ip4, dst=self.nat_addr) /
2312 UDP(sport=server_udp_port, dport=self.udp_port_out))
2314 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2315 IP(src=server1.ip4, dst=self.nat_addr) /
2316 ICMP(id=self.icmp_id_out, type='echo-reply'))
2318 self.pg0.add_stream(pkts)
2319 self.pg_enable_capture(self.pg_interfaces)
2321 capture = self.pg0.get_capture(len(pkts))
# The host must see the replies as coming from server1's NAT address,
# with its own original ports / ICMP id restored.
2322 for packet in capture:
2324 self.assertEqual(packet[IP].src, server1_nat_ip)
2325 self.assertEqual(packet[IP].dst, host.ip4)
2326 if packet.haslayer(TCP):
2327 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2328 self.assertEqual(packet[TCP].sport, server_tcp_port)
2329 self.check_tcp_checksum(packet)
2330 elif packet.haslayer(UDP):
2331 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2332 self.assertEqual(packet[UDP].sport, server_udp_port)
2334 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2336 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2339 # server2 to server1
2341 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2342 IP(src=server2.ip4, dst=server1_nat_ip) /
2343 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2345 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2346 IP(src=server2.ip4, dst=server1_nat_ip) /
2347 UDP(sport=self.udp_port_in, dport=server_udp_port))
2349 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2350 IP(src=server2.ip4, dst=server1_nat_ip) /
2351 ICMP(id=self.icmp_id_in, type='echo-request'))
2353 self.pg0.add_stream(pkts)
2354 self.pg_enable_capture(self.pg_interfaces)
2356 capture = self.pg0.get_capture(len(pkts))
# server2 has an address-only static mapping, so only the addresses are
# expected to change here: ports and ICMP id must stay as sent.
2357 for packet in capture:
2359 self.assertEqual(packet[IP].src, server2_nat_ip)
2360 self.assertEqual(packet[IP].dst, server1.ip4)
2361 if packet.haslayer(TCP):
2362 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2363 self.assertEqual(packet[TCP].dport, server_tcp_port)
2364 self.tcp_port_out = packet[TCP].sport
2365 self.check_tcp_checksum(packet)
2366 elif packet.haslayer(UDP):
2367 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2368 self.assertEqual(packet[UDP].dport, server_udp_port)
2369 self.udp_port_out = packet[UDP].sport
2371 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2372 self.icmp_id_out = packet[ICMP].id
2374 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2377 # server1 to server2
2379 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2380 IP(src=server1.ip4, dst=server2_nat_ip) /
2381 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2383 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2384 IP(src=server1.ip4, dst=server2_nat_ip) /
2385 UDP(sport=server_udp_port, dport=self.udp_port_out))
2387 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2388 IP(src=server1.ip4, dst=server2_nat_ip) /
2389 ICMP(id=self.icmp_id_out, type='echo-reply'))
2391 self.pg0.add_stream(pkts)
2392 self.pg_enable_capture(self.pg_interfaces)
2394 capture = self.pg0.get_capture(len(pkts))
# Replies must reach server2's real address with server1's NAT address
# as source and unchanged ports / ICMP id.
2395 for packet in capture:
2397 self.assertEqual(packet[IP].src, server1_nat_ip)
2398 self.assertEqual(packet[IP].dst, server2.ip4)
2399 if packet.haslayer(TCP):
2400 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2401 self.assertEqual(packet[TCP].sport, server_tcp_port)
2402 self.check_tcp_checksum(packet)
2403 elif packet.haslayer(UDP):
2404 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2405 self.assertEqual(packet[UDP].sport, server_udp_port)
2407 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2409 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2412 def test_max_translations_per_user(self):
2413 """ MAX translations per user - recycle the least recently used """
# Sends five more TCP flows from a single source address than the
# configured per-user translation limit and expects every packet to be
# forwarded to pg1 anyway.
# NOTE(review): the list `pkts` handed to add_stream() is not built in
# the visible lines of this listing.
2415 self.nat44_add_address(self.nat_addr)
2416 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
2417 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2420 # get maximum number of translations per user
2421 nat44_config = self.vapi.nat_show_config()
2423 # send more than maximum number of translations per user packets
2424 pkts_num = nat44_config.max_translations_per_user + 5
# One packet per distinct source port, starting at 1025.
2426 for port in range(0, pkts_num):
2427 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2428 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2429 TCP(sport=1025 + port))
2431 self.pg0.add_stream(pkts)
2432 self.pg_enable_capture(self.pg_interfaces)
2435 # verify number of translated packet
2436 self.pg1.get_capture(pkts_num)
2438 def test_interface_addr(self):
2439 """ Acquire NAT44 addresses from interface """
# Registers pg7 as the source of NAT pool addresses, then checks that the
# address dump follows the interface: empty while pg7 has no IPv4
# address, one entry equal to pg7's local address once configured, and
# empty again after the address is removed.
2440 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2442 # no address in NAT pool
2443 adresses = self.vapi.nat44_address_dump()
2444 self.assertEqual(0, len(adresses))
2446 # configure interface address and check NAT address pool
2447 self.pg7.config_ip4()
2448 adresses = self.vapi.nat44_address_dump()
2449 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address field are compared.
2450 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2452 # remove interface address and check NAT address pool
2453 self.pg7.unconfig_ip4()
2454 adresses = self.vapi.nat44_address_dump()
2455 self.assertEqual(0, len(adresses))
2457 def test_interface_addr_static_mapping(self):
2458 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is the pg7 interface
# rather than a fixed address, then checks the static-mapping dump as
# pg7's IPv4 address is configured, removed and configured again, and
# finally after the mapping itself is removed.
# NOTE(review): `tag` and `resolved` are used below but their assignments
# are not visible in this listing; both nat44_add_static_mapping() calls
# are also missing some of their argument lines.
2461 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2462 self.nat44_add_static_mapping(
2464 external_sw_if_index=self.pg7.sw_if_index,
2467 # static mappings with external interface
2468 static_mappings = self.vapi.nat44_static_mapping_dump()
2469 self.assertEqual(1, len(static_mappings))
2470 self.assertEqual(self.pg7.sw_if_index,
2471 static_mappings[0].external_sw_if_index)
# The dumped tag is compared up to its first NUL byte.
2472 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2474 # configure interface address and check static mappings
2475 self.pg7.config_ip4()
2476 static_mappings = self.vapi.nat44_static_mapping_dump()
2477 self.assertEqual(2, len(static_mappings))
# The entry with external_sw_if_index == 0xFFFFFFFF is expected to carry
# pg7's local address and the same tag.
2479 for sm in static_mappings:
2480 if sm.external_sw_if_index == 0xFFFFFFFF:
2481 self.assertEqual(sm.external_ip_address[0:4],
2482 self.pg7.local_ip4n)
2483 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2485 self.assertTrue(resolved)
2487 # remove interface address and check static mappings
2488 self.pg7.unconfig_ip4()
2489 static_mappings = self.vapi.nat44_static_mapping_dump()
2490 self.assertEqual(1, len(static_mappings))
2491 self.assertEqual(self.pg7.sw_if_index,
2492 static_mappings[0].external_sw_if_index)
2493 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2495 # configure interface address again and check static mappings
2496 self.pg7.config_ip4()
2497 static_mappings = self.vapi.nat44_static_mapping_dump()
2498 self.assertEqual(2, len(static_mappings))
2500 for sm in static_mappings:
2501 if sm.external_sw_if_index == 0xFFFFFFFF:
2502 self.assertEqual(sm.external_ip_address[0:4],
2503 self.pg7.local_ip4n)
2504 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2506 self.assertTrue(resolved)
2508 # remove static mapping
2509 self.nat44_add_static_mapping(
2511 external_sw_if_index=self.pg7.sw_if_index,
2514 static_mappings = self.vapi.nat44_static_mapping_dump()
2515 self.assertEqual(0, len(static_mappings))
2517 def test_interface_addr_identity_nat(self):
2518 """ Identity NAT with addresses from interface """
2521 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2522 self.vapi.nat44_add_del_identity_mapping(
2523 sw_if_index=self.pg7.sw_if_index,
2525 protocol=IP_PROTOS.tcp,
2528 # identity mappings with external interface
2529 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2530 self.assertEqual(1, len(identity_mappings))
2531 self.assertEqual(self.pg7.sw_if_index,
2532 identity_mappings[0].sw_if_index)
2534 # configure interface address and check identity mappings
2535 self.pg7.config_ip4()
2536 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2538 self.assertEqual(2, len(identity_mappings))
2539 for sm in identity_mappings:
2540 if sm.sw_if_index == 0xFFFFFFFF:
2541 self.assertEqual(identity_mappings[0].ip_address,
2542 self.pg7.local_ip4n)
2543 self.assertEqual(port, identity_mappings[0].port)
2544 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2546 self.assertTrue(resolved)
2548 # remove interface address and check identity mappings
2549 self.pg7.unconfig_ip4()
2550 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2551 self.assertEqual(1, len(identity_mappings))
2552 self.assertEqual(self.pg7.sw_if_index,
2553 identity_mappings[0].sw_if_index)
2555 def test_ipfix_nat44_sess(self):
2556 """ IPFIX logging NAT44 session created/delted """
# Enables NAT IPFIX logging towards a collector on pg3 using a
# non-default collector port, creates sessions with in-to-out traffic,
# removes the NAT pool address, flushes IPFIX and verifies the exported
# packets captured on pg3.
# NOTE(review): the loops over `capture` that bind `p` for the checks
# below are not visible in this listing.
2557 self.ipfix_domain_id = 10
2558 self.ipfix_src_port = 20202
2559 colector_port = 30303
# Make scapy dissect UDP packets to port 30303 (same value as
# colector_port above) as IPFIX.
2560 bind_layers(UDP, IPFIX, dport=30303)
2561 self.nat44_add_address(self.nat_addr)
2562 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
2563 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2565 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2566 src_address=self.pg3.local_ip4n,
2568 template_interval=10,
2569 collector_port=colector_port)
2570 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2571 src_port=self.ipfix_src_port)
2573 pkts = self.create_stream_in(self.pg0, self.pg1)
2574 self.pg0.add_stream(pkts)
2575 self.pg_enable_capture(self.pg_interfaces)
2577 capture = self.pg1.get_capture(len(pkts))
2578 self.verify_capture_out(capture)
# Remove the pool address before flushing the IPFIX exporter.
2579 self.nat44_add_address(self.nat_addr, is_add=0)
2580 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2581 capture = self.pg3.get_capture(9)
2582 ipfix = IPFIXDecoder()
2583 # first load template
# Every exported packet must be IPFIX, sent from pg3's local address to
# the collector with the configured source port, collector port and
# observation domain id.
2585 self.assertTrue(p.haslayer(IPFIX))
2586 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2587 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2588 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2589 self.assertEqual(p[UDP].dport, colector_port)
2590 self.assertEqual(p[IPFIX].observationDomainID,
2591 self.ipfix_domain_id)
2592 if p.haslayer(Template):
2593 ipfix.add_template(p.getlayer(Template))
2594 # verify events in data set
2596 if p.haslayer(Data):
2597 data = ipfix.decode_data_set(p.getlayer(Set))
2598 self.verify_ipfix_nat44_ses(data)
2600 def test_ipfix_addr_exhausted(self):
2601 """ IPFIX logging NAT addresses exhausted """
# No NAT pool address is added in the visible lines. One packet is sent
# from pg0 towards pg1 and nothing is expected on pg1; the IPFIX export
# captured on pg3 is then verified.
# NOTE(review): the packet definition below is cut off after the IP
# layer, and the loops over `capture` that bind `p` for the checks are
# not visible in this listing.
2602 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
2603 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2605 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2606 src_address=self.pg3.local_ip4n,
2608 template_interval=10)
2609 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2610 src_port=self.ipfix_src_port)
2612 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2613 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2615 self.pg0.add_stream(p)
2616 self.pg_enable_capture(self.pg_interfaces)
# The packet must not be forwarded.
2618 capture = self.pg1.get_capture(0)
2619 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2620 capture = self.pg3.get_capture(9)
2621 ipfix = IPFIXDecoder()
2622 # first load template
2624 self.assertTrue(p.haslayer(IPFIX))
2625 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2626 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2627 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# No collector_port is passed to set_ipfix_exporter() above; the export
# is expected on UDP port 4739.
2628 self.assertEqual(p[UDP].dport, 4739)
2629 self.assertEqual(p[IPFIX].observationDomainID,
2630 self.ipfix_domain_id)
2631 if p.haslayer(Template):
2632 ipfix.add_template(p.getlayer(Template))
2633 # verify events in data set
2635 if p.haslayer(Data):
2636 data = ipfix.decode_data_set(p.getlayer(Set))
2637 self.verify_ipfix_addr_exhausted(data)
2639 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2640 def test_ipfix_max_sessions(self):
2641 """ IPFIX logging maximum session entries exceeded """
# Runs only when running_extended_tests() is true. Fills the session
# table with max_sessions flows from distinct source addresses, enables
# IPFIX logging, sends one more packet that must not be forwarded, and
# verifies the IPFIX export captured on pg3.
# NOTE(review): the list `pkts`, the tails of both packet definitions
# and the loops over `capture` that bind `p` for the checks are not
# visible in this listing.
2642 self.nat44_add_address(self.nat_addr)
2643 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
2644 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2647 nat44_config = self.vapi.nat_show_config()
# The session limit used by this test is ten times the reported number
# of translation buckets.
2648 max_sessions = 10 * nat44_config.translation_buckets
2651 for i in range(0, max_sessions):
# Spread i over the last two octets so each packet has its own source.
2652 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2653 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2654 IP(src=src, dst=self.pg1.remote_ip4) /
2657 self.pg0.add_stream(pkts)
2658 self.pg_enable_capture(self.pg_interfaces)
2661 self.pg1.get_capture(max_sessions)
2662 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2663 src_address=self.pg3.local_ip4n,
2665 template_interval=10)
2666 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2667 src_port=self.ipfix_src_port)
2669 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2670 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2672 self.pg0.add_stream(p)
2673 self.pg_enable_capture(self.pg_interfaces)
# The extra packet must not be forwarded.
2675 self.pg1.get_capture(0)
2676 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2677 capture = self.pg3.get_capture(9)
2678 ipfix = IPFIXDecoder()
2679 # first load template
2681 self.assertTrue(p.haslayer(IPFIX))
2682 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2683 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2684 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2685 self.assertEqual(p[UDP].dport, 4739)
2686 self.assertEqual(p[IPFIX].observationDomainID,
2687 self.ipfix_domain_id)
2688 if p.haslayer(Template):
2689 ipfix.add_template(p.getlayer(Template))
2690 # verify events in data set
2692 if p.haslayer(Data):
2693 data = ipfix.decode_data_set(p.getlayer(Set))
2694 self.verify_ipfix_max_sessions(data, max_sessions)
2696 def test_pool_addr_fib(self):
2697 """ NAT44 add pool addresses to FIB """
2698 static_addr = '10.0.0.10'
2699 self.nat44_add_address(self.nat_addr)
2700 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2701 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2703 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2706 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2707 ARP(op=ARP.who_has, pdst=self.nat_addr,
2708 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2709 self.pg1.add_stream(p)
2710 self.pg_enable_capture(self.pg_interfaces)
2712 capture = self.pg1.get_capture(1)
2713 self.assertTrue(capture[0].haslayer(ARP))
2714 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2717 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2718 ARP(op=ARP.who_has, pdst=static_addr,
2719 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2720 self.pg1.add_stream(p)
2721 self.pg_enable_capture(self.pg_interfaces)
2723 capture = self.pg1.get_capture(1)
2724 self.assertTrue(capture[0].haslayer(ARP))
2725 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2727 # send ARP to non-NAT44 interface
2728 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2729 ARP(op=ARP.who_has, pdst=self.nat_addr,
2730 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2731 self.pg2.add_stream(p)
2732 self.pg_enable_capture(self.pg_interfaces)
2734 capture = self.pg1.get_capture(0)
2736 # remove addresses and verify
2737 self.nat44_add_address(self.nat_addr, is_add=0)
2738 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2741 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2742 ARP(op=ARP.who_has, pdst=self.nat_addr,
2743 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2744 self.pg1.add_stream(p)
2745 self.pg_enable_capture(self.pg_interfaces)
2747 capture = self.pg1.get_capture(0)
2749 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2750 ARP(op=ARP.who_has, pdst=static_addr,
2751 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2752 self.pg1.add_stream(p)
2753 self.pg_enable_capture(self.pg_interfaces)
2755 capture = self.pg1.get_capture(0)
2757 def test_vrf_mode(self):
2758 """ NAT44 tenant VRF aware address pool mode """
# Puts pg0 and pg1 into two separate IP tables, adds one NAT pool address
# per table, and checks that traffic from each interface towards pg2 is
# translated to the pool address of its own table. The interfaces are
# moved back to table 0 and the tables are deleted at the end.
# NOTE(review): vrf_id1 and vrf_id2 are used below but their assignments
# are not visible in this listing.
2762 nat_ip1 = "10.0.0.10"
2763 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and configured again
# after it.
2765 self.pg0.unconfig_ip4()
2766 self.pg1.unconfig_ip4()
2767 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2768 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2769 self.pg0.set_table_ip4(vrf_id1)
2770 self.pg1.set_table_ip4(vrf_id2)
2771 self.pg0.config_ip4()
2772 self.pg1.config_ip4()
2774 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2775 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2776 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2777 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
2778 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# Traffic from pg0 must be translated to nat_ip1.
2782 pkts = self.create_stream_in(self.pg0, self.pg2)
2783 self.pg0.add_stream(pkts)
2784 self.pg_enable_capture(self.pg_interfaces)
2786 capture = self.pg2.get_capture(len(pkts))
2787 self.verify_capture_out(capture, nat_ip1)
# Traffic from pg1 must be translated to nat_ip2.
2790 pkts = self.create_stream_in(self.pg1, self.pg2)
2791 self.pg1.add_stream(pkts)
2792 self.pg_enable_capture(self.pg_interfaces)
2794 capture = self.pg2.get_capture(len(pkts))
2795 self.verify_capture_out(capture, nat_ip2)
# Undo the table setup: back to table 0, then delete both tables.
2797 self.pg0.unconfig_ip4()
2798 self.pg1.unconfig_ip4()
2799 self.pg0.set_table_ip4(0)
2800 self.pg1.set_table_ip4(0)
2801 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2802 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2804 def test_vrf_feature_independent(self):
2805 """ NAT44 tenant VRF independent address pool mode """
# Adds nat_ip1 without a vrf_id and nat_ip2 with vrf_id=99, then checks
# that traffic from both pg0 and pg1 towards pg2 is translated to
# nat_ip1.
2807 nat_ip1 = "10.0.0.10"
2808 nat_ip2 = "10.0.0.11"
2810 self.nat44_add_address(nat_ip1)
2811 self.nat44_add_address(nat_ip2, vrf_id=99)
2812 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2813 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
2814 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2818 pkts = self.create_stream_in(self.pg0, self.pg2)
2819 self.pg0.add_stream(pkts)
2820 self.pg_enable_capture(self.pg_interfaces)
2822 capture = self.pg2.get_capture(len(pkts))
2823 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: the same pool address is expected.
2826 pkts = self.create_stream_in(self.pg1, self.pg2)
2827 self.pg1.add_stream(pkts)
2828 self.pg_enable_capture(self.pg_interfaces)
2830 capture = self.pg2.get_capture(len(pkts))
2831 self.verify_capture_out(capture, nat_ip1)
2833 def test_dynamic_ipless_interfaces(self):
2834 """ NAT44 interfaces without configured IP address """
# Uses pg7 and pg8. Instead of relying on interface addresses, a
# neighbor entry and a /32 route are installed for each remote host.
# Dynamic NAT is then checked from pg7 to pg8 and back.
# NOTE(review): both ip_neighbor_add_del() calls and the second
# nat44_interface_add_del_feature() call end on a trailing comma; their
# remaining argument(s) are not visible in this listing.
2836 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2837 mactobinary(self.pg7.remote_mac),
2838 self.pg7.remote_ip4n,
2840 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2841 mactobinary(self.pg8.remote_mac),
2842 self.pg8.remote_ip4n,
# Host routes pointing directly at each remote host.
2845 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2846 dst_address_length=32,
2847 next_hop_address=self.pg7.remote_ip4n,
2848 next_hop_sw_if_index=self.pg7.sw_if_index)
2849 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2850 dst_address_length=32,
2851 next_hop_address=self.pg8.remote_ip4n,
2852 next_hop_sw_if_index=self.pg8.sw_if_index)
2854 self.nat44_add_address(self.nat_addr)
2855 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2856 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8
2860 pkts = self.create_stream_in(self.pg7, self.pg8)
2861 self.pg7.add_stream(pkts)
2862 self.pg_enable_capture(self.pg_interfaces)
2864 capture = self.pg8.get_capture(len(pkts))
2865 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the NAT pool address.
2868 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2869 self.pg8.add_stream(pkts)
2870 self.pg_enable_capture(self.pg_interfaces)
2872 capture = self.pg7.get_capture(len(pkts))
2873 self.verify_capture_in(capture, self.pg7)
2875 def test_static_ipless_interfaces(self):
2876 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Uses pg7 and pg8 with a neighbor entry and a /32 route installed for
# each remote host. An address-only static mapping maps pg7's remote
# host to the NAT address; traffic is checked from pg8 to pg7 first and
# then from pg7 to pg8.
# NOTE(review): both ip_neighbor_add_del() calls and the second
# nat44_interface_add_del_feature() call end on a trailing comma; their
# remaining argument(s) are not visible in this listing.
2878 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2879 mactobinary(self.pg7.remote_mac),
2880 self.pg7.remote_ip4n,
2882 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2883 mactobinary(self.pg8.remote_mac),
2884 self.pg8.remote_ip4n,
# Host routes pointing directly at each remote host.
2887 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2888 dst_address_length=32,
2889 next_hop_address=self.pg7.remote_ip4n,
2890 next_hop_sw_if_index=self.pg7.sw_if_index)
2891 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2892 dst_address_length=32,
2893 next_hop_address=self.pg8.remote_ip4n,
2894 next_hop_sw_if_index=self.pg8.sw_if_index)
2896 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2897 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2898 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2902 pkts = self.create_stream_out(self.pg8)
2903 self.pg8.add_stream(pkts)
2904 self.pg_enable_capture(self.pg_interfaces)
2906 capture = self.pg7.get_capture(len(pkts))
2907 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; verify_capture_out() gets the NAT address plus an extra
# True argument whose meaning is defined by that helper (not visible
# in this listing).
2910 pkts = self.create_stream_in(self.pg7, self.pg8)
2911 self.pg7.add_stream(pkts)
2912 self.pg_enable_capture(self.pg_interfaces)
2914 capture = self.pg8.get_capture(len(pkts))
2915 self.verify_capture_out(capture, self.nat_addr, True)
2917 def test_static_with_port_ipless_interfaces(self):
2918 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Uses pg7 and pg8 with a neighbor entry and a /32 route installed for
# each remote host. Static mappings with ports are added for TCP, UDP
# and ICMP from pg7's remote host to the NAT address; traffic is checked
# from pg8 to pg7 first and then from pg7 to pg8.
# NOTE(review): both ip_neighbor_add_del() calls and the second
# nat44_interface_add_del_feature() call end on a trailing comma; their
# remaining argument(s) are not visible in this listing.
# Fixed outside ports / ICMP id used by the static mappings below.
2920 self.tcp_port_out = 30606
2921 self.udp_port_out = 30607
2922 self.icmp_id_out = 30608
2924 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2925 mactobinary(self.pg7.remote_mac),
2926 self.pg7.remote_ip4n,
2928 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2929 mactobinary(self.pg8.remote_mac),
2930 self.pg8.remote_ip4n,
# Host routes pointing directly at each remote host.
2933 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2934 dst_address_length=32,
2935 next_hop_address=self.pg7.remote_ip4n,
2936 next_hop_sw_if_index=self.pg7.sw_if_index)
2937 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2938 dst_address_length=32,
2939 next_hop_address=self.pg8.remote_ip4n,
2940 next_hop_sw_if_index=self.pg8.sw_if_index)
2942 self.nat44_add_address(self.nat_addr)
# One static mapping per protocol: inside port/id -> outside port/id.
2943 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2944 self.tcp_port_in, self.tcp_port_out,
2945 proto=IP_PROTOS.tcp)
2946 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2947 self.udp_port_in, self.udp_port_out,
2948 proto=IP_PROTOS.udp)
2949 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2950 self.icmp_id_in, self.icmp_id_out,
2951 proto=IP_PROTOS.icmp)
2952 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2953 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2957 pkts = self.create_stream_out(self.pg8)
2958 self.pg8.add_stream(pkts)
2959 self.pg_enable_capture(self.pg_interfaces)
2961 capture = self.pg7.get_capture(len(pkts))
2962 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2965 pkts = self.create_stream_in(self.pg7, self.pg8)
2966 self.pg7.add_stream(pkts)
2967 self.pg_enable_capture(self.pg_interfaces)
2969 capture = self.pg8.get_capture(len(pkts))
2970 self.verify_capture_out(capture)
2972 def test_static_unknown_proto(self):
2973 """ 1:1 NAT translate packet with unknown protocol """
# With an address-only static mapping for pg0's remote host, sends a
# tunnelled packet from pg0 to pg1 and one back, and checks that only the
# outer IP addresses are translated and the GRE layer is still present.
# NOTE(review): each packet definition skips a line between the outer and
# inner IP headers, and `packet` is used in the checks without a visible
# assignment; the assertions expect a GRE layer in the captured packet.
2974 nat_ip = "10.0.0.10"
2975 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2976 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
2977 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1
2981 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2982 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2984 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2985 TCP(sport=1234, dport=1234))
2986 self.pg0.add_stream(p)
2987 self.pg_enable_capture(self.pg_interfaces)
2989 p = self.pg1.get_capture(1)
# Outer source must be the static NAT address.
2992 self.assertEqual(packet[IP].src, nat_ip)
2993 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2994 self.assertTrue(packet.haslayer(GRE))
2995 self.check_ip_checksum(packet)
2997 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the static NAT address.
3001 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3002 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3004 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3005 TCP(sport=1234, dport=1234))
3006 self.pg1.add_stream(p)
3007 self.pg_enable_capture(self.pg_interfaces)
3009 p = self.pg0.get_capture(1)
# Outer destination must be restored to pg0's remote host.
3012 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3013 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3014 self.assertTrue(packet.haslayer(GRE))
3015 self.check_ip_checksum(packet)
3017 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3020 def test_hairpinning_static_unknown_proto(self):
3021 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0, each with an address-only
# static mapping. A tunnelled packet is sent host -> server (via the
# server's NAT address) and one server -> host (via the host's NAT
# address); both are captured on pg0 and must keep their GRE layer.
# NOTE(review): each packet definition skips a line between the outer and
# inner IP headers, and `packet` is used in the checks without a visible
# assignment.
3023 host = self.pg0.remote_hosts[0]
3024 server = self.pg0.remote_hosts[1]
3026 host_nat_ip = "10.0.0.10"
3027 server_nat_ip = "10.0.0.11"
3029 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3030 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3031 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
3032 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server
3036 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3037 IP(src=host.ip4, dst=server_nat_ip) /
3039 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3040 TCP(sport=1234, dport=1234))
3041 self.pg0.add_stream(p)
3042 self.pg_enable_capture(self.pg_interfaces)
3044 p = self.pg0.get_capture(1)
# Both outer addresses are rewritten: source to the host's NAT address,
# destination to the server's real address.
3047 self.assertEqual(packet[IP].src, host_nat_ip)
3048 self.assertEqual(packet[IP].dst, server.ip4)
3049 self.assertTrue(packet.haslayer(GRE))
3050 self.check_ip_checksum(packet)
3052 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host
3056 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3057 IP(src=server.ip4, dst=host_nat_ip) /
3059 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3060 TCP(sport=1234, dport=1234))
3061 self.pg0.add_stream(p)
3062 self.pg_enable_capture(self.pg_interfaces)
3064 p = self.pg0.get_capture(1)
3067 self.assertEqual(packet[IP].src, server_nat_ip)
3068 self.assertEqual(packet[IP].dst, host.ip4)
3069 self.assertTrue(packet.haslayer(GRE))
3070 self.check_ip_checksum(packet)
3072 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3075 def test_unknown_proto(self):
3076 """ NAT44 translate packet with unknown protocol """
# With a dynamic NAT pool address, first sends a TCP packet from pg0 to
# pg1, then a tunnelled packet in each direction, checking that the
# outer addresses are translated and the GRE layer is still present.
# NOTE(review): each tunnelled packet definition skips a line between the
# outer and inner IP headers, and `packet` is used in the checks without
# a visible assignment.
3077 self.nat44_add_address(self.nat_addr)
3078 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
3079 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Plain TCP packet pg0 -> pg1 sent before the tunnelled traffic; only its
# arrival on pg1 is checked.
3083 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3084 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3085 TCP(sport=self.tcp_port_in, dport=20))
3086 self.pg0.add_stream(p)
3087 self.pg_enable_capture(self.pg_interfaces)
3089 p = self.pg1.get_capture(1)
# Tunnelled packet pg0 -> pg1.
3091 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3092 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3094 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3095 TCP(sport=1234, dport=1234))
3096 self.pg0.add_stream(p)
3097 self.pg_enable_capture(self.pg_interfaces)
3099 p = self.pg1.get_capture(1)
# Outer source must be the NAT pool address.
3102 self.assertEqual(packet[IP].src, self.nat_addr)
3103 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3104 self.assertTrue(packet.haslayer(GRE))
3105 self.check_ip_checksum(packet)
3107 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Tunnelled packet pg1 -> pg0, addressed to the NAT pool address.
3111 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3112 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3114 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3115 TCP(sport=1234, dport=1234))
3116 self.pg1.add_stream(p)
3117 self.pg_enable_capture(self.pg_interfaces)
3119 p = self.pg0.get_capture(1)
# Outer destination must be restored to pg0's remote host.
3122 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3123 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3124 self.assertTrue(packet.haslayer(GRE))
3125 self.check_ip_checksum(packet)
3127 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3130 def test_hairpinning_unknown_proto(self):
3131 """ NAT44 translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0; only the server has a
# static mapping. After an initial TCP packet from host to server, a
# tunnelled packet is sent in each direction, captured on pg0, and must
# keep its GRE layer.
# NOTE(review): host_in_port is used below without a visible assignment;
# each tunnelled packet definition skips a line between the outer and
# inner IP headers; `packet` is used in the checks without a visible
# assignment.
3132 host = self.pg0.remote_hosts[0]
3133 server = self.pg0.remote_hosts[1]
3136 server_in_port = 5678
3137 server_out_port = 8765
3138 server_nat_ip = "10.0.0.11"
3140 self.nat44_add_address(self.nat_addr)
3141 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
3142 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3145 # add static mapping for server
3146 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Plain TCP packet host -> server sent before the tunnelled traffic; only
# its arrival on pg0 is checked.
3149 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3150 IP(src=host.ip4, dst=server_nat_ip) /
3151 TCP(sport=host_in_port, dport=server_out_port))
3152 self.pg0.add_stream(p)
3153 self.pg_enable_capture(self.pg_interfaces)
3155 capture = self.pg0.get_capture(1)
# Tunnelled packet host -> server.
3157 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3158 IP(src=host.ip4, dst=server_nat_ip) /
3160 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3161 TCP(sport=1234, dport=1234))
3162 self.pg0.add_stream(p)
3163 self.pg_enable_capture(self.pg_interfaces)
3165 p = self.pg0.get_capture(1)
# Outer source becomes the pool address, outer destination the server's
# real address.
3168 self.assertEqual(packet[IP].src, self.nat_addr)
3169 self.assertEqual(packet[IP].dst, server.ip4)
3170 self.assertTrue(packet.haslayer(GRE))
3171 self.check_ip_checksum(packet)
3173 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Tunnelled packet server -> host, addressed to the pool address.
3177 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3178 IP(src=server.ip4, dst=self.nat_addr) /
3180 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3181 TCP(sport=1234, dport=1234))
3182 self.pg0.add_stream(p)
3183 self.pg_enable_capture(self.pg_interfaces)
3185 p = self.pg0.get_capture(1)
3188 self.assertEqual(packet[IP].src, server_nat_ip)
3189 self.assertEqual(packet[IP].dst, host.ip4)
3190 self.assertTrue(packet.haslayer(GRE))
3191 self.check_ip_checksum(packet)
3193 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3196 def test_output_feature(self):
3197 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3 and checks three
# flows: pg0 -> pg3 (translated), pg3 -> pg0 (translated back) and
# pg2 -> pg0, which must pass without translation.
3198 self.nat44_add_address(self.nat_addr)
3199 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3200 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
# NOTE(review): the next call ends on a trailing comma; its remaining
# argument(s) are not visible in this listing.
3201 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
3205 pkts = self.create_stream_in(self.pg0, self.pg3)
3206 self.pg0.add_stream(pkts)
3207 self.pg_enable_capture(self.pg_interfaces)
3209 capture = self.pg3.get_capture(len(pkts))
3210 self.verify_capture_out(capture)
# pg3 -> pg0
3213 pkts = self.create_stream_out(self.pg3)
3214 self.pg3.add_stream(pkts)
3215 self.pg_enable_capture(self.pg_interfaces)
3217 capture = self.pg0.get_capture(len(pkts))
3218 self.verify_capture_in(capture, self.pg0)
3220 # from non-NAT interface to NAT inside interface
3221 pkts = self.create_stream_in(self.pg2, self.pg0)
3222 self.pg2.add_stream(pkts)
3223 self.pg_enable_capture(self.pg_interfaces)
3225 capture = self.pg0.get_capture(len(pkts))
3226 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3228 def test_output_feature_vrf_aware(self):
3229 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# One NAT pool address per VRF (bound to vrf_id 10 and 20 further down)
3230 nat_ip_vrf10 = "10.0.0.10"
3231 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes towards pg3's remote address via pg3.
# NOTE(review): both calls end with a trailing comma; the final argument
# (presumably the table/VRF id that tells them apart) is not visible
# here - confirm.
3233 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3234 dst_address_length=32,
3235 next_hop_address=self.pg3.remote_ip4n,
3236 next_hop_sw_if_index=self.pg3.sw_if_index,
3238 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3239 dst_address_length=32,
3240 next_hop_address=self.pg3.remote_ip4n,
3241 next_hop_sw_if_index=self.pg3.sw_if_index,
# NAT addresses per VRF, then the output feature on pg4, pg6 and pg3
# (the pg3 call's remaining argument(s) are not visible here)
3244 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3245 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3246 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3247 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3248 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3: expect translation to the VRF 10 address
3252 pkts = self.create_stream_in(self.pg4, self.pg3)
3253 self.pg4.add_stream(pkts)
3254 self.pg_enable_capture(self.pg_interfaces)
3256 capture = self.pg3.get_capture(len(pkts))
3257 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> VRF 10 address: expect delivery back on pg4
3260 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3261 self.pg3.add_stream(pkts)
3262 self.pg_enable_capture(self.pg_interfaces)
3264 capture = self.pg4.get_capture(len(pkts))
3265 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3: expect translation to the VRF 20 address
3268 pkts = self.create_stream_in(self.pg6, self.pg3)
3269 self.pg6.add_stream(pkts)
3270 self.pg_enable_capture(self.pg_interfaces)
3272 capture = self.pg3.get_capture(len(pkts))
3273 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> VRF 20 address: expect delivery back on pg6
3276 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3277 self.pg3.add_stream(pkts)
3278 self.pg_enable_capture(self.pg_interfaces)
3280 capture = self.pg6.get_capture(len(pkts))
3281 self.verify_capture_in(capture, self.pg6)
3283 def test_output_feature_hairpinning(self):
3284 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both remote hosts behind pg0
3285 host = self.pg0.remote_hosts[0]
3286 server = self.pg0.remote_hosts[1]
# NOTE(review): host_in_port is used below but its assignment is not
# visible here - confirm.
3289 server_in_port = 5678
3290 server_out_port = 8765
# NAT pool address plus output feature on pg0 and pg1 (the pg1 call's
# remaining argument(s) are not visible here)
3292 self.nat44_add_address(self.nat_addr)
3293 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3294 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3297 # add static mapping for server
3298 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3299 server_in_port, server_out_port,
3300 proto=IP_PROTOS.tcp)
3302 # send packet from host to server
# The host addresses the server by its external (NAT) address and port;
# the packet is sent on pg0 and also captured on pg0.
3303 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3304 IP(src=host.ip4, dst=self.nat_addr) /
3305 TCP(sport=host_in_port, dport=server_out_port))
3306 self.pg0.add_stream(p)
3307 self.pg_enable_capture(self.pg_interfaces)
3309 capture = self.pg0.get_capture(1)
# Expect: source is the NAT address with a changed source port, and the
# destination is the server's local address and port. The translated
# source port is kept for the reply.
3314 self.assertEqual(ip.src, self.nat_addr)
3315 self.assertEqual(ip.dst, server.ip4)
3316 self.assertNotEqual(tcp.sport, host_in_port)
3317 self.assertEqual(tcp.dport, server_in_port)
3318 self.check_tcp_checksum(p)
3319 host_out_port = tcp.sport
3321 self.logger.error(ppp("Unexpected or invalid packet:", p))
3324 # send reply from server to host
3325 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3326 IP(src=server.ip4, dst=self.nat_addr) /
3327 TCP(sport=server_in_port, dport=host_out_port))
3328 self.pg0.add_stream(p)
3329 self.pg_enable_capture(self.pg_interfaces)
3331 capture = self.pg0.get_capture(1)
# Expect: the reply appears to come from the NAT address and the server's
# external port, and is delivered to the host's original address and port.
3336 self.assertEqual(ip.src, self.nat_addr)
3337 self.assertEqual(ip.dst, host.ip4)
3338 self.assertEqual(tcp.sport, server_out_port)
3339 self.assertEqual(tcp.dport, host_in_port)
3340 self.check_tcp_checksum(p)
3342 self.logger.error(ppp("Unexpected or invalid packet:", p))
3345 def test_output_feature_and_service(self):
3346 """ NAT44 interface output feature and services """
3347 external_addr = '1.2.3.4'
# NOTE(review): local_port and external_port are used below but their
# assignments are not visible here - confirm.
# Forwarding is enabled, a NAT pool address is added, and an out2in-only
# static mapping exposes pg0's remote host as external_addr:external_port.
3351 self.vapi.nat44_forwarding_enable_disable(1)
3352 self.nat44_add_address(self.nat_addr)
3353 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3354 local_port, external_port,
3355 proto=IP_PROTOS.tcp, out2in_only=1)
# pg0 gets the regular NAT44 feature, pg1 the output feature (the pg1
# call's remaining argument(s) are not visible here)
3356 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3357 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3360 # from client to service
3361 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3362 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3363 TCP(sport=12345, dport=external_port))
3364 self.pg1.add_stream(p)
3365 self.pg_enable_capture(self.pg_interfaces)
3367 capture = self.pg0.get_capture(1)
# Expect: destination rewritten to the service's local address and port
3373 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3374 self.assertEqual(tcp.dport, local_port)
3375 self.check_tcp_checksum(p)
3376 self.check_ip_checksum(p)
3378 self.logger.error(ppp("Unexpected or invalid packet:", p))
3381 # from service back to client
3382 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3383 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3384 TCP(sport=local_port, dport=12345))
3385 self.pg0.add_stream(p)
3386 self.pg_enable_capture(self.pg_interfaces)
3388 capture = self.pg1.get_capture(1)
# Expect: source rewritten back to the external address and port
3393 self.assertEqual(ip.src, external_addr)
3394 self.assertEqual(tcp.sport, external_port)
3395 self.check_tcp_checksum(p)
3396 self.check_ip_checksum(p)
3398 self.logger.error(ppp("Unexpected or invalid packet:", p))
3401 # from local network host to external network
# NOTE(review): the same in2out stream is sent and verified twice in a
# row - confirm whether the repetition is intentional.
3402 pkts = self.create_stream_in(self.pg0, self.pg1)
3403 self.pg0.add_stream(pkts)
3404 self.pg_enable_capture(self.pg_interfaces)
3406 capture = self.pg1.get_capture(len(pkts))
3407 self.verify_capture_out(capture)
3408 pkts = self.create_stream_in(self.pg0, self.pg1)
3409 self.pg0.add_stream(pkts)
3410 self.pg_enable_capture(self.pg_interfaces)
3412 capture = self.pg1.get_capture(len(pkts))
3413 self.verify_capture_out(capture)
3415 # from external network back to local network host
3416 pkts = self.create_stream_out(self.pg1)
3417 self.pg1.add_stream(pkts)
3418 self.pg_enable_capture(self.pg_interfaces)
3420 capture = self.pg0.get_capture(len(pkts))
3421 self.verify_capture_in(capture, self.pg0)
3423 def test_output_feature_and_service2(self):
3424 """ NAT44 interface output feature and service host direct access """
# Forwarding enabled, NAT pool address added, output feature on pg1 (the
# call's remaining argument(s) are not visible here)
3425 self.vapi.nat44_forwarding_enable_disable(1)
3426 self.nat44_add_address(self.nat_addr)
3427 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3430 # session initiated from service host - translate
3431 pkts = self.create_stream_in(self.pg0, self.pg1)
3432 self.pg0.add_stream(pkts)
3433 self.pg_enable_capture(self.pg_interfaces)
3435 capture = self.pg1.get_capture(len(pkts))
3436 self.verify_capture_out(capture)
# Return traffic for the translated session, verified on pg0
3438 pkts = self.create_stream_out(self.pg1)
3439 self.pg1.add_stream(pkts)
3440 self.pg_enable_capture(self.pg_interfaces)
3442 capture = self.pg0.get_capture(len(pkts))
3443 self.verify_capture_in(capture, self.pg0)
# Snapshot of the current outside ports/id.
# NOTE(review): these locals are not read by any line visible here -
# confirm whether they are used (e.g. restored) in the full file.
3445 tcp_port_out = self.tcp_port_out
3446 udp_port_out = self.udp_port_out
3447 icmp_id_out = self.icmp_id_out
3449 # session initiated from remote host - do not translate
# The outside stream targets pg0's remote address directly and uses the
# inside port numbers (use_inside_ports=True)
3450 pkts = self.create_stream_out(self.pg1,
3451 self.pg0.remote_ip4,
3452 use_inside_ports=True)
3453 self.pg1.add_stream(pkts)
3454 self.pg_enable_capture(self.pg_interfaces)
3456 capture = self.pg0.get_capture(len(pkts))
3457 self.verify_capture_in(capture, self.pg0)
# Replies from the inside host: verify_capture_out is given pg0's own
# remote address as nat_ip, i.e. the source address is expected unchanged.
# The call's remaining argument(s) are not visible here.
3459 pkts = self.create_stream_in(self.pg0, self.pg1)
3460 self.pg0.add_stream(pkts)
3461 self.pg_enable_capture(self.pg_interfaces)
3463 capture = self.pg1.get_capture(len(pkts))
3464 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3467 def test_output_feature_and_service3(self):
3468 """ NAT44 interface output feature and DST NAT """
3469 external_addr = '1.2.3.4'
# NOTE(review): local_port and external_port are used below but their
# assignments are not visible here - confirm.
# The out2in-only static mapping here points at pg1's remote host, so
# only the destination is expected to be rewritten.
3473 self.vapi.nat44_forwarding_enable_disable(1)
3474 self.nat44_add_address(self.nat_addr)
3475 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3476 local_port, external_port,
3477 proto=IP_PROTOS.tcp, out2in_only=1)
# pg0 is configured twice with the NAT44 feature (the second call's
# remaining argument(s) are not visible here); pg1 gets the output feature
3478 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3479 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3481 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 host -> external_addr:external_port, captured on pg1
3484 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3485 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3486 TCP(sport=12345, dport=external_port))
3487 self.pg0.add_stream(p)
3488 self.pg_enable_capture(self.pg_interfaces)
3490 capture = self.pg1.get_capture(1)
# Expect: source address/port untouched, destination rewritten to the
# mapped local address and port
3495 self.assertEqual(ip.src, self.pg0.remote_ip4)
3496 self.assertEqual(tcp.sport, 12345)
3497 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3498 self.assertEqual(tcp.dport, local_port)
3499 self.check_tcp_checksum(p)
3500 self.check_ip_checksum(p)
3502 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from the pg1 host back to the pg0 host, captured on pg0
3505 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3506 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3507 TCP(sport=local_port, dport=12345))
3508 self.pg1.add_stream(p)
3509 self.pg_enable_capture(self.pg_interfaces)
3511 capture = self.pg0.get_capture(1)
# Expect: source rewritten back to the external address and port,
# destination untouched
3516 self.assertEqual(ip.src, external_addr)
3517 self.assertEqual(tcp.sport, external_port)
3518 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3519 self.assertEqual(tcp.dport, 12345)
3520 self.check_tcp_checksum(p)
3521 self.check_ip_checksum(p)
3523 self.logger.error(ppp("Unexpected or invalid packet:", p))
3526 def test_one_armed_nat44(self):
3527 """ One armed NAT44 """
# Both endpoints are remote hosts on the same interface (pg9)
3528 remote_host = self.pg9.remote_hosts[0]
3529 local_host = self.pg9.remote_hosts[1]
# pg9 is configured with the NAT44 feature twice; the second call's
# remaining argument(s) are not visible here
3532 self.nat44_add_address(self.nat_addr)
3533 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3534 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host, sent and captured on pg9
3538 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3539 IP(src=local_host.ip4, dst=remote_host.ip4) /
3540 TCP(sport=12345, dport=80))
3541 self.pg9.add_stream(p)
3542 self.pg_enable_capture(self.pg_interfaces)
3544 capture = self.pg9.get_capture(1)
# Expect: source translated to the NAT address with a new source port
# (remembered as external_port); destination unchanged
3549 self.assertEqual(ip.src, self.nat_addr)
3550 self.assertEqual(ip.dst, remote_host.ip4)
3551 self.assertNotEqual(tcp.sport, 12345)
3552 external_port = tcp.sport
3553 self.assertEqual(tcp.dport, 80)
3554 self.check_tcp_checksum(p)
3555 self.check_ip_checksum(p)
3557 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host replies to the NAT address and the translated port
3561 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3562 IP(src=remote_host.ip4, dst=self.nat_addr) /
3563 TCP(sport=80, dport=external_port))
3564 self.pg9.add_stream(p)
3565 self.pg_enable_capture(self.pg_interfaces)
3567 capture = self.pg9.get_capture(1)
# Expect: destination translated back to local_host and its original port
3572 self.assertEqual(ip.src, remote_host.ip4)
3573 self.assertEqual(ip.dst, local_host.ip4)
3574 self.assertEqual(tcp.sport, 80)
3575 self.assertEqual(tcp.dport, 12345)
3576 self.check_tcp_checksum(p)
3577 self.check_ip_checksum(p)
3579 self.logger.error(ppp("Unexpected or invalid packet:", p))
3582 def test_one_armed_nat44_static(self):
3583 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
# Client (remote_host) and service (local_host) both sit behind pg9
3584 remote_host = self.pg9.remote_hosts[0]
3585 local_host = self.pg9.remote_hosts[1]
# NOTE(review): local_port and external_port are used below but their
# assignments are not visible here - confirm.
# The NAT address is added with twice_nat=1; the static mapping call's
# last argument(s) are not visible here.
3590 self.vapi.nat44_forwarding_enable_disable(1)
3591 self.nat44_add_address(self.nat_addr, twice_nat=1)
3592 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3593 local_port, external_port,
3594 proto=IP_PROTOS.tcp, out2in_only=1,
3596 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3597 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3600 # from client to service
3601 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3602 IP(src=remote_host.ip4, dst=self.nat_addr) /
3603 TCP(sport=12345, dport=external_port))
3604 self.pg9.add_stream(p)
3605 self.pg_enable_capture(self.pg_interfaces)
3607 capture = self.pg9.get_capture(1)
# Expect both ends translated: destination becomes the service's local
# address/port, source becomes the NAT address with a new port (kept as
# eh_port_in for the reply)
3613 self.assertEqual(ip.dst, local_host.ip4)
3614 self.assertEqual(ip.src, self.nat_addr)
3615 self.assertEqual(tcp.dport, local_port)
3616 self.assertNotEqual(tcp.sport, 12345)
3617 eh_port_in = tcp.sport
3618 self.check_tcp_checksum(p)
3619 self.check_ip_checksum(p)
3621 self.logger.error(ppp("Unexpected or invalid packet:", p))
3624 # from service back to client
3625 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3626 IP(src=local_host.ip4, dst=self.nat_addr) /
3627 TCP(sport=local_port, dport=eh_port_in))
3628 self.pg9.add_stream(p)
3629 self.pg_enable_capture(self.pg_interfaces)
3631 capture = self.pg9.get_capture(1)
# Expect: reply sourced from the NAT address and external port, delivered
# to the client's original address and port
3636 self.assertEqual(ip.src, self.nat_addr)
3637 self.assertEqual(ip.dst, remote_host.ip4)
3638 self.assertEqual(tcp.sport, external_port)
3639 self.assertEqual(tcp.dport, 12345)
3640 self.check_tcp_checksum(p)
3641 self.check_ip_checksum(p)
3643 self.logger.error(ppp("Unexpected or invalid packet:", p))
3646 def test_del_session(self):
3647 """ Delete NAT44 session """
# Standard setup: NAT pool address, feature on pg0 and pg1 (the pg1
# call's remaining argument(s) are not visible here)
3648 self.nat44_add_address(self.nat_addr)
3649 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3650 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Send in2out traffic so sessions exist; the capture itself is not
# inspected
3653 pkts = self.create_stream_in(self.pg0, self.pg1)
3654 self.pg0.add_stream(pkts)
3655 self.pg_enable_capture(self.pg_interfaces)
3657 capture = self.pg1.get_capture(len(pkts))
# Record how many sessions pg0's remote host has before deleting
3659 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3660 nsessions = len(sessions)
# Delete one session by its inside address/port and one by its outside
# address/port (the second call's last argument(s) are not visible here)
3662 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3663 sessions[0].inside_port,
3664 sessions[0].protocol)
3665 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3666 sessions[1].outside_port,
3667 sessions[1].protocol,
# Exactly two sessions must be gone
3670 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3671 self.assertEqual(nsessions - len(sessions), 2)
3673 def test_set_get_reass(self):
3674 """ NAT44 set/get virtual fragmentation reassembly """
# Read the current reassembly configuration as the baseline
3675 reas_cfg1 = self.vapi.nat_get_reass()
# Change every value relative to the baseline: timeout + 5, both limits
# doubled
3677 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3678 max_reass=reas_cfg1.ip4_max_reass * 2,
3679 max_frag=reas_cfg1.ip4_max_frag * 2)
# Read back and check that each new value took effect
3681 reas_cfg2 = self.vapi.nat_get_reass()
3683 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3684 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3685 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# Setting drop_frag must be reflected in ip4_drop_frag
3687 self.vapi.nat_set_reass(drop_frag=1)
3688 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3690 def test_frag_in_order(self):
3691 """ NAT44 translate fragments arriving in order """
# Standard setup: NAT pool address, feature on pg0 and pg1 (the pg1
# call's remaining argument(s) are not visible here)
3692 self.nat44_add_address(self.nat_addr)
3693 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3694 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload and a random inside TCP source port for this run
3697 data = "A" * 4 + "B" * 16 + "C" * 3
3698 self.tcp_port_in = random.randint(1025, 65535)
# Number of reassembly entries before any fragment is sent
3700 reass = self.vapi.nat_reass_dump()
3701 reass_n_start = len(reass)
# in2out fragments from pg0 towards pg1's remote host; the remaining
# create_stream_frag arguments are not visible here
3704 pkts = self.create_stream_frag(self.pg0,
3705 self.pg1.remote_ip4,
3709 self.pg0.add_stream(pkts)
3710 self.pg_enable_capture(self.pg_interfaces)
3712 frags = self.pg1.get_capture(len(pkts))
# Reassemble what arrived on pg1; expect destination port 20, a
# translated source port (stored as tcp_port_out) and the payload intact
3713 p = self.reass_frags_and_verify(frags,
3715 self.pg1.remote_ip4)
3716 self.assertEqual(p[TCP].dport, 20)
3717 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3718 self.tcp_port_out = p[TCP].sport
3719 self.assertEqual(data, p[Raw].load)
# out2in fragments sent on pg1 and captured on pg0; the
# create_stream_frag arguments are not visible here
3722 pkts = self.create_stream_frag(self.pg1,
3727 self.pg1.add_stream(pkts)
3728 self.pg_enable_capture(self.pg_interfaces)
3730 frags = self.pg0.get_capture(len(pkts))
# Expect source port 20, the original inside port as destination and the
# payload intact
3731 p = self.reass_frags_and_verify(frags,
3732 self.pg1.remote_ip4,
3733 self.pg0.remote_ip4)
3734 self.assertEqual(p[TCP].sport, 20)
3735 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3736 self.assertEqual(data, p[Raw].load)
# The two fragmented packets must have added exactly two reassembly
# entries
3738 reass = self.vapi.nat_reass_dump()
3739 reass_n_end = len(reass)
3741 self.assertEqual(reass_n_end - reass_n_start, 2)
3743 def test_reass_hairpinning(self):
3744 """ NAT44 fragments hairpinning """
# Host and server are both remote hosts behind pg0; all ports are random
3745 host = self.pg0.remote_hosts[0]
3746 server = self.pg0.remote_hosts[1]
3747 host_in_port = random.randint(1025, 65535)
3749 server_in_port = random.randint(1025, 65535)
3750 server_out_port = random.randint(1025, 65535)
3751 data = "A" * 4 + "B" * 16 + "C" * 3
# Standard setup: NAT pool address, feature on pg0 and pg1 (the pg1
# call's remaining argument(s) are not visible here)
3753 self.nat44_add_address(self.nat_addr)
3754 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3755 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3757 # add static mapping for server
3758 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3759 server_in_port, server_out_port,
3760 proto=IP_PROTOS.tcp)
3762 # send packet from host to server
# Fragments are sent on pg0 and captured on pg0; the remaining
# create_stream_frag arguments are not visible here
3763 pkts = self.create_stream_frag(self.pg0,
3768 self.pg0.add_stream(pkts)
3769 self.pg_enable_capture(self.pg_interfaces)
3771 frags = self.pg0.get_capture(len(pkts))
# Expect a translated source port, the server's local port as destination
# and the payload intact (the address arguments of reass_frags_and_verify
# are not visible here)
3772 p = self.reass_frags_and_verify(frags,
3775 self.assertNotEqual(p[TCP].sport, host_in_port)
3776 self.assertEqual(p[TCP].dport, server_in_port)
3777 self.assertEqual(data, p[Raw].load)
3779 def test_frag_out_of_order(self):
3780 """ NAT44 translate fragments arriving out of order """
# Standard setup: NAT pool address, feature on pg0 and pg1 (the pg1
# call's remaining argument(s) are not visible here)
3781 self.nat44_add_address(self.nat_addr)
3782 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3783 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3786 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded, whereas
# test_frag_in_order assigns it to self.tcp_port_in. As written this test
# relies on whatever self.tcp_port_in already holds - confirm whether the
# assignment was intended.
3787 random.randint(1025, 65535)
# in2out fragments from pg0 towards pg1's remote host; the remaining
# create_stream_frag arguments (and any reordering of pkts) are not
# visible here
3790 pkts = self.create_stream_frag(self.pg0,
3791 self.pg1.remote_ip4,
3796 self.pg0.add_stream(pkts)
3797 self.pg_enable_capture(self.pg_interfaces)
3799 frags = self.pg1.get_capture(len(pkts))
# Expect destination port 20, a translated source port (stored as
# tcp_port_out) and the payload intact
3800 p = self.reass_frags_and_verify(frags,
3802 self.pg1.remote_ip4)
3803 self.assertEqual(p[TCP].dport, 20)
3804 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3805 self.tcp_port_out = p[TCP].sport
3806 self.assertEqual(data, p[Raw].load)
# out2in fragments sent on pg1 and captured on pg0; the
# create_stream_frag arguments are not visible here
3809 pkts = self.create_stream_frag(self.pg1,
3815 self.pg1.add_stream(pkts)
3816 self.pg_enable_capture(self.pg_interfaces)
3818 frags = self.pg0.get_capture(len(pkts))
# Expect source port 20, the inside port as destination and the payload
# intact
3819 p = self.reass_frags_and_verify(frags,
3820 self.pg1.remote_ip4,
3821 self.pg0.remote_ip4)
3822 self.assertEqual(p[TCP].sport, 20)
3823 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3824 self.assertEqual(data, p[Raw].load)
3826 def test_port_restricted(self):
3827 """ Port restricted NAT44 (MAP-E CE) """
# Standard setup: NAT pool address, feature on pg0 and pg1 (the pg1
# call's remaining argument(s) are not visible here)
3828 self.nat44_add_address(self.nat_addr)
3829 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3830 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Switch the port assignment algorithm to MAP-E via CLI, with psid 10,
# psid-offset 6 and psid-len 6
3832 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3833 "psid-offset 6 psid-len 6")
# One in2out TCP packet from pg0 to pg1
3835 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3836 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3837 TCP(sport=4567, dport=22))
3838 self.pg0.add_stream(p)
3839 self.pg_enable_capture(self.pg_interfaces)
3841 capture = self.pg1.get_capture(1)
# Expect: source translated to the NAT address with a new port, and the
# six bits of that port starting at bit 6 equal to 10 (the psid value
# given on the CLI above)
3846 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3847 self.assertEqual(ip.src, self.nat_addr)
3848 self.assertEqual(tcp.dport, 22)
3849 self.assertNotEqual(tcp.sport, 4567)
3850 self.assertEqual((tcp.sport >> 6) & 63, 10)
3851 self.check_tcp_checksum(p)
3852 self.check_ip_checksum(p)
3854 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Shared scenario for the twice-NAT tests: configures a (self-)twice-NAT
# static or load-balanced mapping, sends one TCP packet from a client to
# the mapped service and one reply back, and checks the translation in
# both directions.
# NOTE(review): this method is heavily fragmented in this view. The end
# of the signature, several local assignments (port_in, port_out,
# eh_port_out, pg0, pg1, server, eh_addr_in, ...) and the branch
# conditions are not visible - the comments below cover only what the
# visible lines show.
3857 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
# Address added to the pool with twice_nat=1 further down
3859 twice_nat_addr = '10.0.1.3'
# Per-server local ports used by the load-balanced variant
3867 port_in1 = port_in+1
3868 port_in2 = port_in+2
3873 server1 = self.pg0.remote_hosts[0]
3874 server2 = self.pg0.remote_hosts[1]
# Whether the external host's source is expected to be translated; the
# rest of this expression is not visible here
3886 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
3889 self.nat44_add_address(self.nat_addr)
3890 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# Plain static mapping: twice_nat and self_twice_nat are mutually
# exclusive flags derived from self_twice_nat
3892 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3894 proto=IP_PROTOS.tcp,
3895 twice_nat=int(not self_twice_nat),
3896 self_twice_nat=int(self_twice_nat))
# Load-balanced mapping over server1 and server2 (most fields of the
# locals entries and of the API call are not visible here)
3898 locals = [{'addr': server1.ip4n,
3901 {'addr': server2.ip4n,
3904 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3905 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3909 not self_twice_nat),
3912 local_num=len(locals),
3914 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3915 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# Client selection: client_id must be provided on this path; id 2 picks
# pg0's second remote host, another branch picks pg0's first, and a
# further branch uses pg1's first remote host (the conditions are not
# visible here)
3922 assert client_id is not None
3924 client = self.pg0.remote_hosts[0]
3925 elif client_id == 2:
3926 client = self.pg0.remote_hosts[1]
3928 client = pg1.remote_hosts[0]
# Client -> service packet addressed to the NAT address and external port
3929 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3930 IP(src=client.ip4, dst=self.nat_addr) /
3931 TCP(sport=eh_port_out, dport=port_out))
3933 self.pg_enable_capture(self.pg_interfaces)
3935 capture = pg0.get_capture(1)
# Checks on the packet that reached the service: destination is a server
# address and local port; the source is either the twice-NAT address with
# a new port, or the untouched client address and port
3941 if ip.dst == server1.ip4:
3947 self.assertEqual(ip.dst, server.ip4)
3949 self.assertIn(tcp.dport, [port_in1, port_in2])
3951 self.assertEqual(tcp.dport, port_in)
3953 self.assertEqual(ip.src, twice_nat_addr)
3954 self.assertNotEqual(tcp.sport, eh_port_out)
3956 self.assertEqual(ip.src, client.ip4)
3957 self.assertEqual(tcp.sport, eh_port_out)
# Remember the ports seen by the service so the reply can mirror them
3959 eh_port_in = tcp.sport
3960 saved_port_in = tcp.dport
3961 self.check_tcp_checksum(p)
3962 self.check_ip_checksum(p)
3964 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Service -> client reply, captured on pg1
3967 p = (Ether(src=server.mac, dst=pg0.local_mac) /
3968 IP(src=server.ip4, dst=eh_addr_in) /
3969 TCP(sport=saved_port_in, dport=eh_port_in))
3971 self.pg_enable_capture(self.pg_interfaces)
3973 capture = pg1.get_capture(1)
# Expect: the reply arrives at the client from the NAT address and
# external port, with the client's original port restored
3978 self.assertEqual(ip.dst, client.ip4)
3979 self.assertEqual(ip.src, self.nat_addr)
3980 self.assertEqual(tcp.dport, eh_port_out)
3981 self.assertEqual(tcp.sport, port_out)
3982 self.check_tcp_checksum(p)
3983 self.check_ip_checksum(p)
3985 self.logger.error(ppp("Unexpected or invalid packet:", p))
3988 def test_twice_nat(self):
# Runs the shared twice-NAT scenario with all default arguments
3990 self.twice_nat_common()
3992 def test_self_twice_nat_positive(self):
3993 """ Self Twice NAT44 (positive test) """
# Shared scenario with self_twice_nat enabled and same_pg=True
3994 self.twice_nat_common(self_twice_nat=True, same_pg=True)
3996 def test_self_twice_nat_negative(self):
3997 """ Self Twice NAT44 (negative test) """
# Shared scenario with self_twice_nat enabled but same_pg left at its
# default (False)
3998 self.twice_nat_common(self_twice_nat=True)
4000 def test_twice_nat_lb(self):
4001 """ Twice NAT44 local service load balancing """
# Shared scenario using the load-balanced mapping
4002 self.twice_nat_common(lb=True)
4004 def test_self_twice_nat_lb_positive(self):
4005 """ Self Twice NAT44 local service load balancing (positive test) """
# Shared scenario with lb, self_twice_nat and same_pg all enabled.
# NOTE(review): the call ends with a trailing comma; the last argument
# (presumably client_id) is not visible here - confirm.
4006 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4009 def test_self_twice_nat_lb_negative(self):
4010 """ Self Twice NAT44 local service load balancing (negative test) """
# The visible arguments are the same as in the positive variant.
# NOTE(review): the call ends with a trailing comma; the last argument
# (presumably the client_id that distinguishes this test) is not visible
# here - confirm.
4011 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4014 def test_twice_nat_interface_addr(self):
4015 """ Acquire twice NAT44 addresses from interface """
# Ask NAT to take its twice-NAT address from pg7's interface address
4016 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
4018 # no address in NAT pool
4019 adresses = self.vapi.nat44_address_dump()
4020 self.assertEqual(0, len(adresses))
4022 # configure interface address and check NAT address pool
# The single pool entry must match pg7's local address (first 4 bytes of
# ip_address) and be flagged twice_nat
4023 self.pg7.config_ip4()
4024 adresses = self.vapi.nat44_address_dump()
4025 self.assertEqual(1, len(adresses))
4026 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
4027 self.assertEqual(adresses[0].twice_nat, 1)
4029 # remove interface address and check NAT address pool
4030 self.pg7.unconfig_ip4()
4031 adresses = self.vapi.nat44_address_dump()
4032 self.assertEqual(0, len(adresses))
4034 def test_ipfix_max_frags(self):
4035 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Standard setup: NAT pool address, feature on pg0 and pg1 (the pg1
# call's remaining argument(s) are not visible here)
4036 self.nat44_add_address(self.nat_addr)
4037 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4038 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Set the pending-fragment limit to 0 so that any stored fragment exceeds
# it
4040 self.vapi.nat_set_reass(max_frag=0)
# IPFIX export towards pg3's remote host, then enable NAT IPFIX logging
# (one set_ipfix_exporter argument is not visible here)
4041 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
4042 src_address=self.pg3.local_ip4n,
4044 template_interval=10)
4045 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
4046 src_port=self.ipfix_src_port)
# Build a fragmented stream but send only its last fragment
4048 data = "A" * 4 + "B" * 16 + "C" * 3
4049 self.tcp_port_in = random.randint(1025, 65535)
4050 pkts = self.create_stream_frag(self.pg0,
4051 self.pg1.remote_ip4,
4055 self.pg0.add_stream(pkts[-1])
4056 self.pg_enable_capture(self.pg_interfaces)
# Nothing may be forwarded to pg1; nine packets are expected on pg3 after
# the flush
4058 frags = self.pg1.get_capture(0)
4059 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4060 capture = self.pg3.get_capture(9)
4061 ipfix = IPFIXDecoder()
4062 # first load template
# (the loop headers over the captured packets are not visible here)
4064 self.assertTrue(p.haslayer(IPFIX))
4065 self.assertEqual(p[IP].src, self.pg3.local_ip4)
4066 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
4067 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
4068 self.assertEqual(p[UDP].dport, 4739)
4069 self.assertEqual(p[IPFIX].observationDomainID,
4070 self.ipfix_domain_id)
4071 if p.haslayer(Template):
4072 ipfix.add_template(p.getlayer(Template))
4073 # verify events in data set
# NOTE: 'data' is rebound here from the payload string to the decoded
# IPFIX records
4075 if p.haslayer(Data):
4076 data = ipfix.decode_data_set(p.getlayer(Set))
4077 self.verify_ipfix_max_fragments_ip4(data, 0,
4078 self.pg0.remote_ip4n)
# Body of TestNAT44's tearDown (the def line is not visible here): runs
# the parent tearDown, then - unless VPP is dead - logs the NAT44 state
# via CLI show commands and resets the port assignment algorithm to
# default.
4081 super(TestNAT44, self).tearDown()
4082 if not self.vpp_dead:
4083 self.logger.info(self.vapi.cli("show nat44 addresses"))
4084 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4085 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4086 self.logger.info(self.vapi.cli("show nat44 interface address"))
4087 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4088 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# test_port_restricted switches the algorithm to map-e; restore it here
4089 self.vapi.cli("nat addr-port-assignment-alg default")
# NAT44 tests that start VPP with the "out2in dpo" NAT option and combine
# NAT44 with a MAP translation domain (464XLAT CE). pg0 is the IPv4 side,
# pg1 the IPv6 side.
# NOTE(review): decorators, some def lines and call continuations of
# this class are not visible here.
4093 class TestNAT44Out2InDPO(MethodHolder):
4094 """ NAT44 Test Cases using out2in DPO """
4097 def setUpConstants(cls):
# Extend the VPP command line with: nat { out2in dpo }
4098 super(TestNAT44Out2InDPO, cls).setUpConstants()
4099 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4102 def setUpClass(cls):
4103 super(TestNAT44Out2InDPO, cls).setUpClass()
# Fixed ports/ids; the in and out values are identical
4106 cls.tcp_port_in = 6303
4107 cls.tcp_port_out = 6303
4108 cls.udp_port_in = 6304
4109 cls.udp_port_out = 6304
4110 cls.icmp_id_in = 6305
4111 cls.icmp_id_out = 6305
4112 cls.nat_addr = '10.0.0.3'
4113 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4114 cls.dst_ip4 = '192.168.70.1'
# Two interfaces: pg0 is configured for IPv4, pg1 for IPv6
4116 cls.create_pg_interfaces(range(2))
4119 cls.pg0.config_ip4()
4120 cls.pg0.resolve_arp()
4123 cls.pg1.config_ip6()
4124 cls.pg1.resolve_ndp()
# IPv6 default route (::/0) via pg1's remote host
4126 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4127 dst_address_length=0,
4128 next_hop_address=cls.pg1.remote_ip6n,
4129 next_hop_sw_if_index=cls.pg1.sw_if_index)
# (line below belongs to the class teardown; the surrounding lines are not
# visible here)
4132 super(TestNAT44Out2InDPO, cls).tearDownClass()
4135 def configure_xlat(self):
# Store the /96 IPv6 prefixes on self and add a MAP translation domain
# built from them (the inet_pton arguments and the end of the
# map_add_domain call are not visible here)
4136 self.dst_ip6_pfx = '1:2:3::'
4137 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4139 self.dst_ip6_pfx_len = 96
4140 self.src_ip6_pfx = '4:5:6::'
4141 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4143 self.src_ip6_pfx_len = 96
4144 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4145 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4146 '\x00\x00\x00\x00', 0, is_translation=1,
4149 def test_464xlat_ce(self):
4150 """ Test 464XLAT CE with NAT44 """
4152 self.configure_xlat()
# NAT44 on pg0 with a one-address pool (nat_addr)
4154 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4155 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# IPv6 addresses expected on the pg1 side: the IPv4 destination embedded
# in the dst prefix and the NAT address embedded in the src prefix
4157 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4158 self.dst_ip6_pfx_len)
4159 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4160 self.src_ip6_pfx_len)
# IPv4 in on pg0 -> IPv6 out on pg1
4163 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4164 self.pg0.add_stream(pkts)
4165 self.pg_enable_capture(self.pg_interfaces)
4167 capture = self.pg1.get_capture(len(pkts))
4168 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# IPv6 in on pg1 -> IPv4 out on pg0
4171 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4173 self.pg1.add_stream(pkts)
4174 self.pg_enable_capture(self.pg_interfaces)
4176 capture = self.pg0.get_capture(len(pkts))
4177 self.verify_capture_in(capture, self.pg0)
# Undo the NAT configuration made by this test (the pool removal uses
# is_add=0; the feature call's remaining argument(s) are not visible here)
4179 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4181 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4182 self.nat_addr_n, is_add=0)
4184 def test_464xlat_ce_no_nat(self):
4185 """ Test 464XLAT CE without NAT44 """
4187 self.configure_xlat()
# Without NAT the source embedded in the IPv6 address is pg0's own remote
# address, and ports are expected unchanged (same_port=True)
4189 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4190 self.dst_ip6_pfx_len)
4191 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4192 self.src_ip6_pfx_len)
4194 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4195 self.pg0.add_stream(pkts)
4196 self.pg_enable_capture(self.pg_interfaces)
4198 capture = self.pg1.get_capture(len(pkts))
4199 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4200 nat_ip=out_dst_ip6, same_port=True)
4202 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4203 self.pg1.add_stream(pkts)
4204 self.pg_enable_capture(self.pg_interfaces)
4206 capture = self.pg0.get_capture(len(pkts))
4207 self.verify_capture_in(capture, self.pg0)
4210 class TestDeterministicNAT(MethodHolder):
4211 """ Deterministic NAT Test Cases """
4214 def setUpConstants(cls):
# Extend the VPP command line with: nat { deterministic }
4215 super(TestDeterministicNAT, cls).setUpConstants()
4216 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4219 def setUpClass(cls):
4220 super(TestDeterministicNAT, cls).setUpClass()
# Fixed inside ports/id and external (peer) ports used by the stream
# builders of this class
4223 cls.tcp_port_in = 6303
4224 cls.tcp_external_port = 6303
4225 cls.udp_port_in = 6304
4226 cls.udp_external_port = 6304
4227 cls.icmp_id_in = 6305
4228 cls.nat_addr = '10.0.0.3'
# Three pg interfaces; the per-interface setup inside the loop is not
# visible here
4230 cls.create_pg_interfaces(range(3))
4231 cls.interfaces = list(cls.pg_interfaces)
4233 for i in cls.interfaces:
# Two remote hosts on pg0, with IPv4 neighbor entries configured for them
4238 cls.pg0.generate_remote_hosts(2)
4239 cls.pg0.configure_ipv4_neighbors()
# Delegates class teardown to the parent (the enclosing def and any
# surrounding lines are not visible here)
4242 super(TestDeterministicNAT, cls).tearDownClass()
4245 def create_stream_in(self, in_if, out_if, ttl=64):
4247 Create packet stream for inside network
4249 :param in_if: Inside interface
4250 :param out_if: Outside interface
4251 :param ttl: TTL of generated packets
# Three packets are built from in_if's remote host to out_if's remote
# host: TCP, UDP and an ICMP echo request. How they are collected and
# returned is not visible here.
# TCP: inside port -> external port
4255 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4256 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4257 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP: inside port -> external port
4261 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4262 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4263 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request carrying the inside id
4267 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4268 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4269 ICMP(id=self.icmp_id_in, type='echo-request'))
4274 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4276 Create packet stream for outside network
4278 :param out_if: Outside interface
4279 :param dst_ip: Destination IP address (Default use global NAT address)
4280 :param ttl: TTL of generated packets
# Fallback destination is the class NAT address (the guarding condition
# is not visible here)
4283 dst_ip = self.nat_addr
# Three packets are built from out_if's remote host to dst_ip: TCP, UDP
# and an ICMP echo reply. They use self.tcp_port_out, self.udp_port_out
# and self.icmp_external_id, which verify_capture_out of this class
# records from captured packets.
# NOTE(review): presumably this must be called after verify_capture_out
# has run - confirm.
4286 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4287 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4288 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4292 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4293 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4294 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4298 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4299 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4300 ICMP(id=self.icmp_external_id, type='echo-reply'))
4305 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4307 Verify captured packets on outside network
4309 :param capture: Captured packets
4310 :param nat_ip: Translated IP address (Default use global NAT address)
4311 :param same_port: Sorce port number is not translated (Default False)
4312 :param packet_num: Expected number of packets (Default 3)
# NOTE(review): the docstring above documents a same_port parameter that
# this signature does not have.
# Fallback NAT address is the class NAT address (the guarding condition
# is not visible here)
4315 nat_ip = self.nat_addr
4316 self.assertEqual(packet_num, len(capture))
# Every packet must carry the NAT address as source. The translated TCP
# and UDP source ports and the ICMP id are stored on self, where
# create_stream_out reads them.
4317 for packet in capture:
4319 self.assertEqual(packet[IP].src, nat_ip)
4320 if packet.haslayer(TCP):
4321 self.tcp_port_out = packet[TCP].sport
4322 elif packet.haslayer(UDP):
4323 self.udp_port_out = packet[UDP].sport
4325 self.icmp_external_id = packet[ICMP].id
4327 self.logger.error(ppp("Unexpected or invalid packet "
4328 "(outside network):", packet))
def initiate_tcp_session(self, in_if, out_if):
    """
    Run a TCP three-way handshake through the NAT and record the
    translated source port of the SYN in self.tcp_port_out.

    :param in_if: Inside interface
    :param out_if: Outside interface
    """
    def send_and_capture(tx_if, rx_if, pkt):
        # send one packet and wait for exactly one on the other side
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    # NOTE(review): the TCP flag values are not visible in the listing;
    # "S", "SA" and "A" are taken from the step comments - confirm.
    try:
        # SYN packet in->out
        syn = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
               IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
               TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                   flags="S"))
        translated_syn = send_and_capture(in_if, out_if, syn)[0]
        self.tcp_port_out = translated_syn[TCP].sport

        # SYN + ACK packet out->in
        syn_ack = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                   IP(src=out_if.remote_ip4, dst=self.nat_addr) /
                   TCP(sport=self.tcp_external_port,
                       dport=self.tcp_port_out,
                       flags="SA"))
        send_and_capture(out_if, in_if, syn_ack)

        # ACK packet in->out
        ack = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
               IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
               TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                   flags="A"))
        send_and_capture(in_if, out_if, ack)

    except:
        self.logger.error("TCP 3 way handshake failed")
        raise
def verify_ipfix_max_entries_per_user(self, data):
    """
    Verify IPFIX maximum entries per user exceeded event

    Expects exactly one decoded record and checks four of its fields.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    event = data[0]
    # natEvent
    self.assertEqual(ord(event[230]), 13)
    # natQuotaExceededEvent (466), then maxEntriesPerUser (473)
    for raw_value, field_id in (('\x03\x00\x00\x00', 466),
                                ('\xe8\x03\x00\x00', 473)):
        self.assertEqual(raw_value, event[field_id])
    # sourceIPv4Address
    self.assertEqual(self.pg0.remote_ip4n, event[8])
def test_deterministic_mode(self):
    """ NAT plugin run deterministic mode """
    # NOTE(review): the two prefix lengths are not visible in the
    # listing; 24 (inside) and 32 (outside) are assumed - confirm.
    in_plen, out_plen = 24, 32
    in_net_n = socket.inet_aton('172.16.255.0')
    out_ip_n = socket.inet_aton('172.17.255.50')
    in_host_n = socket.inet_aton('172.16.255.20')

    # the plugin must have been started in deterministic mode
    self.assertEqual(1, self.vapi.nat_show_config().deterministic)

    self.vapi.nat_det_add_del_map(in_net_n, in_plen, out_ip_n, out_plen)

    # forward lookup followed by the matching reverse lookup
    forward = self.vapi.nat_det_forward(in_host_n)
    self.assertEqual(forward.out_addr[:4], out_ip_n)
    reverse = self.vapi.nat_det_reverse(out_ip_n, forward.out_port_hi)
    self.assertEqual(reverse.in_addr[:4], in_host_n)

    mappings = self.vapi.nat_det_map_dump()
    self.assertEqual(len(mappings), 1)
    mapping = mappings[0]
    self.assertEqual(in_net_n, mapping.in_addr[:4])
    self.assertEqual(in_plen, mapping.in_plen)
    self.assertEqual(out_ip_n, mapping.out_addr[:4])
    self.assertEqual(out_plen, mapping.out_plen)

    # clearing the configuration removes the mapping again
    self.clear_nat_det()
    self.assertEqual(len(self.vapi.nat_det_map_dump()), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    before = self.vapi.nat_det_get_timeouts()

    # raise every timeout by the same amount so each value must change
    increment = 10
    self.vapi.nat_det_set_timeouts(before.udp + increment,
                                   before.tcp_established + increment,
                                   before.tcp_transitory + increment,
                                   before.icmp + increment)

    after = self.vapi.nat_det_get_timeouts()

    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
def test_det_in(self):
    """ Deterministic NAT translation test (TCP, UDP, ICMP) """
    nat_ip = "10.0.0.10"
    inside, outside = self.pg0, self.pg1

    def forward(tx_if, rx_if, stream):
        # send a stream and capture the same number of packets
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    # NOTE(review): the prefix lengths and the is_inside value are not
    # visible in the listing; 32/32 and is_inside=0 are assumed.
    self.vapi.nat_det_add_del_map(inside.remote_ip4n,
                                  32,
                                  socket.inet_aton(nat_ip),
                                  32)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # in2out
    capture = forward(inside, outside,
                      self.create_stream_in(inside, outside))
    self.verify_capture_out(capture, nat_ip)

    # out2in
    capture = forward(outside, inside,
                      self.create_stream_out(outside, nat_ip))
    self.verify_capture_in(capture, inside)

    # session dump test
    sessions = self.vapi.nat_det_session_dump(inside.remote_ip4n)
    self.assertEqual(len(sessions), 3)
    tcp_session, udp_session, icmp_session = sessions

    # TCP session
    self.assertEqual(tcp_session.ext_addr[:4], outside.remote_ip4n)
    self.assertEqual(tcp_session.in_port, self.tcp_port_in)
    self.assertEqual(tcp_session.out_port, self.tcp_port_out)
    self.assertEqual(tcp_session.ext_port, self.tcp_external_port)

    # UDP session
    self.assertEqual(udp_session.ext_addr[:4], outside.remote_ip4n)
    self.assertEqual(udp_session.in_port, self.udp_port_in)
    self.assertEqual(udp_session.out_port, self.udp_port_out)
    self.assertEqual(udp_session.ext_port, self.udp_external_port)

    # ICMP session (identified by ICMP id, no external port check)
    self.assertEqual(icmp_session.ext_addr[:4], outside.remote_ip4n)
    self.assertEqual(icmp_session.in_port, self.icmp_id_in)
    self.assertEqual(icmp_session.out_port, self.icmp_external_id)
def test_multiple_users(self):
    """ Deterministic NAT multiple users """
    nat_ip = "10.0.0.10"
    # NOTE(review): port_in and the prefix lengths are not visible in
    # the listing; 80 and 24/32 are assumed - confirm.
    port_in = 80
    external_port = 6303

    host0 = self.pg0.remote_hosts[0]
    host1 = self.pg0.remote_hosts[1]

    def exchange(tx_if, rx_if, pkt):
        # send one packet and wait for exactly one on the other side
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    self.vapi.nat_det_add_del_map(host0.ip4n,
                                  24,
                                  socket.inet_aton(nat_ip),
                                  32)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # host0 to out
    p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
         IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=port_in, dport=external_port))
    capture = exchange(self.pg0, self.pg1, p)
    try:
        p = capture[0]
        ip_hdr, tcp_hdr = p[IP], p[TCP]
        self.assertEqual(ip_hdr.src, nat_ip)
        self.assertEqual(ip_hdr.dst, self.pg1.remote_ip4)
        self.assertEqual(tcp_hdr.dport, external_port)
        port_out0 = tcp_hdr.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # host1 to out
    p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
         IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=port_in, dport=external_port))
    capture = exchange(self.pg0, self.pg1, p)
    try:
        p = capture[0]
        ip_hdr, tcp_hdr = p[IP], p[TCP]
        self.assertEqual(ip_hdr.src, nat_ip)
        self.assertEqual(ip_hdr.dst, self.pg1.remote_ip4)
        self.assertEqual(tcp_hdr.dport, external_port)
        port_out1 = tcp_hdr.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # both hosts share one mapping and own one session each
    dms = self.vapi.nat_det_map_dump()
    self.assertEqual(1, len(dms))
    self.assertEqual(2, dms[0].ses_num)

    # out to host0
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         TCP(sport=external_port, dport=port_out0))
    capture = exchange(self.pg1, self.pg0, p)
    try:
        p = capture[0]
        ip_hdr, tcp_hdr = p[IP], p[TCP]
        self.assertEqual(ip_hdr.src, self.pg1.remote_ip4)
        self.assertEqual(ip_hdr.dst, host0.ip4)
        self.assertEqual(tcp_hdr.dport, port_in)
        self.assertEqual(tcp_hdr.sport, external_port)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # out to host1
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         TCP(sport=external_port, dport=port_out1))
    capture = exchange(self.pg1, self.pg0, p)
    try:
        p = capture[0]
        ip_hdr, tcp_hdr = p[IP], p[TCP]
        self.assertEqual(ip_hdr.src, self.pg1.remote_ip4)
        self.assertEqual(ip_hdr.dst, host1.ip4)
        self.assertEqual(tcp_hdr.dport, port_in)
        self.assertEqual(tcp_hdr.sport, external_port)
    except:
        self.logger.error(ppp("Unexpected or invalid packet", p))
        raise

    # session close api test
    # NOTE(review): the port arguments of both close calls are not
    # visible in the listing; values below are assumed - confirm.
    self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
                                        port_out1,
                                        self.pg1.remote_ip4n,
                                        external_port)
    self.assertEqual(self.vapi.nat_det_map_dump()[0].ses_num, 1)

    self.vapi.nat_det_close_session_in(host0.ip4n,
                                       port_in,
                                       self.pg1.remote_ip4n,
                                       external_port)
    self.assertEqual(self.vapi.nat_det_map_dump()[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
    """ Deterministic NAT TCP session close from inside network """
    inside, outside = self.pg0, self.pg1

    def from_inside(tcp_flags):
        # TCP segment of the established session, inside -> outside
        return (Ether(src=inside.remote_mac, dst=inside.local_mac) /
                IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=tcp_flags))

    def from_outside(tcp_flags):
        # TCP segment of the established session, outside -> inside
        return (Ether(src=outside.remote_mac, dst=outside.local_mac) /
                IP(src=outside.remote_ip4, dst=self.nat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=tcp_flags))

    def transmit(tx_if, rx_if, stream, expected):
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx_if.get_capture(expected)

    # NOTE(review): the prefix lengths, is_inside value and TCP flags
    # are not visible in the listing; 32/32, is_inside=0 and the flags
    # named by the step comments are assumed - confirm.
    self.vapi.nat_det_add_del_map(inside.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    self.initiate_tcp_session(inside, outside)

    # close the session from inside
    try:
        # FIN packet in -> out
        transmit(inside, outside, from_inside("F"), 1)

        # ACK packet out -> in, followed by FIN packet out -> in
        transmit(outside, inside,
                 [from_outside("A"), from_outside("F")], 2)

        # ACK packet in -> out
        transmit(inside, outside, from_inside("A"), 1)

        # Check if deterministic NAT44 closed the session
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)
    except:
        self.logger.error("TCP session termination failed")
        raise
def test_tcp_session_close_detection_out(self):
    """ Deterministic NAT TCP session close from outside network """
    inside, outside = self.pg0, self.pg1

    def from_inside(tcp_flags):
        # TCP segment of the established session, inside -> outside
        return (Ether(src=inside.remote_mac, dst=inside.local_mac) /
                IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=tcp_flags))

    def from_outside(tcp_flags):
        # TCP segment of the established session, outside -> inside
        return (Ether(src=outside.remote_mac, dst=outside.local_mac) /
                IP(src=outside.remote_ip4, dst=self.nat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=tcp_flags))

    def transmit(tx_if, rx_if, stream, expected):
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx_if.get_capture(expected)

    # NOTE(review): the prefix lengths, is_inside value and TCP flags
    # are not visible in the listing; 32/32, is_inside=0 and the
    # mirror image of the close-from-inside sequence are assumed.
    self.vapi.nat_det_add_del_map(inside.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    self.initiate_tcp_session(inside, outside)

    # close the session from outside
    try:
        # FIN packet out -> in
        transmit(outside, inside, from_outside("F"), 1)

        # ACK packet in -> out, followed by FIN packet in -> out
        transmit(inside, outside,
                 [from_inside("A"), from_inside("F")], 2)

        # ACK packet out -> in
        transmit(outside, inside, from_outside("A"), 1)

        # Check if deterministic NAT44 closed the session
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)
    except:
        self.logger.error("TCP session termination failed")
        raise
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ Deterministic NAT session timeouts """
    inside, outside = self.pg0, self.pg1

    # NOTE(review): the prefix lengths, is_inside value and the sleep
    # duration are not visible in the listing; 32/32, is_inside=0 and
    # sleep(15) are assumed - confirm.
    self.vapi.nat_det_add_del_map(inside.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    self.initiate_tcp_session(inside, outside)
    # shorten all four timeouts to 5 before creating more sessions
    self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
    stream = self.create_stream_in(inside, outside)
    inside.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    outside.get_capture(len(stream))
    # wait well past the shortened timeouts
    sleep(15)

    mappings = self.vapi.nat_det_map_dump()
    self.assertEqual(0, mappings[0].ses_num)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_limit_per_user(self):
    """ Deterministic NAT maximum sessions per user limit """
    inside, outside, collector = self.pg0, self.pg1, self.pg2

    def udp_from_inside(sport, dport):
        return (Ether(src=inside.remote_mac, dst=inside.local_mac) /
                IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                UDP(sport=sport, dport=dport))

    def start_traffic(stream):
        inside.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    # NOTE(review): the prefix lengths, is_inside value, path_mtu and
    # the sleep after the flush are not visible in the listing;
    # 32/32, is_inside=0, path_mtu=512 and sleep(1) are assumed.
    self.vapi.nat_det_add_del_map(inside.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=collector.remote_ip4n,
                                 src_address=collector.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix()

    # 1000 distinct UDP flows from the same inside host
    flood = [udp_from_inside(port, port) for port in range(1025, 2025)]
    start_traffic(flood)
    outside.get_capture(len(flood))

    # one more flow must not be translated
    start_traffic(udp_from_inside(3001, 3002))
    outside.assert_nothing_captured()

    # verify ICMP error packet
    rejected = inside.get_capture(1)[0]
    self.assertTrue(rejected.haslayer(ICMP))
    icmp = rejected[ICMP]
    self.assertEqual(icmp.type, 3)
    self.assertEqual(icmp.code, 1)
    self.assertTrue(icmp.haslayer(IPerror))
    inner_ip = icmp[IPerror]
    self.assertEqual(inner_ip[UDPerror].sport, 3001)
    self.assertEqual(inner_ip[UDPerror].dport, 3002)

    dms = self.vapi.nat_det_map_dump()

    self.assertEqual(1000, dms[0].ses_num)

    # verify IPFIX logging
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    sleep(1)
    exported = collector.get_capture(2)
    decoder = IPFIXDecoder()
    # first load template
    for pkt in exported:
        self.assertTrue(pkt.haslayer(IPFIX))
        if pkt.haslayer(Template):
            decoder.add_template(pkt.getlayer(Template))
    # verify events in data set
    for pkt in exported:
        if not pkt.haslayer(Data):
            continue
        records = decoder.decode_data_set(pkt.getlayer(Set))
        self.verify_ipfix_max_entries_per_user(records)
def clear_nat_det(self):
    """
    Clear deterministic NAT configuration.

    Disables NAT IPFIX logging, calls nat_det_set_timeouts() without
    arguments, then deletes every dumped deterministic mapping and
    removes the NAT44 feature from every dumped interface.
    """
    api = self.vapi
    api.nat_ipfix(enable=0)
    api.nat_det_set_timeouts()

    for mapping in api.nat_det_map_dump():
        api.nat_det_add_del_map(mapping.in_addr,
                                mapping.in_plen,
                                mapping.out_addr,
                                mapping.out_plen,
                                is_add=0)

    for nat_if in api.nat44_interface_dump():
        api.nat44_interface_add_del_feature(nat_if.sw_if_index,
                                            nat_if.is_inside,
                                            is_add=0)
def tearDown(self):
    """
    Run the base-class teardown, then, while VPP is still alive, log
    the deterministic NAT state and clear the configuration.
    """
    super(TestDeterministicNAT, self).tearDown()
    if self.vpp_dead:
        return
    show_commands = (
        "show nat44 interfaces",
        "show nat44 deterministic mappings",
        "show nat44 deterministic timeouts",
        "show nat44 deterministic sessions",
    )
    for command in show_commands:
        self.logger.info(self.vapi.cli(command))
    self.clear_nat_det()
4861 class TestNAT64(MethodHolder):
4862 """ NAT64 Test Cases """
@classmethod
def setUpConstants(cls):
    """ Extend the VPP command line with NAT64 hash bucket settings """
    super(TestNAT64, cls).setUpConstants()
    nat_section = [
        "nat", "{",
        "nat64 bib hash buckets 128",
        "nat64 st hash buckets 256",
        "}",
    ]
    cls.vpp_cmdline.extend(nat_section)
@classmethod
def setUpClass(cls):
    """
    Set class-wide ports, addresses and IPFIX constants, create five
    pg interfaces and configure them; on any failure run
    tearDownClass() before re-raising.
    """
    super(TestNAT64, cls).setUpClass()

    try:
        cls.tcp_port_in = cls.tcp_port_out = 6303
        cls.udp_port_in = cls.udp_port_out = 6304
        cls.icmp_id_in = cls.icmp_id_out = 6305
        cls.nat_addr = '10.0.0.3'
        cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
        # NOTE(review): the vrf1_id value is not visible in the
        # listing; 10 is assumed - confirm.
        cls.vrf1_id = 10
        cls.vrf1_nat_addr = '10.0.10.3'
        cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
                                               cls.vrf1_nat_addr)
        cls.ipfix_src_port = 4739
        cls.ipfix_domain_id = 1

        cls.create_pg_interfaces(range(5))
        # pg0 and pg2 are IPv6 only, pg1 is IPv4 only
        cls.ip6_interfaces = [cls.pg_interfaces[0], cls.pg_interfaces[2]]
        cls.ip4_interfaces = [cls.pg_interfaces[1]]

        cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)

        cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)

        cls.pg0.generate_remote_hosts(2)

        for ip6_if in cls.ip6_interfaces:
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.configure_ipv6_neighbors()

        for ip4_if in cls.ip4_interfaces:
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

        # pg3 gets both an IPv4 and an IPv6 configuration
        cls.pg3.admin_up()
        cls.pg3.config_ip4()
        cls.pg3.resolve_arp()
        cls.pg3.config_ip6()
        cls.pg3.configure_ipv6_neighbors()

    except Exception:
        super(TestNAT64, cls).tearDownClass()
        raise
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    pool_addr_n = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # add a one-address range and check it is dumped
    self.vapi.nat64_add_del_pool_addr_range(pool_addr_n, pool_addr_n)

    pool = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(pool), 1)
    self.assertEqual(pool[0].address, pool_addr_n)

    # delete it again and check the pool is empty
    self.vapi.nat64_add_del_pool_addr_range(pool_addr_n, pool_addr_n,
                                            is_add=0)

    self.assertEqual(len(self.vapi.nat64_pool_addr_dump()), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface """
    inside_index = self.pg0.sw_if_index
    outside_index = self.pg1.sw_if_index

    self.vapi.nat64_add_del_interface(inside_index)
    self.vapi.nat64_add_del_interface(outside_index, is_inside=0)

    dumped = self.vapi.nat64_interface_dump()
    self.assertEqual(len(dumped), 2)
    # pg0 must be reported as inside, pg1 as outside
    seen_inside = seen_outside = False
    for entry in dumped:
        if entry.sw_if_index == inside_index:
            self.assertEqual(entry.is_inside, 1)
            seen_inside = True
        elif entry.sw_if_index == outside_index:
            self.assertEqual(entry.is_inside, 0)
            seen_outside = True
    self.assertTrue(seen_inside)
    self.assertTrue(seen_outside)

    # the feature arcs must list the NAT64 nodes
    pg0_features = self.vapi.cli("show interface features pg0")
    self.assertNotEqual(pg0_features.find('nat64-in2out'), -1)
    pg1_features = self.vapi.cli("show interface features pg1")
    self.assertNotEqual(pg1_features.find('nat64-out2in'), -1)

    self.vapi.nat64_add_del_interface(inside_index, is_add=0)
    self.vapi.nat64_add_del_interface(outside_index, is_add=0)

    self.assertEqual(len(self.vapi.nat64_interface_dump()), 0)
def test_static_bib(self):
    """ Add/delete static BIB entry """
    in_addr = socket.inet_pton(socket.AF_INET6,
                               '2001:db8:85a3::8a2e:370:7334')
    out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
    # NOTE(review): the port values are not visible in the listing;
    # 1234 and 5678 are assumed - confirm.
    in_port, out_port = 1234, 5678
    proto = IP_PROTOS.tcp

    def static_entries():
        # static entries of the TCP BIB
        return [entry for entry in self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
                if entry.is_static]

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto)
    added = static_entries()
    for entry in added:
        self.assertEqual(entry.i_addr, in_addr)
        self.assertEqual(entry.o_addr, out_addr)
        self.assertEqual(entry.i_port, in_port)
        self.assertEqual(entry.o_port, out_port)
    self.assertEqual(len(added), 1)

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto,
                                       is_add=0)
    self.assertEqual(len(static_entries()), 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    def check_timeouts(udp, icmp, tcp_trans, tcp_est, tcp_incoming_syn):
        current = self.vapi.nat64_get_timeouts()
        self.assertEqual(current.udp, udp)
        self.assertEqual(current.icmp, icmp)
        self.assertEqual(current.tcp_trans, tcp_trans)
        self.assertEqual(current.tcp_est, tcp_est)
        self.assertEqual(current.tcp_incoming_syn, tcp_incoming_syn)

    # verify default values
    check_timeouts(300, 60, 240, 7440, 6)

    # set and verify custom values
    self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
                                 tcp_est=7450, tcp_incoming_syn=10)
    check_timeouts(200, 30, 250, 7450, 10)
def test_dynamic(self):
    """ NAT64 dynamic translation test """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    def forward(tx_if, rx_if, stream):
        # send a stream and capture the same number of packets
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    sessions_before = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # IPv6 address under which inside hosts see the outside IPv4 peer
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))

    # two identical in2out / out2in round trips
    for _ in range(2):
        # in2out
        capture = forward(self.pg0, self.pg1,
                          self.create_stream_in_ip6(self.pg0, self.pg1))
        self.verify_capture_out(capture, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        # out2in
        capture = forward(self.pg1, self.pg0,
                          self.create_stream_out(self.pg1,
                                                 dst_ip=self.nat_addr))
        self.verify_capture_in_ip6(capture, ip[IPv6].src,
                                   self.pg0.remote_ip6)

    sessions_after = self.nat64_get_ses_num()

    self.assertEqual(sessions_after - sessions_before, 3)

    # tenant with specific VRF
    self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                            self.vrf1_nat_addr_n,
                                            vrf_id=self.vrf1_id)
    self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

    capture = forward(self.pg2, self.pg1,
                      self.create_stream_in_ip6(self.pg2, self.pg1))
    self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    capture = forward(self.pg1, self.pg2,
                      self.create_stream_out(self.pg1,
                                             dst_ip=self.vrf1_nat_addr))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
def test_static(self):
    """ NAT64 static translation test """
    self.tcp_port_in = 60303
    self.udp_port_in = 60304
    self.icmp_id_in = 60305
    self.tcp_port_out = 60303
    self.udp_port_out = 60304
    self.icmp_id_out = 60305

    def forward(tx_if, rx_if, stream):
        # send a stream and capture the same number of packets
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    sessions_before = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # NOTE(review): the arguments after the inside address are not
    # visible in the listing; one static BIB entry per protocol with
    # the ports/ids set above is assumed - confirm.
    static_entries = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for in_port, out_port, proto in static_entries:
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           in_port,
                                           out_port,
                                           proto)

    # in2out
    capture = forward(self.pg0, self.pg1,
                      self.create_stream_in_ip6(self.pg0, self.pg1))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4, same_port=True)

    # out2in
    capture = forward(self.pg1, self.pg0,
                      self.create_stream_out(self.pg1,
                                             dst_ip=self.nat_addr))
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    sessions_after = self.nat64_get_ses_num()

    self.assertEqual(sessions_after - sessions_before, 3)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ NAT64 session timeout """
    self.icmp_id_in = 1234
    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    self.vapi.nat64_set_timeouts(icmp=5)

    stream = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(len(stream))

    sessions_before_timeout = self.nat64_get_ses_num()

    # NOTE(review): the wait is not visible in the listing; sleep(15)
    # is assumed - confirm.
    sleep(15)

    # ICMP session after timeout
    sessions_after_timeout = self.nat64_get_ses_num()
    self.assertNotEqual(sessions_before_timeout, sessions_after_timeout)
def test_icmp_error(self):
    """ NAT64 ICMP Error message translation """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    def forward(tx_if, rx_if, stream):
        # send a stream and capture the same number of packets
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    ses_num_start = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # send some packets to create sessions
    capture_ip4 = forward(self.pg0, self.pg1,
                          self.create_stream_in_ip6(self.pg0, self.pg1))
    self.verify_capture_out(capture_ip4,
                            nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    capture_ip6 = forward(self.pg1, self.pg0,
                          self.create_stream_out(self.pg1,
                                                 dst_ip=self.nat_addr))
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
                               self.pg0.remote_ip6)

    # in2out: ICMPv6 errors quoting the packets received on pg0
    errors_ip6 = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                  IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
                  ICMPv6DestUnreach(code=1) /
                  quoted[IPv6] for quoted in capture_ip6]
    for packet in forward(self.pg0, self.pg1, errors_ip6):
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertEqual(packet[ICMP].type, 3)
            self.assertEqual(packet[ICMP].code, 13)
            inner = packet[IPerror]
            self.assertEqual(inner.src, self.pg1.remote_ip4)
            self.assertEqual(inner.dst, self.nat_addr)
            self.check_icmp_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
            else:
                self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # out2in: ICMPv4 errors quoting the packets received on pg1
    errors_ip4 = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                  ICMP(type=3, code=13) /
                  quoted[IP] for quoted in capture_ip4]
    for packet in forward(self.pg1, self.pg0, errors_ip4):
        try:
            self.assertEqual(packet[IPv6].src, ip.src)
            self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, self.pg0.remote_ip6)
            self.assertEqual(inner.dst, ip.src)
            self.check_icmpv6_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
            else:
                self.assertEqual(inner[ICMPv6EchoRequest].id,
                                 self.icmp_id_in)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_hairpinning(self):
    """ NAT64 hairpinning """
    client, server = self.pg0.remote_hosts[0], self.pg0.remote_hosts[1]
    server_tcp_in_port = 22
    server_tcp_out_port = 4022
    server_udp_in_port = 23
    server_udp_out_port = 4023
    client_tcp_in_port = 1234
    client_udp_in_port = 1235
    client_tcp_out_port = 0
    client_udp_out_port = 0
    ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
    nat_addr_ip6 = ip.src

    def to_nat(host, l4):
        # packet from an inside host towards the NAT64 address
        return (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IPv6(src=host.ip6, dst=nat_addr_ip6) /
                l4)

    def hairpin(stream):
        # both ends sit behind pg0, so send and capture there
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(len(stream))

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # NOTE(review): only the out-port argument of each static BIB call
    # is visible in the listing; the other arguments are assumed.
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_tcp_in_port,
                                       server_tcp_out_port,
                                       IP_PROTOS.tcp)
    self.vapi.nat64_add_del_static_bib(server.ip6n,
                                       self.nat_addr_n,
                                       server_udp_in_port,
                                       server_udp_out_port,
                                       IP_PROTOS.udp)

    # client to server
    capture = hairpin([
        to_nat(client, TCP(sport=client_tcp_in_port,
                           dport=server_tcp_out_port)),
        to_nat(client, UDP(sport=client_udp_in_port,
                           dport=server_udp_out_port)),
    ])
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
                self.assertEqual(packet[TCP].dport, server_tcp_in_port)
                self.check_tcp_checksum(packet)
                client_tcp_out_port = packet[TCP].sport
            else:
                self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
                self.assertEqual(packet[UDP].dport, server_udp_in_port)
                self.check_udp_checksum(packet)
                client_udp_out_port = packet[UDP].sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server to client
    capture = hairpin([
        to_nat(server, TCP(sport=server_tcp_in_port,
                           dport=client_tcp_out_port)),
        to_nat(server, UDP(sport=server_udp_in_port,
                           dport=client_udp_out_port)),
    ])
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, client.ip6)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, server_tcp_out_port)
                self.assertEqual(packet[TCP].dport, client_tcp_in_port)
                self.check_tcp_checksum(packet)
            else:
                self.assertEqual(packet[UDP].sport, server_udp_out_port)
                self.assertEqual(packet[UDP].dport, client_udp_in_port)
                self.check_udp_checksum(packet)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # ICMP error: client reports the server replies as unreachable
    icmp_errors = [
        Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
        IPv6(src=client.ip6, dst=nat_addr_ip6) /
        ICMPv6DestUnreach(code=1) /
        quoted[IPv6] for quoted in capture]
    for packet in hairpin(icmp_errors):
        try:
            self.assertEqual(packet[IPv6].src, nat_addr_ip6)
            self.assertEqual(packet[IPv6].dst, server.ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, server.ip6)
            self.assertEqual(inner.dst, nat_addr_ip6)
            self.check_icmpv6_checksum(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
                self.assertEqual(inner[TCPerror].dport,
                                 client_tcp_out_port)
            else:
                self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
                self.assertEqual(inner[UDPerror].dport,
                                 client_udp_out_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
5392 def test_prefix(self):
5393 """ NAT64 Network-Specific Prefix """
5395 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5397 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5398 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5399 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5400 self.vrf1_nat_addr_n,
5401 vrf_id=self.vrf1_id)
5402 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5405 global_pref64 = "2001:db8::"
5406 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5407 global_pref64_len = 32
5408 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5410 prefix = self.vapi.nat64_prefix_dump()
5411 self.assertEqual(len(prefix), 1)
5412 self.assertEqual(prefix[0].prefix, global_pref64_n)
5413 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5414 self.assertEqual(prefix[0].vrf_id, 0)
5416 # Add tenant specific prefix
5417 vrf1_pref64 = "2001:db8:122:300::"
5418 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5419 vrf1_pref64_len = 56
5420 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5422 vrf_id=self.vrf1_id)
5423 prefix = self.vapi.nat64_prefix_dump()
5424 self.assertEqual(len(prefix), 2)
5427 pkts = self.create_stream_in_ip6(self.pg0,
5430 plen=global_pref64_len)
5431 self.pg0.add_stream(pkts)
5432 self.pg_enable_capture(self.pg_interfaces)
5434 capture = self.pg1.get_capture(len(pkts))
5435 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5436 dst_ip=self.pg1.remote_ip4)
5438 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5439 self.pg1.add_stream(pkts)
5440 self.pg_enable_capture(self.pg_interfaces)
5442 capture = self.pg0.get_capture(len(pkts))
5443 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5446 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5448 # Tenant specific prefix
5449 pkts = self.create_stream_in_ip6(self.pg2,
5452 plen=vrf1_pref64_len)
5453 self.pg2.add_stream(pkts)
5454 self.pg_enable_capture(self.pg_interfaces)
5456 capture = self.pg1.get_capture(len(pkts))
5457 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5458 dst_ip=self.pg1.remote_ip4)
5460 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5461 self.pg1.add_stream(pkts)
5462 self.pg_enable_capture(self.pg_interfaces)
5464 capture = self.pg2.get_capture(len(pkts))
5465 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5468 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5470 def test_unknown_proto(self):
5471 """ NAT64 translate packet with unknown protocol """
5473 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5475 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5476 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5477 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5480 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5481 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5482 TCP(sport=self.tcp_port_in, dport=20))
5483 self.pg0.add_stream(p)
5484 self.pg_enable_capture(self.pg_interfaces)
5486 p = self.pg1.get_capture(1)
5488 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5489 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5491 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5492 TCP(sport=1234, dport=1234))
5493 self.pg0.add_stream(p)
5494 self.pg_enable_capture(self.pg_interfaces)
5496 p = self.pg1.get_capture(1)
5499 self.assertEqual(packet[IP].src, self.nat_addr)
5500 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5501 self.assertTrue(packet.haslayer(GRE))
5502 self.check_ip_checksum(packet)
5504 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5508 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5509 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5511 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5512 TCP(sport=1234, dport=1234))
5513 self.pg1.add_stream(p)
5514 self.pg_enable_capture(self.pg_interfaces)
5516 p = self.pg0.get_capture(1)
5519 self.assertEqual(packet[IPv6].src, remote_ip6)
5520 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5521 self.assertEqual(packet[IPv6].nh, 47)
5523 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5526 def test_hairpinning_unknown_proto(self):
5527 """ NAT64 translate packet with unknown protocol - hairpinning """
5529 client = self.pg0.remote_hosts[0]
5530 server = self.pg0.remote_hosts[1]
5531 server_tcp_in_port = 22
5532 server_tcp_out_port = 4022
5533 client_tcp_in_port = 1234
5534 client_tcp_out_port = 1235
5535 server_nat_ip = "10.0.0.100"
5536 client_nat_ip = "10.0.0.110"
5537 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5538 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5539 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5540 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5542 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5544 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5545 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5547 self.vapi.nat64_add_del_static_bib(server.ip6n,
5550 server_tcp_out_port,
5553 self.vapi.nat64_add_del_static_bib(server.ip6n,
5559 self.vapi.nat64_add_del_static_bib(client.ip6n,
5562 client_tcp_out_port,
5566 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5567 IPv6(src=client.ip6, dst=server_nat_ip6) /
5568 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5569 self.pg0.add_stream(p)
5570 self.pg_enable_capture(self.pg_interfaces)
5572 p = self.pg0.get_capture(1)
5574 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5575 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5577 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5578 TCP(sport=1234, dport=1234))
5579 self.pg0.add_stream(p)
5580 self.pg_enable_capture(self.pg_interfaces)
5582 p = self.pg0.get_capture(1)
5585 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5586 self.assertEqual(packet[IPv6].dst, server.ip6)
5587 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5589 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5593 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5594 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5596 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5597 TCP(sport=1234, dport=1234))
5598 self.pg0.add_stream(p)
5599 self.pg_enable_capture(self.pg_interfaces)
5601 p = self.pg0.get_capture(1)
5604 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5605 self.assertEqual(packet[IPv6].dst, client.ip6)
5606 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5608 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5611 def test_one_armed_nat64(self):
5612 """ One armed NAT64 """
5614 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5618 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5620 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5621 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5624 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5625 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5626 TCP(sport=12345, dport=80))
5627 self.pg3.add_stream(p)
5628 self.pg_enable_capture(self.pg_interfaces)
5630 capture = self.pg3.get_capture(1)
5635 self.assertEqual(ip.src, self.nat_addr)
5636 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5637 self.assertNotEqual(tcp.sport, 12345)
5638 external_port = tcp.sport
5639 self.assertEqual(tcp.dport, 80)
5640 self.check_tcp_checksum(p)
5641 self.check_ip_checksum(p)
5643 self.logger.error(ppp("Unexpected or invalid packet:", p))
5647 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5648 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5649 TCP(sport=80, dport=external_port))
5650 self.pg3.add_stream(p)
5651 self.pg_enable_capture(self.pg_interfaces)
5653 capture = self.pg3.get_capture(1)
5658 self.assertEqual(ip.src, remote_host_ip6)
5659 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5660 self.assertEqual(tcp.sport, 80)
5661 self.assertEqual(tcp.dport, 12345)
5662 self.check_tcp_checksum(p)
5664 self.logger.error(ppp("Unexpected or invalid packet:", p))
5667 def test_frag_in_order(self):
5668 """ NAT64 translate fragments arriving in order """
5669 self.tcp_port_in = random.randint(1025, 65535)
5671 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5673 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5674 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5676 reass = self.vapi.nat_reass_dump()
5677 reass_n_start = len(reass)
5681 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5682 self.tcp_port_in, 20, data)
5683 self.pg0.add_stream(pkts)
5684 self.pg_enable_capture(self.pg_interfaces)
5686 frags = self.pg1.get_capture(len(pkts))
5687 p = self.reass_frags_and_verify(frags,
5689 self.pg1.remote_ip4)
5690 self.assertEqual(p[TCP].dport, 20)
5691 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5692 self.tcp_port_out = p[TCP].sport
5693 self.assertEqual(data, p[Raw].load)
5696 data = "A" * 4 + "b" * 16 + "C" * 3
5697 pkts = self.create_stream_frag(self.pg1,
5702 self.pg1.add_stream(pkts)
5703 self.pg_enable_capture(self.pg_interfaces)
5705 frags = self.pg0.get_capture(len(pkts))
5706 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5707 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5708 self.assertEqual(p[TCP].sport, 20)
5709 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5710 self.assertEqual(data, p[Raw].load)
5712 reass = self.vapi.nat_reass_dump()
5713 reass_n_end = len(reass)
5715 self.assertEqual(reass_n_end - reass_n_start, 2)
5717 def test_reass_hairpinning(self):
5718 """ NAT64 fragments hairpinning """
5720 client = self.pg0.remote_hosts[0]
5721 server = self.pg0.remote_hosts[1]
5722 server_in_port = random.randint(1025, 65535)
5723 server_out_port = random.randint(1025, 65535)
5724 client_in_port = random.randint(1025, 65535)
5725 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5726 nat_addr_ip6 = ip.src
5728 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5730 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5731 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5733 # add static BIB entry for server
5734 self.vapi.nat64_add_del_static_bib(server.ip6n,
5740 # send packet from host to server
5741 pkts = self.create_stream_frag_ip6(self.pg0,
5746 self.pg0.add_stream(pkts)
5747 self.pg_enable_capture(self.pg_interfaces)
5749 frags = self.pg0.get_capture(len(pkts))
5750 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5751 self.assertNotEqual(p[TCP].sport, client_in_port)
5752 self.assertEqual(p[TCP].dport, server_in_port)
5753 self.assertEqual(data, p[Raw].load)
5755 def test_frag_out_of_order(self):
5756 """ NAT64 translate fragments arriving out of order """
5757 self.tcp_port_in = random.randint(1025, 65535)
5759 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5761 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5762 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5766 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5767 self.tcp_port_in, 20, data)
5769 self.pg0.add_stream(pkts)
5770 self.pg_enable_capture(self.pg_interfaces)
5772 frags = self.pg1.get_capture(len(pkts))
5773 p = self.reass_frags_and_verify(frags,
5775 self.pg1.remote_ip4)
5776 self.assertEqual(p[TCP].dport, 20)
5777 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5778 self.tcp_port_out = p[TCP].sport
5779 self.assertEqual(data, p[Raw].load)
5782 data = "A" * 4 + "B" * 16 + "C" * 3
5783 pkts = self.create_stream_frag(self.pg1,
5789 self.pg1.add_stream(pkts)
5790 self.pg_enable_capture(self.pg_interfaces)
5792 frags = self.pg0.get_capture(len(pkts))
5793 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5794 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5795 self.assertEqual(p[TCP].sport, 20)
5796 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5797 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Bind the NAT64 address pool to pg4: the pool is expected to follow
    # whatever IPv4 address is configured on that interface.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool itself; dumping the NAT44 pool here would not
    # tell us anything about NAT64 state)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump, not on the result captured before the
    # address was configured, so the removal is really verified)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
5818 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5819 def test_ipfix_max_bibs_sessions(self):
5820 """ IPFIX logging maximum session and BIB entries exceeded """
5823 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5827 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5829 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5830 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5834 for i in range(0, max_bibs):
5835 src = "fd01:aa::%x" % (i)
5836 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5837 IPv6(src=src, dst=remote_host_ip6) /
5838 TCP(sport=12345, dport=80))
5840 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5841 IPv6(src=src, dst=remote_host_ip6) /
5842 TCP(sport=12345, dport=22))
5844 self.pg0.add_stream(pkts)
5845 self.pg_enable_capture(self.pg_interfaces)
5847 self.pg1.get_capture(max_sessions)
5849 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5850 src_address=self.pg3.local_ip4n,
5852 template_interval=10)
5853 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5854 src_port=self.ipfix_src_port)
5856 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5857 IPv6(src=src, dst=remote_host_ip6) /
5858 TCP(sport=12345, dport=25))
5859 self.pg0.add_stream(p)
5860 self.pg_enable_capture(self.pg_interfaces)
5862 self.pg1.get_capture(0)
5863 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5864 capture = self.pg3.get_capture(9)
5865 ipfix = IPFIXDecoder()
5866 # first load template
5868 self.assertTrue(p.haslayer(IPFIX))
5869 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5870 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5871 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5872 self.assertEqual(p[UDP].dport, 4739)
5873 self.assertEqual(p[IPFIX].observationDomainID,
5874 self.ipfix_domain_id)
5875 if p.haslayer(Template):
5876 ipfix.add_template(p.getlayer(Template))
5877 # verify events in data set
5879 if p.haslayer(Data):
5880 data = ipfix.decode_data_set(p.getlayer(Set))
5881 self.verify_ipfix_max_sessions(data, max_sessions)
5883 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5884 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5885 TCP(sport=12345, dport=80))
5886 self.pg0.add_stream(p)
5887 self.pg_enable_capture(self.pg_interfaces)
5889 self.pg1.get_capture(0)
5890 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5891 capture = self.pg3.get_capture(1)
5892 # verify events in data set
5894 self.assertTrue(p.haslayer(IPFIX))
5895 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5896 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5897 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5898 self.assertEqual(p[UDP].dport, 4739)
5899 self.assertEqual(p[IPFIX].observationDomainID,
5900 self.ipfix_domain_id)
5901 if p.haslayer(Data):
5902 data = ipfix.decode_data_set(p.getlayer(Set))
5903 self.verify_ipfix_max_bibs(data, max_bibs)
5905 def test_ipfix_max_frags(self):
5906 """ IPFIX logging maximum fragments pending reassembly exceeded """
5907 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5909 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5910 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5911 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5912 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5913 src_address=self.pg3.local_ip4n,
5915 template_interval=10)
5916 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5917 src_port=self.ipfix_src_port)
5920 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5921 self.tcp_port_in, 20, data)
5922 self.pg0.add_stream(pkts[-1])
5923 self.pg_enable_capture(self.pg_interfaces)
5925 self.pg1.get_capture(0)
5926 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5927 capture = self.pg3.get_capture(9)
5928 ipfix = IPFIXDecoder()
5929 # first load template
5931 self.assertTrue(p.haslayer(IPFIX))
5932 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5933 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5934 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5935 self.assertEqual(p[UDP].dport, 4739)
5936 self.assertEqual(p[IPFIX].observationDomainID,
5937 self.ipfix_domain_id)
5938 if p.haslayer(Template):
5939 ipfix.add_template(p.getlayer(Template))
5940 # verify events in data set
5942 if p.haslayer(Data):
5943 data = ipfix.decode_data_set(p.getlayer(Set))
5944 self.verify_ipfix_max_fragments_ip6(data, 0,
5945 self.pg0.remote_ip6n)
5947 def test_ipfix_bib_ses(self):
5948 """ IPFIX logging NAT64 BIB/session create and delete events """
5949 self.tcp_port_in = random.randint(1025, 65535)
5950 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5954 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5956 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5957 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5958 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5959 src_address=self.pg3.local_ip4n,
5961 template_interval=10)
5962 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5963 src_port=self.ipfix_src_port)
5966 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5967 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5968 TCP(sport=self.tcp_port_in, dport=25))
5969 self.pg0.add_stream(p)
5970 self.pg_enable_capture(self.pg_interfaces)
5972 p = self.pg1.get_capture(1)
5973 self.tcp_port_out = p[0][TCP].sport
5974 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5975 capture = self.pg3.get_capture(10)
5976 ipfix = IPFIXDecoder()
5977 # first load template
5979 self.assertTrue(p.haslayer(IPFIX))
5980 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5981 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5982 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5983 self.assertEqual(p[UDP].dport, 4739)
5984 self.assertEqual(p[IPFIX].observationDomainID,
5985 self.ipfix_domain_id)
5986 if p.haslayer(Template):
5987 ipfix.add_template(p.getlayer(Template))
5988 # verify events in data set
5990 if p.haslayer(Data):
5991 data = ipfix.decode_data_set(p.getlayer(Set))
5992 if ord(data[0][230]) == 10:
5993 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5994 elif ord(data[0][230]) == 6:
5995 self.verify_ipfix_nat64_ses(data,
5997 self.pg0.remote_ip6n,
5998 self.pg1.remote_ip4,
6001 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6004 self.pg_enable_capture(self.pg_interfaces)
6005 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6008 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6009 capture = self.pg3.get_capture(2)
6010 # verify events in data set
6012 self.assertTrue(p.haslayer(IPFIX))
6013 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6014 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6015 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6016 self.assertEqual(p[UDP].dport, 4739)
6017 self.assertEqual(p[IPFIX].observationDomainID,
6018 self.ipfix_domain_id)
6019 if p.haslayer(Data):
6020 data = ipfix.decode_data_set(p.getlayer(Set))
6021 if ord(data[0][230]) == 11:
6022 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6023 elif ord(data[0][230]) == 7:
6024 self.verify_ipfix_nat64_ses(data,
6026 self.pg0.remote_ip6n,
6027 self.pg1.remote_ip4,
6030 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6032 def nat64_get_ses_num(self):
6034 Return number of active NAT64 sessions.
6036 st = self.vapi.nat64_st_dump()
6039 def clear_nat64(self):
6041 Clear NAT64 configuration.
6043 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6044 domain_id=self.ipfix_domain_id)
6045 self.ipfix_src_port = 4739
6046 self.ipfix_domain_id = 1
6048 self.vapi.nat64_set_timeouts()
6050 interfaces = self.vapi.nat64_interface_dump()
6051 for intf in interfaces:
6052 if intf.is_inside > 1:
6053 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6056 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6060 bib = self.vapi.nat64_bib_dump(255)
6063 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6071 adresses = self.vapi.nat64_pool_addr_dump()
6072 for addr in adresses:
6073 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6078 prefixes = self.vapi.nat64_prefix_dump()
6079 for prefix in prefixes:
6080 self.vapi.nat64_add_del_prefix(prefix.prefix,
6082 vrf_id=prefix.vrf_id,
6086 super(TestNAT64, self).tearDown()
6087 if not self.vpp_dead:
6088 self.logger.info(self.vapi.cli("show nat64 pool"))
6089 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6090 self.logger.info(self.vapi.cli("show nat64 prefix"))
6091 self.logger.info(self.vapi.cli("show nat64 bib all"))
6092 self.logger.info(self.vapi.cli("show nat64 session table all"))
6093 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6097 class TestDSlite(MethodHolder):
6098 """ DS-Lite Test Cases """
6101 def setUpClass(cls):
6102 super(TestDSlite, cls).setUpClass()
6105 cls.nat_addr = '10.0.0.3'
6106 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6108 cls.create_pg_interfaces(range(2))
6110 cls.pg0.config_ip4()
6111 cls.pg0.resolve_arp()
6113 cls.pg1.config_ip6()
6114 cls.pg1.generate_remote_hosts(2)
6115 cls.pg1.configure_ipv6_neighbors()
6118 super(TestDSlite, cls).tearDownClass()
6121 def test_dslite(self):
6122 """ Test DS-Lite """
6123 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6125 aftr_ip4 = '192.0.0.1'
6126 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6127 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6128 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6129 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6132 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6133 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6134 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6135 UDP(sport=20000, dport=10000))
6136 self.pg1.add_stream(p)
6137 self.pg_enable_capture(self.pg_interfaces)
6139 capture = self.pg0.get_capture(1)
6140 capture = capture[0]
6141 self.assertFalse(capture.haslayer(IPv6))
6142 self.assertEqual(capture[IP].src, self.nat_addr)
6143 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6144 self.assertNotEqual(capture[UDP].sport, 20000)
6145 self.assertEqual(capture[UDP].dport, 10000)
6146 self.check_ip_checksum(capture)
6147 out_port = capture[UDP].sport
6149 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6150 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6151 UDP(sport=10000, dport=out_port))
6152 self.pg0.add_stream(p)
6153 self.pg_enable_capture(self.pg_interfaces)
6155 capture = self.pg1.get_capture(1)
6156 capture = capture[0]
6157 self.assertEqual(capture[IPv6].src, aftr_ip6)
6158 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6159 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6160 self.assertEqual(capture[IP].dst, '192.168.1.1')
6161 self.assertEqual(capture[UDP].sport, 10000)
6162 self.assertEqual(capture[UDP].dport, 20000)
6163 self.check_ip_checksum(capture)
6166 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6167 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6168 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6169 TCP(sport=20001, dport=10001))
6170 self.pg1.add_stream(p)
6171 self.pg_enable_capture(self.pg_interfaces)
6173 capture = self.pg0.get_capture(1)
6174 capture = capture[0]
6175 self.assertFalse(capture.haslayer(IPv6))
6176 self.assertEqual(capture[IP].src, self.nat_addr)
6177 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6178 self.assertNotEqual(capture[TCP].sport, 20001)
6179 self.assertEqual(capture[TCP].dport, 10001)
6180 self.check_ip_checksum(capture)
6181 self.check_tcp_checksum(capture)
6182 out_port = capture[TCP].sport
6184 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6185 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6186 TCP(sport=10001, dport=out_port))
6187 self.pg0.add_stream(p)
6188 self.pg_enable_capture(self.pg_interfaces)
6190 capture = self.pg1.get_capture(1)
6191 capture = capture[0]
6192 self.assertEqual(capture[IPv6].src, aftr_ip6)
6193 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6194 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6195 self.assertEqual(capture[IP].dst, '192.168.1.1')
6196 self.assertEqual(capture[TCP].sport, 10001)
6197 self.assertEqual(capture[TCP].dport, 20001)
6198 self.check_ip_checksum(capture)
6199 self.check_tcp_checksum(capture)
6202 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6203 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6204 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6205 ICMP(id=4000, type='echo-request'))
6206 self.pg1.add_stream(p)
6207 self.pg_enable_capture(self.pg_interfaces)
6209 capture = self.pg0.get_capture(1)
6210 capture = capture[0]
6211 self.assertFalse(capture.haslayer(IPv6))
6212 self.assertEqual(capture[IP].src, self.nat_addr)
6213 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6214 self.assertNotEqual(capture[ICMP].id, 4000)
6215 self.check_ip_checksum(capture)
6216 self.check_icmp_checksum(capture)
6217 out_id = capture[ICMP].id
6219 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6220 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6221 ICMP(id=out_id, type='echo-reply'))
6222 self.pg0.add_stream(p)
6223 self.pg_enable_capture(self.pg_interfaces)
6225 capture = self.pg1.get_capture(1)
6226 capture = capture[0]
6227 self.assertEqual(capture[IPv6].src, aftr_ip6)
6228 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6229 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6230 self.assertEqual(capture[IP].dst, '192.168.1.1')
6231 self.assertEqual(capture[ICMP].id, 4000)
6232 self.check_ip_checksum(capture)
6233 self.check_icmp_checksum(capture)
6235 # ping DS-Lite AFTR tunnel endpoint address
6236 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6237 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6238 ICMPv6EchoRequest())
6239 self.pg1.add_stream(p)
6240 self.pg_enable_capture(self.pg_interfaces)
6242 capture = self.pg1.get_capture(1)
6243 self.assertEqual(1, len(capture))
6244 capture = capture[0]
6245 self.assertEqual(capture[IPv6].src, aftr_ip6)
6246 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6247 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6250 super(TestDSlite, self).tearDown()
6251 if not self.vpp_dead:
6252 self.logger.info(self.vapi.cli("show dslite pool"))
6254 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6255 self.logger.info(self.vapi.cli("show dslite sessions"))
6258 class TestDSliteCE(MethodHolder):
6259 """ DS-Lite CE Test Cases """
6262 def setUpConstants(cls):
6263 super(TestDSliteCE, cls).setUpConstants()
6264 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6267 def setUpClass(cls):
6268 super(TestDSliteCE, cls).setUpClass()
6271 cls.create_pg_interfaces(range(2))
6273 cls.pg0.config_ip4()
6274 cls.pg0.resolve_arp()
6276 cls.pg1.config_ip6()
6277 cls.pg1.generate_remote_hosts(1)
6278 cls.pg1.configure_ipv6_neighbors()
6281 super(TestDSliteCE, cls).tearDownClass()
6284 def test_dslite_ce(self):
6285 """ Test DS-Lite CE """
6287 b4_ip4 = '192.0.0.2'
6288 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6289 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6290 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6291 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6293 aftr_ip4 = '192.0.0.1'
6294 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6295 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6296 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6297 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6299 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6300 dst_address_length=128,
6301 next_hop_address=self.pg1.remote_ip6n,
6302 next_hop_sw_if_index=self.pg1.sw_if_index,
6306 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6307 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6308 UDP(sport=10000, dport=20000))
6309 self.pg0.add_stream(p)
6310 self.pg_enable_capture(self.pg_interfaces)
6312 capture = self.pg1.get_capture(1)
6313 capture = capture[0]
6314 self.assertEqual(capture[IPv6].src, b4_ip6)
6315 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6316 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6317 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6318 self.assertEqual(capture[UDP].sport, 10000)
6319 self.assertEqual(capture[UDP].dport, 20000)
6320 self.check_ip_checksum(capture)
6323 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6324 IPv6(dst=b4_ip6, src=aftr_ip6) /
6325 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6326 UDP(sport=20000, dport=10000))
6327 self.pg1.add_stream(p)
6328 self.pg_enable_capture(self.pg_interfaces)
6330 capture = self.pg0.get_capture(1)
6331 capture = capture[0]
6332 self.assertFalse(capture.haslayer(IPv6))
6333 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6334 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6335 self.assertEqual(capture[UDP].sport, 20000)
6336 self.assertEqual(capture[UDP].dport, 10000)
6337 self.check_ip_checksum(capture)
6339 # ping DS-Lite B4 tunnel endpoint address
6340 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6341 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6342 ICMPv6EchoRequest())
6343 self.pg1.add_stream(p)
6344 self.pg_enable_capture(self.pg_interfaces)
6346 capture = self.pg1.get_capture(1)
6347 self.assertEqual(1, len(capture))
6348 capture = capture[0]
6349 self.assertEqual(capture[IPv6].src, b4_ip6)
6350 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6351 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6354 super(TestDSliteCE, self).tearDown()
6355 if not self.vpp_dead:
6357 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6359 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# NAT66 test class: a 1:1 static-mapping test, a no-translation test,
# a configuration-cleanup helper and CLI state logging on teardown.
6362 class TestNAT66(MethodHolder):
6363 """ NAT66 Test Cases """
# Class-level fixture: stores the NAT address both as text and as the packed
# binary form returned by socket.inet_pton, requests pg interfaces for
# range(2), and configures IPv6 neighbors on each of them.
6366 def setUpClass(cls):
6367 super(TestNAT66, cls).setUpClass()
# Address that translated packets are expected to carry (see test_static).
6370 cls.nat_addr = 'fd01:ff::2'
6371 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6373 cls.create_pg_interfaces(range(2))
# Snapshot copy of the interface list kept on the class.
6374 cls.interfaces = list(cls.pg_interfaces)
6376 for i in cls.interfaces:
6379 i.configure_ipv6_neighbors()
# NOTE(review): tearDownClass is called from within setUpClass - presumably
# cleanup after a failed setup; confirm against the guarding statements.
6382 super(TestNAT66, cls).tearDownClass()
# Static 1:1 mapping test: sends packets pg0 -> pg1 and pg1 -> pg0 and checks
# the rewritten addresses, the L4 checksums and the mapping's packet counter.
6385 def test_static(self):
6386 """ 1:1 NAT66 test """
# pg0 is registered with default arguments, pg1 explicitly with is_inside=0.
# NOTE(review): presumably the default marks pg0 as the inside interface and
# is_inside=0 marks pg1 as outside - confirm against the vapi wrapper.
6387 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6388 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static mapping whose first argument is pg0's remote IPv6 address
# (remote_ip6n - presumably the packed binary form; verify).
6389 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# Packets for the pg0 -> pg1 direction: sourced from pg0's remote host and
# addressed to pg1's remote host, with differing payloads per packet.
6394 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6395 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6398 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6399 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6402 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6403 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6404 ICMPv6EchoRequest())
# GRE-encapsulated IPv4/TCP payload carried over IPv6.
6406 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6407 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6408 GRE() / IP() / TCP())
# Queue the packets on pg0, enable capture on all pg interfaces, then
# collect exactly len(pkts) packets from pg1.
6410 self.pg0.add_stream(pkts)
6411 self.pg_enable_capture(self.pg_interfaces)
6413 capture = self.pg1.get_capture(len(pkts))
6414 for packet in capture:
# Source must have been rewritten to the NAT address; destination unchanged.
6416 self.assertEqual(packet[IPv6].src, self.nat_addr)
6417 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
# Pick the checksum helper that matches the payload found in the packet.
6418 if packet.haslayer(TCP):
6419 self.check_tcp_checksum(packet)
6420 elif packet.haslayer(UDP):
6421 self.check_udp_checksum(packet)
6422 elif packet.haslayer(ICMPv6EchoRequest):
6423 self.check_icmpv6_checksum(packet)
6425 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Packets for the pg1 -> pg0 direction: sourced from pg1's remote host and
# addressed to the NAT address.
6430 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6431 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6434 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6435 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6438 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6439 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6442 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6443 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6444 GRE() / IP() / TCP())
# Queue the packets on pg1, enable capture, then collect len(pkts) packets
# from pg0.
6446 self.pg1.add_stream(pkts)
6447 self.pg_enable_capture(self.pg_interfaces)
6449 capture = self.pg0.get_capture(len(pkts))
6450 for packet in capture:
# Source stays pg1's remote host; destination must have been rewritten from
# the NAT address to pg0's remote address.
6452 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6453 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6454 if packet.haslayer(TCP):
6455 self.check_tcp_checksum(packet)
6456 elif packet.haslayer(UDP):
6457 self.check_udp_checksum(packet)
6458 elif packet.haslayer(ICMPv6EchoReply):
6459 self.check_icmpv6_checksum(packet)
6461 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exactly one static mapping must be dumped and its total_pkts must be 8.
# NOTE(review): presumably four packets per direction - confirm.
6464 sm = self.vapi.nat66_static_mapping_dump()
6465 self.assertEqual(len(sm), 1)
6466 self.assertEqual(sm[0].total_pkts, 8)
# No-translation test: both interfaces are registered with default arguments
# (no is_inside=0), and the packet captured on pg1 must keep its original
# source and destination addresses.
6468 def test_check_no_translate(self):
6469 """ NAT66 translate only when egress interface is outside interface """
6470 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6471 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
6472 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6476 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6477 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
# Queue the single packet on pg0 and expect exactly one packet on pg1.
6479 self.pg0.add_stream([p])
6480 self.pg_enable_capture(self.pg_interfaces)
6482 capture = self.pg1.get_capture(1)
# Source address must still be pg0's remote address, i.e. not the NAT address.
6485 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6486 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6488 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Cleanup helper: dumps the NAT66 interfaces and static mappings and issues
# one add_del call per dumped entry.
# NOTE(review): presumably each call passes a delete flag among the
# remaining arguments - confirm against the full call.
6491 def clear_nat66(self):
6493 Clear NAT66 configuration.
6495 interfaces = self.vapi.nat66_interface_dump()
6496 for intf in interfaces:
6497 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6501 static_mappings = self.vapi.nat66_static_mapping_dump()
6502 for sm in static_mappings:
6503 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6504 sm.external_ip_address,
# Per-test teardown: runs the parent teardown, then logs the NAT66 interface
# and static-mapping CLI output, but only while VPP is not flagged dead.
6509 super(TestNAT66, self).tearDown()
6510 if not self.vpp_dead:
6511 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6512 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when executed directly, run the tests in this module
# through unittest using the VPP-specific test runner.
6515 if __name__ == '__main__':
6516 unittest.main(testRunner=VppTestRunner)