9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check checksums of the packet embedded in an ICMP error message

    For each embedded layer that is present (IPerror, TCPerror, UDPerror,
    ICMPerror) the packet is rebuilt from its raw bytes, the checksum of
    that layer is cleared, the packet is rebuilt once more and the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    def verify_layer(layer):
        # Always start from the bytes of the original packet so that the
        # check of one layer never depends on what an earlier check left
        # behind (the ICMPerror check used to reuse a stale/undefined
        # local here).
        rebuilt = pkt.__class__(str(pkt))
        del rebuilt[layer].chksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[layer].chksum, pkt[layer].chksum)

    if pkt.haslayer(IPerror):
        verify_layer('IPerror')

    if pkt.haslayer(TCPerror):
        verify_layer('TCPerror')

    # an embedded UDP checksum of 0 is not verified
    if pkt.haslayer(UDPerror) and pkt['UDPerror'].chksum != 0:
        verify_layer('UDPerror')

    if pkt.haslayer(ICMPerror):
        verify_layer('ICMPerror')
def check_icmp_checksum(self, pkt):
    """
    Verify the ICMP checksum carried by an IPv4 packet

    The packet is rebuilt from its raw bytes with the ICMP checksum field
    cleared, and the checksum of the rebuilt packet must equal the one in
    ``pkt``. When the packet carries an embedded IPerror layer, the
    embedded packet checksums are verified as well.

    :param pkt: Packet whose ICMP checksum is verified
    """
    rebuilt = pkt.__class__(str(pkt))
    # clear the stored value, then rebuild from bytes to get a fresh one
    del rebuilt['ICMP'].chksum
    recomputed = rebuilt.__class__(str(rebuilt))['ICMP'].chksum
    self.assertEqual(recomputed, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Verify the ICMPv6 checksum carried by a packet

    Handles ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply
    layers. For each layer present, the checksum field is cleared on a
    rebuilt copy, the copy is rebuilt again and the resulting checksum is
    compared with the one in ``pkt``. For ICMPv6DestUnreach the embedded
    packet checksums are verified too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name used for indexing, verify embedded packet)
    icmp6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    rebuilt = pkt.__class__(str(pkt))
    for layer_cls, layer_name, has_embedded in icmp6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        del rebuilt[layer_name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[layer_name].cksum, pkt[layer_name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param hlim: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Source port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Source port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param frag_size: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
739 def verify_ipfix_addr_exhausted(self, data):
741 Verify IPFIX NAT addresses event
743 :param data: Decoded IPFIX data records
745 self.assertEqual(1, len(data))
748 self.assertEqual(ord(record[230]), 3)
750 self.assertEqual(struct.pack("!I", 0), record[283])
752 def verify_ipfix_max_sessions(self, data, limit):
754 Verify IPFIX maximum session entries exceeded event
756 :param data: Decoded IPFIX data records
757 :param limit: Number of maximum session entries that can be created.
759 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 13)
763 # natQuotaExceededEvent
764 self.assertEqual(struct.pack("I", 1), record[466])
766 self.assertEqual(struct.pack("I", limit), record[471])
768 def verify_ipfix_max_bibs(self, data, limit):
770 Verify IPFIX maximum BIB entries exceeded event
772 :param data: Decoded IPFIX data records
773 :param limit: Number of maximum BIB entries that can be created.
775 self.assertEqual(1, len(data))
778 self.assertEqual(ord(record[230]), 13)
779 # natQuotaExceededEvent
780 self.assertEqual(struct.pack("I", 2), record[466])
782 self.assertEqual(struct.pack("I", limit), record[472])
784 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
786 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
788 :param data: Decoded IPFIX data records
789 :param limit: Number of maximum fragments pending reassembly
790 :param src_addr: IPv6 source address
792 self.assertEqual(1, len(data))
795 self.assertEqual(ord(record[230]), 13)
796 # natQuotaExceededEvent
797 self.assertEqual(struct.pack("I", 5), record[466])
798 # maxFragmentsPendingReassembly
799 self.assertEqual(struct.pack("I", limit), record[475])
801 self.assertEqual(src_addr, record[27])
803 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
805 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
807 :param data: Decoded IPFIX data records
808 :param limit: Number of maximum fragments pending reassembly
809 :param src_addr: IPv4 source address
811 self.assertEqual(1, len(data))
814 self.assertEqual(ord(record[230]), 13)
815 # natQuotaExceededEvent
816 self.assertEqual(struct.pack("I", 5), record[466])
817 # maxFragmentsPendingReassembly
818 self.assertEqual(struct.pack("I", limit), record[475])
820 self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.pg1.generate_remote_hosts(1)
926 cls.pg1.configure_ipv4_neighbors()
928 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
929 cls.vapi.ip_table_add_del(10, is_add=1)
930 cls.vapi.ip_table_add_del(20, is_add=1)
932 cls.pg4._local_ip4 = "172.16.255.1"
933 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
934 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
935 cls.pg4.set_table_ip4(10)
936 cls.pg5._local_ip4 = "172.17.255.3"
937 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
938 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
939 cls.pg5.set_table_ip4(10)
940 cls.pg6._local_ip4 = "172.16.255.1"
941 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
942 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
943 cls.pg6.set_table_ip4(20)
944 for i in cls.overlapping_interfaces:
952 cls.pg9.generate_remote_hosts(2)
954 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
955 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
959 cls.pg9.resolve_arp()
960 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
961 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
962 cls.pg9.resolve_arp()
965 super(TestNAT44, cls).tearDownClass()
968 def clear_nat44(self):
970 Clear NAT44 configuration.
972 # I found no elegant way to do this
973 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
974 dst_address_length=32,
975 next_hop_address=self.pg7.remote_ip4n,
976 next_hop_sw_if_index=self.pg7.sw_if_index,
978 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
979 dst_address_length=32,
980 next_hop_address=self.pg8.remote_ip4n,
981 next_hop_sw_if_index=self.pg8.sw_if_index,
984 for intf in [self.pg7, self.pg8]:
985 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
987 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
992 if self.pg7.has_ip4_config:
993 self.pg7.unconfig_ip4()
995 self.vapi.nat44_forwarding_enable_disable(0)
997 interfaces = self.vapi.nat44_interface_addr_dump()
998 for intf in interfaces:
999 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
1000 twice_nat=intf.twice_nat,
1003 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1004 domain_id=self.ipfix_domain_id)
1005 self.ipfix_src_port = 4739
1006 self.ipfix_domain_id = 1
1008 interfaces = self.vapi.nat44_interface_dump()
1009 for intf in interfaces:
1010 if intf.is_inside > 1:
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1014 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1018 interfaces = self.vapi.nat44_interface_output_feature_dump()
1019 for intf in interfaces:
1020 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1024 static_mappings = self.vapi.nat44_static_mapping_dump()
1025 for sm in static_mappings:
1026 self.vapi.nat44_add_del_static_mapping(
1027 sm.local_ip_address,
1028 sm.external_ip_address,
1029 local_port=sm.local_port,
1030 external_port=sm.external_port,
1031 addr_only=sm.addr_only,
1033 protocol=sm.protocol,
1034 twice_nat=sm.twice_nat,
1035 self_twice_nat=sm.self_twice_nat,
1036 out2in_only=sm.out2in_only,
1038 external_sw_if_index=sm.external_sw_if_index,
1041 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1042 for lb_sm in lb_static_mappings:
1043 self.vapi.nat44_add_del_lb_static_mapping(
1044 lb_sm.external_addr,
1045 lb_sm.external_port,
1047 vrf_id=lb_sm.vrf_id,
1048 twice_nat=lb_sm.twice_nat,
1049 self_twice_nat=lb_sm.self_twice_nat,
1050 out2in_only=lb_sm.out2in_only,
1056 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1057 for id_m in identity_mappings:
1058 self.vapi.nat44_add_del_identity_mapping(
1059 addr_only=id_m.addr_only,
1062 sw_if_index=id_m.sw_if_index,
1064 protocol=id_m.protocol,
1067 adresses = self.vapi.nat44_address_dump()
1068 for addr in adresses:
1069 self.vapi.nat44_add_del_address_range(addr.ip_address,
1071 twice_nat=addr.twice_nat,
1074 self.vapi.nat_set_reass()
1075 self.vapi.nat_set_reass(is_ip6=1)
1077 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1078 local_port=0, external_port=0, vrf_id=0,
1079 is_add=1, external_sw_if_index=0xFFFFFFFF,
1080 proto=0, twice_nat=0, self_twice_nat=0,
1081 out2in_only=0, tag=""):
1083 Add/delete NAT44 static mapping
1085 :param local_ip: Local IP address
1086 :param external_ip: External IP address
1087 :param local_port: Local port number (Optional)
1088 :param external_port: External port number (Optional)
1089 :param vrf_id: VRF ID (Default 0)
1090 :param is_add: 1 if add, 0 if delete (Default add)
1091 :param external_sw_if_index: External interface instead of IP address
1092 :param proto: IP protocol (Mandatory if port specified)
1093 :param twice_nat: 1 if translate external host address and port
1094 :param self_twice_nat: 1 if translate external host address and port
1095 whenever external host address equals
1096 local address of internal host
1097 :param out2in_only: if 1 rule is matching only out2in direction
1098 :param tag: Opaque string tag
1101 if local_port and external_port:
1103 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1104 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1105 self.vapi.nat44_add_del_static_mapping(
1108 external_sw_if_index,
1120 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1122 Add/delete NAT44 address
1124 :param ip: IP address
1125 :param is_add: 1 if add, 0 if delete (Default add)
1126 :param twice_nat: twice NAT address for external hosts
1128 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1129 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1131 twice_nat=twice_nat)
1133 def test_dynamic(self):
1134 """ NAT44 dynamic translation test """
1136 self.nat44_add_address(self.nat_addr)
1137 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1138 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1142 pkts = self.create_stream_in(self.pg0, self.pg1)
1143 self.pg0.add_stream(pkts)
1144 self.pg_enable_capture(self.pg_interfaces)
1146 capture = self.pg1.get_capture(len(pkts))
1147 self.verify_capture_out(capture)
1150 pkts = self.create_stream_out(self.pg1)
1151 self.pg1.add_stream(pkts)
1152 self.pg_enable_capture(self.pg_interfaces)
1154 capture = self.pg0.get_capture(len(pkts))
1155 self.verify_capture_in(capture, self.pg0)
1157 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1158 """ NAT44 handling of client packets with TTL=1 """
1160 self.nat44_add_address(self.nat_addr)
1161 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1162 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1165 # Client side - generate traffic
1166 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1167 self.pg0.add_stream(pkts)
1168 self.pg_enable_capture(self.pg_interfaces)
1171 # Client side - verify ICMP type 11 packets
1172 capture = self.pg0.get_capture(len(pkts))
1173 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1175 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1176 """ NAT44 handling of server packets with TTL=1 """
# Flow: create sessions with a stream from pg0 towards pg1, then send a
# TTL=1 stream on pg1 and check what is captured back on pg1.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1180) are not shown.
1178 self.nat44_add_address(self.nat_addr)
1179 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1180 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1183 # Client side - create sessions
1184 pkts = self.create_stream_in(self.pg0, self.pg1)
1185 self.pg0.add_stream(pkts)
1186 self.pg_enable_capture(self.pg_interfaces)
1189 # Server side - generate traffic
# The first stream is captured and verified on pg1 before the TTL=1 stream
# is built with create_stream_out.
1190 capture = self.pg1.get_capture(len(pkts))
1191 self.verify_capture_out(capture)
1192 pkts = self.create_stream_out(self.pg1, ttl=1)
1193 self.pg1.add_stream(pkts)
1194 self.pg_enable_capture(self.pg_interfaces)
1197 # Server side - verify ICMP type 11 packets
# The verifier is told to expect pg1's local IPv4 address as src_ip.
1198 capture = self.pg1.get_capture(len(pkts))
1199 self.verify_capture_out_with_icmp_errors(capture,
1200 src_ip=self.pg1.local_ip4)
1202 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1203 """ NAT44 handling of error responses to client packets with TTL=2 """
# Flow: send a TTL=2 stream on pg0, capture it on pg1, answer every captured
# packet with an ICMP type 11 message on pg1, and check what reaches pg0.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1207) are not shown.
1205 self.nat44_add_address(self.nat_addr)
1206 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1207 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1210 # Client side - generate traffic
1211 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1212 self.pg0.add_stream(pkts)
1213 self.pg_enable_capture(self.pg_interfaces)
1216 # Server side - simulate ICMP type 11 response
# Each reply goes from pg1's remote host to self.nat_addr and carries the IP
# layer of the captured packet as the ICMP payload.
1217 capture = self.pg1.get_capture(len(pkts))
1218 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1219 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1220 ICMP(type=11) / packet[IP] for packet in capture]
1221 self.pg1.add_stream(pkts)
1222 self.pg_enable_capture(self.pg_interfaces)
1225 # Client side - verify ICMP type 11 packets
1226 capture = self.pg0.get_capture(len(pkts))
1227 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1229 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1230 """ NAT44 handling of error responses to server packets with TTL=2 """
# Flow: create sessions from pg0, send a TTL=2 stream on pg1, capture it on
# pg0, answer every captured packet with an ICMP type 11 message on pg0, and
# check what reaches pg1.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1234) are not shown.
1232 self.nat44_add_address(self.nat_addr)
1233 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1234 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1237 # Client side - create sessions
1238 pkts = self.create_stream_in(self.pg0, self.pg1)
1239 self.pg0.add_stream(pkts)
1240 self.pg_enable_capture(self.pg_interfaces)
1243 # Server side - generate traffic
1244 capture = self.pg1.get_capture(len(pkts))
1245 self.verify_capture_out(capture)
1246 pkts = self.create_stream_out(self.pg1, ttl=2)
1247 self.pg1.add_stream(pkts)
1248 self.pg_enable_capture(self.pg_interfaces)
1251 # Client side - simulate ICMP type 11 response
# Each reply goes from pg0's remote host to pg1's remote host and carries the
# IP layer of the captured packet as the ICMP payload.
1252 capture = self.pg0.get_capture(len(pkts))
1253 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1254 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1255 ICMP(type=11) / packet[IP] for packet in capture]
1256 self.pg0.add_stream(pkts)
1257 self.pg_enable_capture(self.pg_interfaces)
1260 # Server side - verify ICMP type 11 packets
1261 capture = self.pg1.get_capture(len(pkts))
1262 self.verify_capture_out_with_icmp_errors(capture)
1264 def test_ping_out_interface_from_outside(self):
1265 """ Ping NAT44 out interface from outside network """
# Flow: send an ICMP echo-request from pg1's remote host to pg1's own local
# address and expect a single echo reply captured on pg1.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the statements that bind
# 'pkts' and 'packet' and the end of the call opened on 1269 are missing.
1267 self.nat44_add_address(self.nat_addr)
1268 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1269 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1272 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1273 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1274 ICMP(id=self.icmp_id_out, type='echo-request'))
1276 self.pg1.add_stream(pkts)
1277 self.pg_enable_capture(self.pg_interfaces)
1279 capture = self.pg1.get_capture(len(pkts))
1280 self.assertEqual(1, len(capture))
# The reply must have source and destination swapped relative to the request.
# NOTE(review): the request is built with icmp_id_out but the reply is
# compared against icmp_id_in - confirm both attributes hold the same value.
1283 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1284 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1285 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1286 self.assertEqual(packet[ICMP].type, 0) # echo reply
1288 self.logger.error(ppp("Unexpected or invalid packet "
1289 "(outside network):", packet))
1292 def test_ping_internal_host_from_outside(self):
1293 """ Ping internal host from outside network """
# Flow: map pg0's remote host to self.nat_addr with a static mapping, send an
# echo-request from pg1 to self.nat_addr, then send the echo-reply back from
# pg0 and check it on pg1.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1297) are not shown.
1295 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1296 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1297 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Request: pg1 remote host -> self.nat_addr, expected as one packet on pg0.
1301 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1302 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1303 ICMP(id=self.icmp_id_out, type='echo-request'))
1304 self.pg1.add_stream(pkt)
1305 self.pg_enable_capture(self.pg_interfaces)
1307 capture = self.pg0.get_capture(1)
1308 self.verify_capture_in(capture, self.pg0, packet_num=1)
1309 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Reply: pg0 remote host -> pg1 remote host, expected as one packet on pg1.
1312 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1313 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1314 ICMP(id=self.icmp_id_in, type='echo-reply'))
1315 self.pg0.add_stream(pkt)
1316 self.pg_enable_capture(self.pg_interfaces)
1318 capture = self.pg1.get_capture(1)
1319 self.verify_capture_out(capture, same_port=True, packet_num=1)
1320 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1322 def test_forwarding(self):
1323 """ NAT44 forwarding test """
# Flow: enable NAT44 forwarding, add a static mapping from pg0's remote host
# to the NAT address, exchange traffic that matches the mapping, then repeat
# with a different pg0 host that has no mapping, and finally undo the setup.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the ends of the calls opened on 1326, 1371 and
# 1378) are not shown.
1325 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1326 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1328 self.vapi.nat44_forwarding_enable_disable(1)
1330 real_ip = self.pg0.remote_ip4n
1331 alias_ip = self.nat_addr_n
1332 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1333 external_ip=alias_ip)
1336 # in2out - static mapping match
# NOTE(review): this section starts with a stream injected on pg1 and
# captured on pg0, which presumably is the out2in direction - confirm the
# label above.
1338 pkts = self.create_stream_out(self.pg1)
1339 self.pg1.add_stream(pkts)
1340 self.pg_enable_capture(self.pg_interfaces)
1342 capture = self.pg0.get_capture(len(pkts))
1343 self.verify_capture_in(capture, self.pg0)
1345 pkts = self.create_stream_in(self.pg0, self.pg1)
1346 self.pg0.add_stream(pkts)
1347 self.pg_enable_capture(self.pg_interfaces)
1349 capture = self.pg1.get_capture(len(pkts))
1350 self.verify_capture_out(capture, same_port=True)
1352 # in2out - no static mapping match
# pg0.remote_hosts[0] is temporarily replaced by remote_hosts[1]; the
# original entry is kept in host0 and put back on 1374.
1354 host0 = self.pg0.remote_hosts[0]
1355 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1357 pkts = self.create_stream_out(self.pg1,
1358 dst_ip=self.pg0.remote_ip4,
1359 use_inside_ports=True)
1360 self.pg1.add_stream(pkts)
1361 self.pg_enable_capture(self.pg_interfaces)
1363 capture = self.pg0.get_capture(len(pkts))
1364 self.verify_capture_in(capture, self.pg0)
1366 pkts = self.create_stream_in(self.pg0, self.pg1)
1367 self.pg0.add_stream(pkts)
1368 self.pg_enable_capture(self.pg_interfaces)
1370 capture = self.pg1.get_capture(len(pkts))
1371 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1374 self.pg0.remote_hosts[0] = host0
# Teardown: forwarding is switched off and nat44_add_del_static_mapping is
# called again for the same address pair (remaining arguments not shown).
1377 self.vapi.nat44_forwarding_enable_disable(0)
1378 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1379 external_ip=alias_ip,
1382 def test_static_in(self):
1383 """ 1:1 NAT initialized from inside network """
# Flow: add an address-only static mapping from pg0's remote host to nat_ip,
# check how the mapping is reported by the dump API, then send traffic from
# pg0 first and from pg1 second.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1392) are not shown.
1385 nat_ip = "10.0.0.10"
1386 self.tcp_port_out = 6303
1387 self.udp_port_out = 6304
1388 self.icmp_id_out = 6305
1390 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1391 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1392 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected; its tag (text before the first NUL byte)
# must be empty and its protocol and ports must all be 0.
1394 sm = self.vapi.nat44_static_mapping_dump()
1395 self.assertEqual(len(sm), 1)
1396 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1397 self.assertEqual(sm[0].protocol, 0)
1398 self.assertEqual(sm[0].local_port, 0)
1399 self.assertEqual(sm[0].external_port, 0)
# Stream injected on pg0, captured on pg1.
1402 pkts = self.create_stream_in(self.pg0, self.pg1)
1403 self.pg0.add_stream(pkts)
1404 self.pg_enable_capture(self.pg_interfaces)
1406 capture = self.pg1.get_capture(len(pkts))
1407 self.verify_capture_out(capture, nat_ip, True)
# Stream injected on pg1 towards nat_ip, captured on pg0.
1410 pkts = self.create_stream_out(self.pg1, nat_ip)
1411 self.pg1.add_stream(pkts)
1412 self.pg_enable_capture(self.pg_interfaces)
1414 capture = self.pg0.get_capture(len(pkts))
1415 self.verify_capture_in(capture, self.pg0)
1417 def test_static_out(self):
1418 """ 1:1 NAT initialized from outside network """
# Flow: add a tagged static mapping from pg0's remote host to nat_ip, check
# that the dump API reports the tag, then send traffic from pg1 first and
# from pg0 second.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the assignment of 'tag'
# and the end of the call opened on 1428 are missing.
1420 nat_ip = "10.0.0.20"
1421 self.tcp_port_out = 6303
1422 self.udp_port_out = 6304
1423 self.icmp_id_out = 6305
1426 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1427 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1428 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The reported tag is compared up to its first NUL byte.
1430 sm = self.vapi.nat44_static_mapping_dump()
1431 self.assertEqual(len(sm), 1)
1432 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# Stream injected on pg1 towards nat_ip, captured on pg0.
1435 pkts = self.create_stream_out(self.pg1, nat_ip)
1436 self.pg1.add_stream(pkts)
1437 self.pg_enable_capture(self.pg_interfaces)
1439 capture = self.pg0.get_capture(len(pkts))
1440 self.verify_capture_in(capture, self.pg0)
# Stream injected on pg0, captured on pg1.
1443 pkts = self.create_stream_in(self.pg0, self.pg1)
1444 self.pg0.add_stream(pkts)
1445 self.pg_enable_capture(self.pg_interfaces)
1447 capture = self.pg1.get_capture(len(pkts))
1448 self.verify_capture_out(capture, nat_ip, True)
1450 def test_static_with_port_in(self):
1451 """ 1:1 NAPT initialized from inside network """
# Flow: add one port-level static mapping per protocol (TCP, UDP, ICMP) from
# pg0's remote host to self.nat_addr, then send traffic from pg0 first and
# from pg1 second.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1468) are not shown.
1453 self.tcp_port_out = 3606
1454 self.udp_port_out = 3607
1455 self.icmp_id_out = 3608
1457 self.nat44_add_address(self.nat_addr)
# Each mapping pairs the test's *_in port/id with the *_out value set above.
1458 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1459 self.tcp_port_in, self.tcp_port_out,
1460 proto=IP_PROTOS.tcp)
1461 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1462 self.udp_port_in, self.udp_port_out,
1463 proto=IP_PROTOS.udp)
1464 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1465 self.icmp_id_in, self.icmp_id_out,
1466 proto=IP_PROTOS.icmp)
1467 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1468 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Stream injected on pg0, captured on pg1.
1472 pkts = self.create_stream_in(self.pg0, self.pg1)
1473 self.pg0.add_stream(pkts)
1474 self.pg_enable_capture(self.pg_interfaces)
1476 capture = self.pg1.get_capture(len(pkts))
1477 self.verify_capture_out(capture)
# Stream injected on pg1, captured on pg0.
1480 pkts = self.create_stream_out(self.pg1)
1481 self.pg1.add_stream(pkts)
1482 self.pg_enable_capture(self.pg_interfaces)
1484 capture = self.pg0.get_capture(len(pkts))
1485 self.verify_capture_in(capture, self.pg0)
1487 def test_static_with_port_out(self):
1488 """ 1:1 NAPT initialized from outside network """
# Flow: add one port-level static mapping per protocol (TCP, UDP, ICMP) from
# pg0's remote host to self.nat_addr, then send traffic from pg1 first and
# from pg0 second.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1505) are not shown.
1490 self.tcp_port_out = 30606
1491 self.udp_port_out = 30607
1492 self.icmp_id_out = 30608
1494 self.nat44_add_address(self.nat_addr)
# Each mapping pairs the test's *_in port/id with the *_out value set above.
1495 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1496 self.tcp_port_in, self.tcp_port_out,
1497 proto=IP_PROTOS.tcp)
1498 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1499 self.udp_port_in, self.udp_port_out,
1500 proto=IP_PROTOS.udp)
1501 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1502 self.icmp_id_in, self.icmp_id_out,
1503 proto=IP_PROTOS.icmp)
1504 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1505 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Stream injected on pg1, captured on pg0.
1509 pkts = self.create_stream_out(self.pg1)
1510 self.pg1.add_stream(pkts)
1511 self.pg_enable_capture(self.pg_interfaces)
1513 capture = self.pg0.get_capture(len(pkts))
1514 self.verify_capture_in(capture, self.pg0)
# Stream injected on pg0, captured on pg1.
1517 pkts = self.create_stream_in(self.pg0, self.pg1)
1518 self.pg0.add_stream(pkts)
1519 self.pg_enable_capture(self.pg_interfaces)
1521 capture = self.pg1.get_capture(len(pkts))
1522 self.verify_capture_out(capture)
1524 def test_static_with_port_out2(self):
1525 """ 1:1 NAPT symmetrical rule """
# Flow: with forwarding enabled, add a TCP static mapping flagged
# out2in_only=1, then exercise six exchanges: client->service,
# an ICMP type 11 answer to it, service->client, an ICMP type 11 answer to
# that, and finally a client<->server pair that is expected to pass
# untranslated.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the statements that bind
# 'local_port', 'external_port', 'ip', 'tcp' and 'inner', the re-binding of
# 'p' from each capture, and the end of the call opened on 1535 are missing.
1530 self.vapi.nat44_forwarding_enable_disable(1)
1531 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1532 local_port, external_port,
1533 proto=IP_PROTOS.tcp, out2in_only=1)
1534 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1535 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1538 # from client to service
1539 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1540 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1541 TCP(sport=12345, dport=external_port))
1542 self.pg1.add_stream(p)
1543 self.pg_enable_capture(self.pg_interfaces)
1545 capture = self.pg0.get_capture(1)
# Destination must be rewritten to the internal host and local port.
1551 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1552 self.assertEqual(tcp.dport, local_port)
1553 self.check_tcp_checksum(p)
1554 self.check_ip_checksum(p)
1556 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 sent from pg0, embedding the IP layer of the packet that was
# just captured on pg0; the result is captured on pg1.
1560 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1561 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1562 ICMP(type=11) / capture[0][IP])
1563 self.pg0.add_stream(p)
1564 self.pg_enable_capture(self.pg_interfaces)
1566 capture = self.pg1.get_capture(1)
1569 self.assertEqual(p[IP].src, self.nat_addr)
1571 self.assertEqual(inner.dst, self.nat_addr)
1572 self.assertEqual(inner[TCPerror].dport, external_port)
1574 self.logger.error(ppp("Unexpected or invalid packet:", p))
1577 # from service back to client
1578 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1579 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1580 TCP(sport=local_port, dport=12345))
1581 self.pg0.add_stream(p)
1582 self.pg_enable_capture(self.pg_interfaces)
1584 capture = self.pg1.get_capture(1)
# Source must be rewritten to the NAT address and external port.
1589 self.assertEqual(ip.src, self.nat_addr)
1590 self.assertEqual(tcp.sport, external_port)
1591 self.check_tcp_checksum(p)
1592 self.check_ip_checksum(p)
1594 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 sent from pg1 to the NAT address, embedding the IP layer of
# the packet that was just captured on pg1; the result is captured on pg0.
1598 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1599 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1600 ICMP(type=11) / capture[0][IP])
1601 self.pg1.add_stream(p)
1602 self.pg_enable_capture(self.pg_interfaces)
1604 capture = self.pg0.get_capture(1)
1607 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1609 self.assertEqual(inner.src, self.pg0.remote_ip4)
1610 self.assertEqual(inner[TCPerror].sport, local_port)
1612 self.logger.error(ppp("Unexpected or invalid packet:", p))
1615 # from client to server (no translation)
1616 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1617 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1618 TCP(sport=12346, dport=local_port))
1619 self.pg1.add_stream(p)
1620 self.pg_enable_capture(self.pg_interfaces)
1622 capture = self.pg0.get_capture(1)
1628 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1629 self.assertEqual(tcp.dport, local_port)
1630 self.check_tcp_checksum(p)
1631 self.check_ip_checksum(p)
1633 self.logger.error(ppp("Unexpected or invalid packet:", p))
1636 # from service back to client (no translation)
1637 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1638 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1639 TCP(sport=local_port, dport=12346))
1640 self.pg0.add_stream(p)
1641 self.pg_enable_capture(self.pg_interfaces)
1643 capture = self.pg1.get_capture(1)
# Source address and port must be left as the internal host sent them.
1648 self.assertEqual(ip.src, self.pg0.remote_ip4)
1649 self.assertEqual(tcp.sport, local_port)
1650 self.check_tcp_checksum(p)
1651 self.check_ip_checksum(p)
1653 self.logger.error(ppp("Unexpected or invalid packet:", p))
1656 def test_static_vrf_aware(self):
1657 """ 1:1 NAT VRF awareness """
# Flow: add static mappings for the remote hosts of pg4 and pg0, then check
# that traffic from pg4 reaches pg3 translated to nat_ip1 while traffic from
# pg0 produces nothing on pg3.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the ends of the calls opened on 1665, 1667 and
# 1669) are not shown; the VRF arguments themselves are not visible here.
1659 nat_ip1 = "10.0.0.30"
1660 nat_ip2 = "10.0.0.40"
1661 self.tcp_port_out = 6303
1662 self.udp_port_out = 6304
1663 self.icmp_id_out = 6305
1665 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1667 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1669 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1671 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1672 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1674 # inside interface VRF match NAT44 static mapping VRF
1675 pkts = self.create_stream_in(self.pg4, self.pg3)
1676 self.pg4.add_stream(pkts)
1677 self.pg_enable_capture(self.pg_interfaces)
1679 capture = self.pg3.get_capture(len(pkts))
1680 self.verify_capture_out(capture, nat_ip1, True)
1682 # inside interface VRF don't match NAT44 static mapping VRF (packets
# NOTE(review): the comment above is cut off in this listing; the assertion
# on 1688 shows that no packet is expected on pg3 in this case.
1684 pkts = self.create_stream_in(self.pg0, self.pg3)
1685 self.pg0.add_stream(pkts)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 self.pg3.assert_nothing_captured()
1690 def test_dynamic_to_static(self):
1691 """ Switch from dynamic translation to 1:1NAT """
# Flow: send traffic from pg0 with only a NAT pool address configured, then
# add a static mapping for the same host, check that its session list is
# empty, and send the same traffic again expecting nat_ip on the way out.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1699) are not shown.
1692 nat_ip = "10.0.0.10"
1693 self.tcp_port_out = 6303
1694 self.udp_port_out = 6304
1695 self.icmp_id_out = 6305
1697 self.nat44_add_address(self.nat_addr)
1698 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1699 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# First pass: no static mapping exists yet.
1703 pkts = self.create_stream_in(self.pg0, self.pg1)
1704 self.pg0.add_stream(pkts)
1705 self.pg_enable_capture(self.pg_interfaces)
1707 capture = self.pg1.get_capture(len(pkts))
1708 self.verify_capture_out(capture)
# Second pass: after the static mapping is added the session dump for pg0's
# remote host must be empty before new traffic is sent.
1711 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1712 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1713 self.assertEqual(len(sessions), 0)
1714 pkts = self.create_stream_in(self.pg0, self.pg1)
1715 self.pg0.add_stream(pkts)
1716 self.pg_enable_capture(self.pg_interfaces)
1718 capture = self.pg1.get_capture(len(pkts))
1719 self.verify_capture_out(capture, nat_ip, True)
1721 def test_identity_nat(self):
1722 """ Identity NAT """
# Flow: add an identity mapping for pg0's remote host, send one TCP packet
# from pg1 straight to that host and expect it on pg0 with addresses and
# ports unchanged.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the statements that bind
# 'ip' and 'tcp', the re-binding of 'p' from the capture, and the end of the
# call opened on 1726 are missing.
1724 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1725 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1726 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1729 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1730 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1731 TCP(sport=12345, dport=56789))
1732 self.pg1.add_stream(p)
1733 self.pg_enable_capture(self.pg_interfaces)
1735 capture = self.pg0.get_capture(1)
# All four address/port fields must equal what was sent.
1740 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1741 self.assertEqual(ip.src, self.pg1.remote_ip4)
1742 self.assertEqual(tcp.dport, 56789)
1743 self.assertEqual(tcp.sport, 12345)
1744 self.check_tcp_checksum(p)
1745 self.check_ip_checksum(p)
1747 self.logger.error(ppp("Unexpected or invalid packet:", p))
1750 def test_static_lb(self):
1751 """ NAT44 local service load balancing """
# Flow: register a load-balanced static mapping for two pg0 hosts behind the
# NAT address, send one TCP packet from pg1 to the service, then answer from
# the server that received it and check the reply on pg1.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the statements that bind
# 'external_port', 'local_port', 'server', 'ip' and 'tcp', most of the
# 'locals' entries, and the ends of the calls opened on 1766 and 1772 are
# missing.
1752 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1755 server1 = self.pg0.remote_hosts[0]
1756 server2 = self.pg0.remote_hosts[1]
# NOTE(review): 'locals' shadows the Python builtin of the same name inside
# this test.
1758 locals = [{'addr': server1.ip4n,
1761 {'addr': server2.ip4n,
1765 self.nat44_add_address(self.nat_addr)
1766 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1769 local_num=len(locals),
1771 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1772 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1775 # from client to service
1776 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1777 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1778 TCP(sport=12345, dport=external_port))
1779 self.pg1.add_stream(p)
1780 self.pg_enable_capture(self.pg_interfaces)
1782 capture = self.pg0.get_capture(1)
# The packet may land on either server; the destination port must be the
# local port in both cases.
1788 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1789 if ip.dst == server1.ip4:
1793 self.assertEqual(tcp.dport, local_port)
1794 self.check_tcp_checksum(p)
1795 self.check_ip_checksum(p)
1797 self.logger.error(ppp("Unexpected or invalid packet:", p))
1800 # from service back to client
1801 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1802 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1803 TCP(sport=local_port, dport=12345))
1804 self.pg0.add_stream(p)
1805 self.pg_enable_capture(self.pg_interfaces)
1807 capture = self.pg1.get_capture(1)
# Source must be rewritten to the NAT address and external port.
1812 self.assertEqual(ip.src, self.nat_addr)
1813 self.assertEqual(tcp.sport, external_port)
1814 self.check_tcp_checksum(p)
1815 self.check_ip_checksum(p)
1817 self.logger.error(ppp("Unexpected or invalid packet:", p))
1820 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1821 def test_static_lb_multi_clients(self):
1822 """ NAT44 local service load balancing - multiple clients"""
# Runs only when running_extended_tests() is true.
# Flow: register a load-balanced static mapping for two pg0 hosts, send one
# TCP packet per client address in a range derived from pg1's remote address,
# and compare how many packets each server received.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the statements that bind
# 'external_port', 'pkts', 'server1_n' and 'server2_n', the loop over the
# capture, most of the 'locals' entries, and the ends of the calls opened on
# 1838 and 1844 are missing.
1824 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1827 server1 = self.pg0.remote_hosts[0]
1828 server2 = self.pg0.remote_hosts[1]
# NOTE(review): 'locals' shadows the Python builtin of the same name inside
# this test.
1830 locals = [{'addr': server1.ip4n,
1833 {'addr': server2.ip4n,
1837 self.nat44_add_address(self.nat_addr)
1838 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1841 local_num=len(locals),
1843 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1844 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1849 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1851 for client in clients:
1852 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1853 IP(src=client, dst=self.nat_addr) /
1854 TCP(sport=12345, dport=external_port))
1856 self.pg1.add_stream(pkts)
1857 self.pg_enable_capture(self.pg_interfaces)
1859 capture = self.pg0.get_capture(len(pkts))
1861 if p[IP].dst == server1.ip4:
# server1 is expected to have received strictly more packets than server2;
# presumably this reflects the configured probabilities, which are not
# visible here - confirm.
1865 self.assertTrue(server1_n > server2_n)
1867 def test_static_lb_2(self):
1868 """ NAT44 local service load balancing (asymmetrical rule) """
# Flow: with forwarding enabled, register a load-balanced static mapping for
# two pg0 hosts, then exercise client->service, service->client, and a
# client<->server1 pair that is expected to pass untranslated.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the statements that bind
# 'external_port', 'local_port', 'server', 'ip' and 'tcp', the re-binding of
# 'p' from each capture, most of the 'locals' entries, and the ends of the
# calls opened on 1883 and 1890 are missing.
1869 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1872 server1 = self.pg0.remote_hosts[0]
1873 server2 = self.pg0.remote_hosts[1]
# NOTE(review): 'locals' shadows the Python builtin of the same name inside
# this test.
1875 locals = [{'addr': server1.ip4n,
1878 {'addr': server2.ip4n,
1882 self.vapi.nat44_forwarding_enable_disable(1)
1883 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1887 local_num=len(locals),
1889 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1890 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1893 # from client to service
1894 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1895 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1896 TCP(sport=12345, dport=external_port))
1897 self.pg1.add_stream(p)
1898 self.pg_enable_capture(self.pg_interfaces)
1900 capture = self.pg0.get_capture(1)
# The packet may land on either server; the destination port must be the
# local port in both cases.
1906 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1907 if ip.dst == server1.ip4:
1911 self.assertEqual(tcp.dport, local_port)
1912 self.check_tcp_checksum(p)
1913 self.check_ip_checksum(p)
1915 self.logger.error(ppp("Unexpected or invalid packet:", p))
1918 # from service back to client
1919 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1920 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1921 TCP(sport=local_port, dport=12345))
1922 self.pg0.add_stream(p)
1923 self.pg_enable_capture(self.pg_interfaces)
1925 capture = self.pg1.get_capture(1)
# Source must be rewritten to the NAT address and external port.
1930 self.assertEqual(ip.src, self.nat_addr)
1931 self.assertEqual(tcp.sport, external_port)
1932 self.check_tcp_checksum(p)
1933 self.check_ip_checksum(p)
1935 self.logger.error(ppp("Unexpected or invalid packet:", p))
1938 # from client to server (no translation)
1939 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1940 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1941 TCP(sport=12346, dport=local_port))
1942 self.pg1.add_stream(p)
1943 self.pg_enable_capture(self.pg_interfaces)
1945 capture = self.pg0.get_capture(1)
1951 self.assertEqual(ip.dst, server1.ip4)
1952 self.assertEqual(tcp.dport, local_port)
1953 self.check_tcp_checksum(p)
1954 self.check_ip_checksum(p)
1956 self.logger.error(ppp("Unexpected or invalid packet:", p))
1959 # from service back to client (no translation)
1960 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1961 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1962 TCP(sport=local_port, dport=12346))
1963 self.pg0.add_stream(p)
1964 self.pg_enable_capture(self.pg_interfaces)
1966 capture = self.pg1.get_capture(1)
# Source address and port must be left as server1 sent them.
1971 self.assertEqual(ip.src, server1.ip4)
1972 self.assertEqual(tcp.sport, local_port)
1973 self.check_tcp_checksum(p)
1974 self.check_ip_checksum(p)
1976 self.logger.error(ppp("Unexpected or invalid packet:", p))
1979 def test_multiple_inside_interfaces(self):
1980 """ NAT44 multiple non-overlapping address space inside interfaces """
# Flow: call nat44_interface_add_del_feature for pg0, pg1 and pg3 (pg2 gets
# no such call), then check six traffic patterns: pg0->pg1 and pg0->pg2
# untranslated, and translated round trips between pg3 and each of pg0, pg1.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines (e.g. the end of the call opened on 1985) are not shown.
1982 self.nat44_add_address(self.nat_addr)
1983 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1984 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1985 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1988 # between two NAT44 inside interfaces (no translation)
1989 pkts = self.create_stream_in(self.pg0, self.pg1)
1990 self.pg0.add_stream(pkts)
1991 self.pg_enable_capture(self.pg_interfaces)
1993 capture = self.pg1.get_capture(len(pkts))
1994 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1996 # from NAT44 inside to interface without NAT44 feature (no translation)
1997 pkts = self.create_stream_in(self.pg0, self.pg2)
1998 self.pg0.add_stream(pkts)
1999 self.pg_enable_capture(self.pg_interfaces)
2001 capture = self.pg2.get_capture(len(pkts))
2002 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
2004 # in2out 1st interface
2005 pkts = self.create_stream_in(self.pg0, self.pg3)
2006 self.pg0.add_stream(pkts)
2007 self.pg_enable_capture(self.pg_interfaces)
2009 capture = self.pg3.get_capture(len(pkts))
2010 self.verify_capture_out(capture)
2012 # out2in 1st interface
2013 pkts = self.create_stream_out(self.pg3)
2014 self.pg3.add_stream(pkts)
2015 self.pg_enable_capture(self.pg_interfaces)
2017 capture = self.pg0.get_capture(len(pkts))
2018 self.verify_capture_in(capture, self.pg0)
2020 # in2out 2nd interface
2021 pkts = self.create_stream_in(self.pg1, self.pg3)
2022 self.pg1.add_stream(pkts)
2023 self.pg_enable_capture(self.pg_interfaces)
2025 capture = self.pg3.get_capture(len(pkts))
2026 self.verify_capture_out(capture)
2028 # out2in 2nd interface
2029 pkts = self.create_stream_out(self.pg3)
2030 self.pg3.add_stream(pkts)
2031 self.pg_enable_capture(self.pg_interfaces)
2033 capture = self.pg1.get_capture(len(pkts))
2034 self.verify_capture_in(capture, self.pg1)
2036 def test_inside_overlapping_interfaces(self):
2037 """ NAT44 multiple inside interfaces with overlapping address space """
# Flow: call nat44_interface_add_del_feature for pg3, pg4, pg5 and pg6, add a
# static mapping for pg6's remote host, then check untranslated pg4->pg5
# traffic, a hairpinned pg4->pg6 packet, translated round trips between pg3
# and each of pg4, pg5, pg6, and finally the user/session dumps.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the statements that bind
# 'ip', 'tcp' and 'user', the re-binding of 'p' from the capture, and the
# ends of the calls opened on 2041, 2046, 2152, 2158 and 2181 are missing.
2039 static_nat_ip = "10.0.0.10"
2040 self.nat44_add_address(self.nat_addr)
2041 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2043 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
2044 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
2045 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
2046 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2049 # between NAT44 inside interfaces with same VRF (no translation)
2050 pkts = self.create_stream_in(self.pg4, self.pg5)
2051 self.pg4.add_stream(pkts)
2052 self.pg_enable_capture(self.pg_interfaces)
2054 capture = self.pg5.get_capture(len(pkts))
2055 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2057 # between NAT44 inside interfaces with different VRF (hairpinning)
2058 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2059 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2060 TCP(sport=1234, dport=5678))
2061 self.pg4.add_stream(p)
2062 self.pg_enable_capture(self.pg_interfaces)
2064 capture = self.pg6.get_capture(1)
# The hairpinned packet must arrive from the NAT address with a changed
# source port and an unchanged destination port.
2069 self.assertEqual(ip.src, self.nat_addr)
2070 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2071 self.assertNotEqual(tcp.sport, 1234)
2072 self.assertEqual(tcp.dport, 5678)
2074 self.logger.error(ppp("Unexpected or invalid packet:", p))
2077 # in2out 1st interface
2078 pkts = self.create_stream_in(self.pg4, self.pg3)
2079 self.pg4.add_stream(pkts)
2080 self.pg_enable_capture(self.pg_interfaces)
2082 capture = self.pg3.get_capture(len(pkts))
2083 self.verify_capture_out(capture)
2085 # out2in 1st interface
2086 pkts = self.create_stream_out(self.pg3)
2087 self.pg3.add_stream(pkts)
2088 self.pg_enable_capture(self.pg_interfaces)
2090 capture = self.pg4.get_capture(len(pkts))
2091 self.verify_capture_in(capture, self.pg4)
2093 # in2out 2nd interface
2094 pkts = self.create_stream_in(self.pg5, self.pg3)
2095 self.pg5.add_stream(pkts)
2096 self.pg_enable_capture(self.pg_interfaces)
2098 capture = self.pg3.get_capture(len(pkts))
2099 self.verify_capture_out(capture)
2101 # out2in 2nd interface
2102 pkts = self.create_stream_out(self.pg3)
2103 self.pg3.add_stream(pkts)
2104 self.pg_enable_capture(self.pg_interfaces)
2106 capture = self.pg5.get_capture(len(pkts))
2107 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's remote host (second argument 10): exactly three
# non-static sessions are expected, in the order TCP, UDP, ICMP, each using
# the single NAT pool address and the ports/ids recorded by the test.
2110 addresses = self.vapi.nat44_address_dump()
2111 self.assertEqual(len(addresses), 1)
2112 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2113 self.assertEqual(len(sessions), 3)
2114 for session in sessions:
2115 self.assertFalse(session.is_static)
2116 self.assertEqual(session.inside_ip_address[0:4],
2117 self.pg5.remote_ip4n)
2118 self.assertEqual(session.outside_ip_address,
2119 addresses[0].ip_address)
2120 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2121 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2122 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2123 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2124 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2125 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2126 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2127 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2128 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2130 # in2out 3rd interface
2131 pkts = self.create_stream_in(self.pg6, self.pg3)
2132 self.pg6.add_stream(pkts)
2133 self.pg_enable_capture(self.pg_interfaces)
2135 capture = self.pg3.get_capture(len(pkts))
2136 self.verify_capture_out(capture, static_nat_ip, True)
2138 # out2in 3rd interface
2139 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2140 self.pg3.add_stream(pkts)
2141 self.pg_enable_capture(self.pg_interfaces)
2143 capture = self.pg6.get_capture(len(pkts))
2144 self.verify_capture_in(capture, self.pg6)
2146 # general user and session dump verifications
2147 users = self.vapi.nat44_user_dump()
2148 self.assertTrue(len(users) >= 3)
2149 addresses = self.vapi.nat44_address_dump()
2150 self.assertEqual(len(addresses), 1)
2152 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2154 for session in sessions:
2155 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_bytes > total_pkts and total_pkts > 0.
2156 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2157 self.assertTrue(session.protocol in
2158 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of pg4's remote host: at least four, none static, all using the
# single NAT pool address.
2162 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2163 self.assertTrue(len(sessions) >= 4)
2164 for session in sessions:
2165 self.assertFalse(session.is_static)
2166 self.assertEqual(session.inside_ip_address[0:4],
2167 self.pg4.remote_ip4n)
2168 self.assertEqual(session.outside_ip_address,
2169 addresses[0].ip_address)
# Sessions of pg6's remote host (second argument 20): at least three, all
# static, with the outside address equal to static_nat_ip.
2172 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2173 self.assertTrue(len(sessions) >= 3)
2174 for session in sessions:
2175 self.assertTrue(session.is_static)
2176 self.assertEqual(session.inside_ip_address[0:4],
2177 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results only behaves as a list comparison
# where map() returns a list (Python 2); on Python 3 these would be distinct
# iterator objects - confirm the target interpreter.
2178 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2179 map(int, static_nat_ip.split('.')))
2180 self.assertTrue(session.inside_port in
2181 [self.tcp_port_in, self.udp_port_in,
2184 def test_hairpinning(self):
2185 """ NAT44 hairpinning - 1:1 NAPT """
# Flow: host and server are both remote hosts of pg0. A TCP static mapping
# exposes the server on self.nat_addr; the host sends to that address and the
# packet is expected back on pg0 addressed to the server, then the server's
# reply is expected back on pg0 addressed to the host.
# NOTE(review): the embedded line numbers in this listing skip values, so
# some original lines are not shown; in particular the statements that bind
# 'host_in_port', 'ip' and 'tcp', the re-binding of 'p' from each capture,
# and the end of the call opened on 2196 are missing.
2187 host = self.pg0.remote_hosts[0]
2188 server = self.pg0.remote_hosts[1]
2191 server_in_port = 5678
2192 server_out_port = 8765
2194 self.nat44_add_address(self.nat_addr)
2195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2198 # add static mapping for server
2199 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2200 server_in_port, server_out_port,
2201 proto=IP_PROTOS.tcp)
2203 # send packet from host to server
2204 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2205 IP(src=host.ip4, dst=self.nat_addr) /
2206 TCP(sport=host_in_port, dport=server_out_port))
2207 self.pg0.add_stream(p)
2208 self.pg_enable_capture(self.pg_interfaces)
2210 capture = self.pg0.get_capture(1)
# The source port must have been changed; the new value is remembered in
# host_out_port so the server's reply can target it.
2215 self.assertEqual(ip.src, self.nat_addr)
2216 self.assertEqual(ip.dst, server.ip4)
2217 self.assertNotEqual(tcp.sport, host_in_port)
2218 self.assertEqual(tcp.dport, server_in_port)
2219 self.check_tcp_checksum(p)
2220 host_out_port = tcp.sport
2222 self.logger.error(ppp("Unexpected or invalid packet:", p))
2225 # send reply from server to host
2226 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2227 IP(src=server.ip4, dst=self.nat_addr) /
2228 TCP(sport=server_in_port, dport=host_out_port))
2229 self.pg0.add_stream(p)
2230 self.pg_enable_capture(self.pg_interfaces)
2232 capture = self.pg0.get_capture(1)
# The reply must appear to come from the NAT address and the server's
# external port, and be addressed to the host's original port.
2237 self.assertEqual(ip.src, self.nat_addr)
2238 self.assertEqual(ip.dst, host.ip4)
2239 self.assertEqual(tcp.sport, server_out_port)
2240 self.assertEqual(tcp.dport, host_in_port)
2241 self.check_tcp_checksum(p)
2243 self.logger.error(ppp("Unexpected or invalid packet:", p))
2246 def test_hairpinning2(self):
2247 """ NAT44 hairpinning - 1:1 NAT"""
# Four exchanges between remote hosts that all sit on pg0:
#   host -> server1 (via server1_nat_ip), server1 -> host (via self.nat_addr),
#   server2 -> server1 (via server1_nat_ip), server1 -> server2 (via
#   server2_nat_ip).
# NOTE(review): pkts is passed to add_stream() below, but the statements that
# create and fill it are not visible in this listing - confirm in the full file.
2249 server1_nat_ip = "10.0.0.10"
2250 server2_nat_ip = "10.0.0.11"
2251 host = self.pg0.remote_hosts[0]
2252 server1 = self.pg0.remote_hosts[1]
2253 server2 = self.pg0.remote_hosts[2]
2254 server_tcp_port = 22
2255 server_udp_port = 20
2257 self.nat44_add_address(self.nat_addr)
2258 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2259 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2262 # add static mapping for servers
2263 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2264 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host -> server1: one TCP, one UDP and one ICMP echo-request packet
2268 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2269 IP(src=host.ip4, dst=server1_nat_ip) /
2270 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2272 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2273 IP(src=host.ip4, dst=server1_nat_ip) /
2274 UDP(sport=self.udp_port_in, dport=server_udp_port))
2276 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2277 IP(src=host.ip4, dst=server1_nat_ip) /
2278 ICMP(id=self.icmp_id_in, type='echo-request'))
2280 self.pg0.add_stream(pkts)
2281 self.pg_enable_capture(self.pg_interfaces)
2283 capture = self.pg0.get_capture(len(pkts))
# the host has no static mapping, so its packets must leave with the pool
# address and a changed port / ICMP id; the new values are stored on self
# so the replies below can target them
2284 for packet in capture:
2286 self.assertEqual(packet[IP].src, self.nat_addr)
2287 self.assertEqual(packet[IP].dst, server1.ip4)
2288 if packet.haslayer(TCP):
2289 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2290 self.assertEqual(packet[TCP].dport, server_tcp_port)
2291 self.tcp_port_out = packet[TCP].sport
2292 self.check_tcp_checksum(packet)
2293 elif packet.haslayer(UDP):
2294 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2295 self.assertEqual(packet[UDP].dport, server_udp_port)
2296 self.udp_port_out = packet[UDP].sport
2298 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2299 self.icmp_id_out = packet[ICMP].id
2301 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 -> host: replies addressed to the pool address and translated ports
2306 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2307 IP(src=server1.ip4, dst=self.nat_addr) /
2308 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2310 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2311 IP(src=server1.ip4, dst=self.nat_addr) /
2312 UDP(sport=server_udp_port, dport=self.udp_port_out))
2314 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2315 IP(src=server1.ip4, dst=self.nat_addr) /
2316 ICMP(id=self.icmp_id_out, type='echo-reply'))
2318 self.pg0.add_stream(pkts)
2319 self.pg_enable_capture(self.pg_interfaces)
2321 capture = self.pg0.get_capture(len(pkts))
# replies must appear to come from server1's NAT address and carry the
# host's original ports / ICMP id again
2322 for packet in capture:
2324 self.assertEqual(packet[IP].src, server1_nat_ip)
2325 self.assertEqual(packet[IP].dst, host.ip4)
2326 if packet.haslayer(TCP):
2327 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2328 self.assertEqual(packet[TCP].sport, server_tcp_port)
2329 self.check_tcp_checksum(packet)
2330 elif packet.haslayer(UDP):
2331 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2332 self.assertEqual(packet[UDP].sport, server_udp_port)
2334 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2336 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2339 # server2 to server1
2341 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2342 IP(src=server2.ip4, dst=server1_nat_ip) /
2343 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2345 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2346 IP(src=server2.ip4, dst=server1_nat_ip) /
2347 UDP(sport=self.udp_port_in, dport=server_udp_port))
2349 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2350 IP(src=server2.ip4, dst=server1_nat_ip) /
2351 ICMP(id=self.icmp_id_in, type='echo-request'))
2353 self.pg0.add_stream(pkts)
2354 self.pg_enable_capture(self.pg_interfaces)
2356 capture = self.pg0.get_capture(len(pkts))
# server2 has its own static mapping: source becomes server2_nat_ip while
# the source ports / ICMP id are expected to stay unchanged
2357 for packet in capture:
2359 self.assertEqual(packet[IP].src, server2_nat_ip)
2360 self.assertEqual(packet[IP].dst, server1.ip4)
2361 if packet.haslayer(TCP):
2362 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2363 self.assertEqual(packet[TCP].dport, server_tcp_port)
2364 self.tcp_port_out = packet[TCP].sport
2365 self.check_tcp_checksum(packet)
2366 elif packet.haslayer(UDP):
2367 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2368 self.assertEqual(packet[UDP].dport, server_udp_port)
2369 self.udp_port_out = packet[UDP].sport
2371 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2372 self.icmp_id_out = packet[ICMP].id
2374 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2377 # server1 to server2
2379 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2380 IP(src=server1.ip4, dst=server2_nat_ip) /
2381 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2383 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2384 IP(src=server1.ip4, dst=server2_nat_ip) /
2385 UDP(sport=server_udp_port, dport=self.udp_port_out))
2387 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2388 IP(src=server1.ip4, dst=server2_nat_ip) /
2389 ICMP(id=self.icmp_id_out, type='echo-reply'))
2391 self.pg0.add_stream(pkts)
2392 self.pg_enable_capture(self.pg_interfaces)
2394 capture = self.pg0.get_capture(len(pkts))
# replies reach server2's inside address, sourced from server1's NAT address
2395 for packet in capture:
2397 self.assertEqual(packet[IP].src, server1_nat_ip)
2398 self.assertEqual(packet[IP].dst, server2.ip4)
2399 if packet.haslayer(TCP):
2400 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2401 self.assertEqual(packet[TCP].sport, server_tcp_port)
2402 self.check_tcp_checksum(packet)
2403 elif packet.haslayer(UDP):
2404 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2405 self.assertEqual(packet[UDP].sport, server_udp_port)
2407 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2409 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2412 def test_max_translations_per_user(self):
2413 """ MAX translations per user - recycle the least recently used """
# Sends more TCP flows from a single inside address than the configured
# per-user limit and expects every packet to still be forwarded on pg1.
# NOTE(review): pkts is used below but the statements that create and fill it
# are not visible in this listing - confirm in the full file.
2415 self.nat44_add_address(self.nat_addr)
2416 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2417 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2420 # get maximum number of translations per user
2421 nat44_config = self.vapi.nat_show_config()
2423 # send more than maximum number of translations per user packets
2424 pkts_num = nat44_config.max_translations_per_user + 5
# one packet per distinct source port, starting at 1025
2426 for port in range(0, pkts_num):
2427 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2428 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2429 TCP(sport=1025 + port))
2431 self.pg0.add_stream(pkts)
2432 self.pg_enable_capture(self.pg_interfaces)
2435 # verify number of translated packet
2436 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface """
    nat_if = self.pg7
    self.vapi.nat44_add_interface_addr(nat_if.sw_if_index)

    # the pool stays empty while the interface has no address
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # addressing the interface puts exactly that address into the pool
    nat_if.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    first_entry = pool[0]
    self.assertEqual(first_entry.ip_address[0:4], nat_if.local_ip4n)

    # removing the interface address empties the pool again
    nat_if.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
2457 def test_interface_addr_static_mapping(self):
2458 """ Static mapping with addresses from interface """
# Follows a static mapping bound to pg7 through config / unconfig / config of
# the interface address and finally removes the mapping.
# NOTE(review): tag and resolved are used below but their assignments are not
# visible in this listing; some arguments of the nat44_add_static_mapping()
# calls are hidden as well - confirm against the full file.
2461 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2462 self.nat44_add_static_mapping(
2464 external_sw_if_index=self.pg7.sw_if_index,
2467 # static mappings with external interface
2468 static_mappings = self.vapi.nat44_static_mapping_dump()
2469 self.assertEqual(1, len(static_mappings))
2470 self.assertEqual(self.pg7.sw_if_index,
2471 static_mappings[0].external_sw_if_index)
# tag is compared up to the first NUL character
2472 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2474 # configure interface address and check static mappings
2475 self.pg7.config_ip4()
2476 static_mappings = self.vapi.nat44_static_mapping_dump()
2477 self.assertEqual(2, len(static_mappings))
# the entry whose external_sw_if_index is 0xFFFFFFFF must carry pg7's address
2479 for sm in static_mappings:
2480 if sm.external_sw_if_index == 0xFFFFFFFF:
2481 self.assertEqual(sm.external_ip_address[0:4],
2482 self.pg7.local_ip4n)
2483 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2485 self.assertTrue(resolved)
2487 # remove interface address and check static mappings
2488 self.pg7.unconfig_ip4()
2489 static_mappings = self.vapi.nat44_static_mapping_dump()
2490 self.assertEqual(1, len(static_mappings))
2491 self.assertEqual(self.pg7.sw_if_index,
2492 static_mappings[0].external_sw_if_index)
2493 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2495 # configure interface address again and check static mappings
2496 self.pg7.config_ip4()
2497 static_mappings = self.vapi.nat44_static_mapping_dump()
2498 self.assertEqual(2, len(static_mappings))
2500 for sm in static_mappings:
2501 if sm.external_sw_if_index == 0xFFFFFFFF:
2502 self.assertEqual(sm.external_ip_address[0:4],
2503 self.pg7.local_ip4n)
2504 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2506 self.assertTrue(resolved)
2508 # remove static mapping
2509 self.nat44_add_static_mapping(
2511 external_sw_if_index=self.pg7.sw_if_index,
# after removal no static mapping may remain
2514 static_mappings = self.vapi.nat44_static_mapping_dump()
2515 self.assertEqual(0, len(static_mappings))
2517 def test_interface_addr_identity_nat(self):
2518 """ Identity NAT with addresses from interface """
# Follows an identity mapping bound to pg7 while the interface address is
# configured and removed again.
# NOTE(review): port and resolved are used below but their assignments are
# not visible in this listing - confirm against the full file.
2521 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2522 self.vapi.nat44_add_del_identity_mapping(
2523 sw_if_index=self.pg7.sw_if_index,
2525 protocol=IP_PROTOS.tcp,
2528 # identity mappings with external interface
2529 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2530 self.assertEqual(1, len(identity_mappings))
2531 self.assertEqual(self.pg7.sw_if_index,
2532 identity_mappings[0].sw_if_index)
2534 # configure interface address and check identity mappings
2535 self.pg7.config_ip4()
2536 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2538 self.assertEqual(2, len(identity_mappings))
# NOTE(review): the loop selects the entry with sw_if_index == 0xFFFFFFFF via
# sm, but the three assertions below inspect identity_mappings[0] instead of
# sm, so they only check the selected entry when it happens to be first in
# the dump. They most likely should read sm.ip_address / sm.port / sm.protocol.
2539 for sm in identity_mappings:
2540 if sm.sw_if_index == 0xFFFFFFFF:
2541 self.assertEqual(identity_mappings[0].ip_address,
2542 self.pg7.local_ip4n)
2543 self.assertEqual(port, identity_mappings[0].port)
2544 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2546 self.assertTrue(resolved)
2548 # remove interface address and check identity mappings
2549 self.pg7.unconfig_ip4()
2550 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2551 self.assertEqual(1, len(identity_mappings))
2552 self.assertEqual(self.pg7.sw_if_index,
2553 identity_mappings[0].sw_if_index)
2555 def test_ipfix_nat44_sess(self):
2556 """ IPFIX logging NAT44 session created/delted """
# Creates NAT44 sessions, removes the pool address, flushes IPFIX and checks
# the records exported towards pg3.
2557 self.ipfix_domain_id = 10
2558 self.ipfix_src_port = 20202
2559 colector_port = 30303
# NOTE(review): the literal 30303 duplicates colector_port; the two must be
# kept in sync for scapy to dissect the export packets as IPFIX.
2560 bind_layers(UDP, IPFIX, dport=30303)
2561 self.nat44_add_address(self.nat_addr)
2562 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2563 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# exporter sends from pg3's local address to pg3's remote address
# NOTE(review): one argument of this call is hidden in this listing
2565 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2566 src_address=self.pg3.local_ip4n,
2568 template_interval=10,
2569 collector_port=colector_port)
2570 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2571 src_port=self.ipfix_src_port)
2573 pkts = self.create_stream_in(self.pg0, self.pg1)
2574 self.pg0.add_stream(pkts)
2575 self.pg_enable_capture(self.pg_interfaces)
2577 capture = self.pg1.get_capture(len(pkts))
2578 self.verify_capture_out(capture)
# deleting the pool address before the flush; presumably this is what
# produces the "session deleted" events named in the docstring - verify
2579 self.nat44_add_address(self.nat_addr, is_add=0)
2580 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# exactly 9 export packets are expected on pg3
2581 capture = self.pg3.get_capture(9)
2582 ipfix = IPFIXDecoder()
2583 # first load template
# NOTE(review): p is used below without a visible binding; presumably the
# loop headers over capture are on hidden lines - confirm
2585 self.assertTrue(p.haslayer(IPFIX))
2586 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2587 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2588 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2589 self.assertEqual(p[UDP].dport, colector_port)
2590 self.assertEqual(p[IPFIX].observationDomainID,
2591 self.ipfix_domain_id)
2592 if p.haslayer(Template):
2593 ipfix.add_template(p.getlayer(Template))
2594 # verify events in data set
2596 if p.haslayer(Data):
2597 data = ipfix.decode_data_set(p.getlayer(Set))
2598 self.verify_ipfix_nat44_ses(data)
2600 def test_ipfix_addr_exhausted(self):
2601 """ IPFIX logging NAT addresses exhausted """
# No nat44_add_address() call is visible in this test, so the inside packet
# cannot be translated; the test expects nothing on pg1 and an IPFIX report
# on pg3.
# NOTE(review): self.ipfix_domain_id and self.ipfix_src_port are read here
# but not assigned in this test - presumably set elsewhere on the class or
# in setUp; verify.
2602 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2603 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2605 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2606 src_address=self.pg3.local_ip4n,
2608 template_interval=10)
2609 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2610 src_port=self.ipfix_src_port)
# NOTE(review): the transport layer of this packet is on a hidden line
2612 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2613 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2615 self.pg0.add_stream(p)
2616 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): this binding is overwritten two lines below before it is
# read; only the "nothing captured" check of get_capture(0) matters here
2618 capture = self.pg1.get_capture(0)
2619 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2620 capture = self.pg3.get_capture(9)
2621 ipfix = IPFIXDecoder()
2622 # first load template
2624 self.assertTrue(p.haslayer(IPFIX))
2625 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2626 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2627 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# no collector_port is passed visibly to set_ipfix_exporter() above, so 4739
# is presumably the exporter's default destination port - verify
2628 self.assertEqual(p[UDP].dport, 4739)
2629 self.assertEqual(p[IPFIX].observationDomainID,
2630 self.ipfix_domain_id)
2631 if p.haslayer(Template):
2632 ipfix.add_template(p.getlayer(Template))
2633 # verify events in data set
2635 if p.haslayer(Data):
2636 data = ipfix.decode_data_set(p.getlayer(Set))
2637 self.verify_ipfix_addr_exhausted(data)
# only runs when running_extended_tests() is true
2639 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2640 def test_ipfix_max_sessions(self):
2641 """ IPFIX logging maximum session entries exceeded """
# Fills the session table from max_sessions distinct source addresses, then
# sends one more flow and expects it to be dropped and reported via IPFIX.
# NOTE(review): pkts and the transport layers of the packets are on hidden
# lines of this listing - confirm against the full file.
2642 self.nat44_add_address(self.nat_addr)
2643 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2644 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2647 nat44_config = self.vapi.nat_show_config()
# the test treats 10 sessions per translation bucket as the table capacity
2648 max_sessions = 10 * nat44_config.translation_buckets
2651 for i in range(0, max_sessions):
# spread i over the last two octets: 10.10.<high byte>.<low byte>
2652 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2653 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2654 IP(src=src, dst=self.pg1.remote_ip4) /
2657 self.pg0.add_stream(pkts)
2658 self.pg_enable_capture(self.pg_interfaces)
2661 self.pg1.get_capture(max_sessions)
# IPFIX is enabled only after the table has been filled
2662 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2663 src_address=self.pg3.local_ip4n,
2665 template_interval=10)
2666 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2667 src_port=self.ipfix_src_port)
# one additional flow, expected not to be forwarded (get_capture(0) below)
2669 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2670 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2672 self.pg0.add_stream(p)
2673 self.pg_enable_capture(self.pg_interfaces)
2675 self.pg1.get_capture(0)
2676 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2677 capture = self.pg3.get_capture(9)
2678 ipfix = IPFIXDecoder()
2679 # first load template
2681 self.assertTrue(p.haslayer(IPFIX))
2682 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2683 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2684 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2685 self.assertEqual(p[UDP].dport, 4739)
2686 self.assertEqual(p[IPFIX].observationDomainID,
2687 self.ipfix_domain_id)
2688 if p.haslayer(Template):
2689 ipfix.add_template(p.getlayer(Template))
2690 # verify events in data set
2692 if p.haslayer(Data):
2693 data = ipfix.decode_data_set(p.getlayer(Set))
2694 self.verify_ipfix_max_sessions(data, max_sessions)
def test_pool_addr_fib(self):
    """ NAT44 add pool addresses to FIB """
    # Checks that ARP requests for the pool address and for a 1:1 NAT
    # address are answered on the outside interface (pg1) while the
    # addresses are configured, and that no reply is captured on pg1 for a
    # request arriving on a non-NAT interface or after the addresses have
    # been removed.
    static_addr = '10.0.0.10'
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # NAT44 address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.nat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    # assertEqual, not assertTrue: with assertTrue the second argument is
    # only the failure message and the opcode is never compared
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # 1:1 NAT address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-NAT44 interface
    p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.nat_addr,
             psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
    self.pg2.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # nothing may show up on pg1 for a request that arrived on pg2
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                  is_add=0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.nat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # pool address is gone: the request must stay unanswered
    self.pg1.get_capture(0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # static mapping is gone: the request must stay unanswered
    self.pg1.get_capture(0)
2757 def test_vrf_mode(self):
2758 """ NAT44 tenant VRF aware address pool mode """
# pg0 and pg1 are moved into separate VRF tables and each VRF gets its own
# pool address; traffic from each inside interface towards pg2 must be
# translated with the address belonging to its VRF.
# NOTE(review): vrf_id1 and vrf_id2 are assigned on lines hidden in this
# listing - confirm their values in the full file.
2762 nat_ip1 = "10.0.0.10"
2763 nat_ip2 = "10.0.0.11"
# addresses are removed before the table change and re-applied afterwards
2765 self.pg0.unconfig_ip4()
2766 self.pg1.unconfig_ip4()
2767 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2768 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2769 self.pg0.set_table_ip4(vrf_id1)
2770 self.pg1.set_table_ip4(vrf_id2)
2771 self.pg0.config_ip4()
2772 self.pg1.config_ip4()
2774 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2775 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2776 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2777 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2778 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# traffic from pg0 must leave pg2 translated to nat_ip1
2782 pkts = self.create_stream_in(self.pg0, self.pg2)
2783 self.pg0.add_stream(pkts)
2784 self.pg_enable_capture(self.pg_interfaces)
2786 capture = self.pg2.get_capture(len(pkts))
2787 self.verify_capture_out(capture, nat_ip1)
# traffic from pg1 must leave pg2 translated to nat_ip2
2790 pkts = self.create_stream_in(self.pg1, self.pg2)
2791 self.pg1.add_stream(pkts)
2792 self.pg_enable_capture(self.pg_interfaces)
2794 capture = self.pg2.get_capture(len(pkts))
2795 self.verify_capture_out(capture, nat_ip2)
# cleanup: put both interfaces back into table 0 and delete the VRF tables
2797 self.pg0.unconfig_ip4()
2798 self.pg1.unconfig_ip4()
2799 self.pg0.set_table_ip4(0)
2800 self.pg1.set_table_ip4(0)
2801 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2802 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2804 def test_vrf_feature_independent(self):
2805 """ NAT44 tenant VRF independent address pool mode """
# nat_ip1 is added without a VRF, nat_ip2 only for vrf_id=99; no interface
# is moved into VRF 99 in this test, and both streams are expected to be
# translated with nat_ip1.
2807 nat_ip1 = "10.0.0.10"
2808 nat_ip2 = "10.0.0.11"
2810 self.nat44_add_address(nat_ip1)
2811 self.nat44_add_address(nat_ip2, vrf_id=99)
2812 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2813 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2814 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2818 pkts = self.create_stream_in(self.pg0, self.pg2)
2819 self.pg0.add_stream(pkts)
2820 self.pg_enable_capture(self.pg_interfaces)
2822 capture = self.pg2.get_capture(len(pkts))
2823 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2, also expected to use nat_ip1
2826 pkts = self.create_stream_in(self.pg1, self.pg2)
2827 self.pg1.add_stream(pkts)
2828 self.pg_enable_capture(self.pg_interfaces)
2830 capture = self.pg2.get_capture(len(pkts))
2831 self.verify_capture_out(capture, nat_ip1)
2833 def test_dynamic_ipless_interfaces(self):
2834 """ NAT44 interfaces without configured IP address """
# pg7/pg8 get no config_ip4() call in this test; reachability is provided by
# explicit neighbor entries and /32 routes instead.
# NOTE(review): the trailing argument of each ip_neighbor_add_del() call is
# on a hidden line of this listing - confirm in the full file.
2836 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2837 mactobinary(self.pg7.remote_mac),
2838 self.pg7.remote_ip4n,
2840 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2841 mactobinary(self.pg8.remote_mac),
2842 self.pg8.remote_ip4n,
# host routes to each remote peer via its own interface
2845 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2846 dst_address_length=32,
2847 next_hop_address=self.pg7.remote_ip4n,
2848 next_hop_sw_if_index=self.pg7.sw_if_index)
2849 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2850 dst_address_length=32,
2851 next_hop_address=self.pg8.remote_ip4n,
2852 next_hop_sw_if_index=self.pg8.sw_if_index)
2854 self.nat44_add_address(self.nat_addr)
2855 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2856 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 direction
2860 pkts = self.create_stream_in(self.pg7, self.pg8)
2861 self.pg7.add_stream(pkts)
2862 self.pg_enable_capture(self.pg_interfaces)
2864 capture = self.pg8.get_capture(len(pkts))
2865 self.verify_capture_out(capture)
# pg8 -> pg7 direction, addressed to the NAT pool address
2868 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2869 self.pg8.add_stream(pkts)
2870 self.pg_enable_capture(self.pg_interfaces)
2872 capture = self.pg7.get_capture(len(pkts))
2873 self.verify_capture_in(capture, self.pg7)
2875 def test_static_ipless_interfaces(self):
2876 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same neighbor / host-route setup as the dynamic variant, but translation
# comes from a static mapping of pg7's remote address to self.nat_addr; no
# nat44_add_address() call is visible here.
# NOTE(review): the trailing argument of each ip_neighbor_add_del() call is
# on a hidden line of this listing - confirm in the full file.
2878 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2879 mactobinary(self.pg7.remote_mac),
2880 self.pg7.remote_ip4n,
2882 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2883 mactobinary(self.pg8.remote_mac),
2884 self.pg8.remote_ip4n,
2887 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2888 dst_address_length=32,
2889 next_hop_address=self.pg7.remote_ip4n,
2890 next_hop_sw_if_index=self.pg7.sw_if_index)
2891 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2892 dst_address_length=32,
2893 next_hop_address=self.pg8.remote_ip4n,
2894 next_hop_sw_if_index=self.pg8.sw_if_index)
2896 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2897 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2898 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 is exercised first here
2902 pkts = self.create_stream_out(self.pg8)
2903 self.pg8.add_stream(pkts)
2904 self.pg_enable_capture(self.pg_interfaces)
2906 capture = self.pg7.get_capture(len(pkts))
2907 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2910 pkts = self.create_stream_in(self.pg7, self.pg8)
2911 self.pg7.add_stream(pkts)
2912 self.pg_enable_capture(self.pg_interfaces)
2914 capture = self.pg8.get_capture(len(pkts))
# NOTE(review): the third positional argument (True) is defined by
# verify_capture_out(), which is outside this listing - confirm its meaning
2915 self.verify_capture_out(capture, self.nat_addr, True)
2917 def test_static_with_port_ipless_interfaces(self):
2918 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same neighbor / host-route setup as the other ip-less tests, with one
# static port mapping per protocol (TCP, UDP, ICMP) for pg7's remote address.
# fixed outside ports / ICMP id used by the static mappings below
2920 self.tcp_port_out = 30606
2921 self.udp_port_out = 30607
2922 self.icmp_id_out = 30608
# NOTE(review): the trailing argument of each ip_neighbor_add_del() call is
# on a hidden line of this listing - confirm in the full file.
2924 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2925 mactobinary(self.pg7.remote_mac),
2926 self.pg7.remote_ip4n,
2928 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2929 mactobinary(self.pg8.remote_mac),
2930 self.pg8.remote_ip4n,
2933 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2934 dst_address_length=32,
2935 next_hop_address=self.pg7.remote_ip4n,
2936 next_hop_sw_if_index=self.pg7.sw_if_index)
2937 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2938 dst_address_length=32,
2939 next_hop_address=self.pg8.remote_ip4n,
2940 next_hop_sw_if_index=self.pg8.sw_if_index)
2942 self.nat44_add_address(self.nat_addr)
# inside port / id -> outside port / id, one mapping per protocol
2943 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2944 self.tcp_port_in, self.tcp_port_out,
2945 proto=IP_PROTOS.tcp)
2946 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2947 self.udp_port_in, self.udp_port_out,
2948 proto=IP_PROTOS.udp)
2949 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2950 self.icmp_id_in, self.icmp_id_out,
2951 proto=IP_PROTOS.icmp)
2952 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2953 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2957 pkts = self.create_stream_out(self.pg8)
2958 self.pg8.add_stream(pkts)
2959 self.pg_enable_capture(self.pg_interfaces)
2961 capture = self.pg7.get_capture(len(pkts))
2962 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2965 pkts = self.create_stream_in(self.pg7, self.pg8)
2966 self.pg7.add_stream(pkts)
2967 self.pg_enable_capture(self.pg_interfaces)
2969 capture = self.pg8.get_capture(len(pkts))
2970 self.verify_capture_out(capture)
2972 def test_static_unknown_proto(self):
2973 """ 1:1 NAT translate packet with unknown protocol """
# A static 1:1 mapping must also translate packets that are neither TCP, UDP
# nor ICMP at the outer IP level.
# NOTE(review): one packet layer between the two IP headers and the binding
# of `packet` are on hidden lines; since the assertions check for GRE, the
# hidden layer is presumably GRE() - confirm in the full file.
2974 nat_ip = "10.0.0.10"
2975 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2976 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2977 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1
2981 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2982 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2984 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2985 TCP(sport=1234, dport=1234))
2986 self.pg0.add_stream(p)
2987 self.pg_enable_capture(self.pg_interfaces)
2989 p = self.pg1.get_capture(1)
# outer source must be the mapped address; destination is unchanged
2992 self.assertEqual(packet[IP].src, nat_ip)
2993 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2994 self.assertTrue(packet.haslayer(GRE))
2995 self.check_ip_checksum(packet)
2997 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the mapped address
3001 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3002 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3004 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3005 TCP(sport=1234, dport=1234))
3006 self.pg1.add_stream(p)
3007 self.pg_enable_capture(self.pg_interfaces)
3009 p = self.pg0.get_capture(1)
# outer destination must be restored to pg0's remote address
3012 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3013 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3014 self.assertTrue(packet.haslayer(GRE))
3015 self.check_ip_checksum(packet)
3017 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3020 def test_hairpinning_static_unknown_proto(self):
3021 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0 and each has its own 1:1
# mapping; packets are sent and captured on pg0.
# NOTE(review): one packet layer between the two IP headers and the binding
# of `packet` are on hidden lines; the hidden layer is presumably GRE() given
# the haslayer(GRE) assertions - confirm in the full file.
3023 host = self.pg0.remote_hosts[0]
3024 server = self.pg0.remote_hosts[1]
3026 host_nat_ip = "10.0.0.10"
3027 server_nat_ip = "10.0.0.11"
3029 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3030 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3031 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3032 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server via the server's NAT address
3036 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3037 IP(src=host.ip4, dst=server_nat_ip) /
3039 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3040 TCP(sport=1234, dport=1234))
3041 self.pg0.add_stream(p)
3042 self.pg_enable_capture(self.pg_interfaces)
3044 p = self.pg0.get_capture(1)
# both outer addresses are rewritten: host -> host_nat_ip, NAT -> server
3047 self.assertEqual(packet[IP].src, host_nat_ip)
3048 self.assertEqual(packet[IP].dst, server.ip4)
3049 self.assertTrue(packet.haslayer(GRE))
3050 self.check_ip_checksum(packet)
3052 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host via the host's NAT address
3056 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3057 IP(src=server.ip4, dst=host_nat_ip) /
3059 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3060 TCP(sport=1234, dport=1234))
3061 self.pg0.add_stream(p)
3062 self.pg_enable_capture(self.pg_interfaces)
3064 p = self.pg0.get_capture(1)
3067 self.assertEqual(packet[IP].src, server_nat_ip)
3068 self.assertEqual(packet[IP].dst, host.ip4)
3069 self.assertTrue(packet.haslayer(GRE))
3070 self.check_ip_checksum(packet)
3072 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3075 def test_unknown_proto(self):
3076 """ NAT44 translate packet with unknown protocol """
# Dynamic NAT44: a TCP packet is sent first, then a packet with another
# outer protocol is sent in each direction and its outer addresses checked.
# NOTE(review): one packet layer between the two IP headers and the binding
# of `packet` are on hidden lines; the hidden layer is presumably GRE() given
# the haslayer(GRE) assertions - confirm in the full file.
3077 self.nat44_add_address(self.nat_addr)
3078 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3079 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# plain TCP packet pg0 -> pg1 ahead of the unknown-protocol packet;
# presumably this creates the session the later translation relies on - verify
3083 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3084 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3085 TCP(sport=self.tcp_port_in, dport=20))
3086 self.pg0.add_stream(p)
3087 self.pg_enable_capture(self.pg_interfaces)
3089 p = self.pg1.get_capture(1)
# pg0 -> pg1 with the non-TCP/UDP/ICMP outer protocol
3091 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3092 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3094 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3095 TCP(sport=1234, dport=1234))
3096 self.pg0.add_stream(p)
3097 self.pg_enable_capture(self.pg_interfaces)
3099 p = self.pg1.get_capture(1)
# outer source must be the pool address
3102 self.assertEqual(packet[IP].src, self.nat_addr)
3103 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3104 self.assertTrue(packet.haslayer(GRE))
3105 self.check_ip_checksum(packet)
3107 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the pool address
3111 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3112 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3114 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3115 TCP(sport=1234, dport=1234))
3116 self.pg1.add_stream(p)
3117 self.pg_enable_capture(self.pg_interfaces)
3119 p = self.pg0.get_capture(1)
# outer destination must be restored to pg0's remote address
3122 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3123 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3124 self.assertTrue(packet.haslayer(GRE))
3125 self.check_ip_checksum(packet)
3127 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3130 def test_hairpinning_unknown_proto(self):
3131 """ NAT44 translate packet with unknown protocol - hairpinning """
# Host (dynamic NAT via self.nat_addr) and server (1:1 mapping to
# server_nat_ip) are both remote hosts on pg0; packets are sent and captured
# on pg0.
# NOTE(review): host_in_port, one packet layer between the two IP headers
# and the binding of `packet` are on hidden lines of this listing; the hidden
# layer is presumably GRE() given the haslayer(GRE) assertions - confirm.
3132 host = self.pg0.remote_hosts[0]
3133 server = self.pg0.remote_hosts[1]
3136 server_in_port = 5678
3137 server_out_port = 8765
3138 server_nat_ip = "10.0.0.11"
3140 self.nat44_add_address(self.nat_addr)
3141 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3142 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3145 # add static mapping for server
3146 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# plain TCP packet host -> server ahead of the unknown-protocol packet;
# the capture is fetched but not inspected
3149 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3150 IP(src=host.ip4, dst=server_nat_ip) /
3151 TCP(sport=host_in_port, dport=server_out_port))
3152 self.pg0.add_stream(p)
3153 self.pg_enable_capture(self.pg_interfaces)
3155 capture = self.pg0.get_capture(1)
# host -> server with the non-TCP/UDP/ICMP outer protocol
3157 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3158 IP(src=host.ip4, dst=server_nat_ip) /
3160 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3161 TCP(sport=1234, dport=1234))
3162 self.pg0.add_stream(p)
3163 self.pg_enable_capture(self.pg_interfaces)
3165 p = self.pg0.get_capture(1)
# outer source becomes the pool address, destination the server's real one
3168 self.assertEqual(packet[IP].src, self.nat_addr)
3169 self.assertEqual(packet[IP].dst, server.ip4)
3170 self.assertTrue(packet.haslayer(GRE))
3171 self.check_ip_checksum(packet)
3173 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the pool address
3177 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3178 IP(src=server.ip4, dst=self.nat_addr) /
3180 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3181 TCP(sport=1234, dport=1234))
3182 self.pg0.add_stream(p)
3183 self.pg_enable_capture(self.pg_interfaces)
3185 p = self.pg0.get_capture(1)
3188 self.assertEqual(packet[IP].src, server_nat_ip)
3189 self.assertEqual(packet[IP].dst, host.ip4)
3190 self.assertTrue(packet.haslayer(GRE))
3191 self.check_ip_checksum(packet)
3193 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3196 def test_output_feature(self):
3197 """ NAT44 interface output feature (in2out postrouting) """
# Uses nat44_interface_add_del_output_feature() instead of the regular
# interface feature on pg0, pg1 and pg3, then checks three traffic paths.
3198 self.nat44_add_address(self.nat_addr)
3199 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3200 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3201 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3: packets must come out translated
3205 pkts = self.create_stream_in(self.pg0, self.pg3)
3206 self.pg0.add_stream(pkts)
3207 self.pg_enable_capture(self.pg_interfaces)
3209 capture = self.pg3.get_capture(len(pkts))
3210 self.verify_capture_out(capture)
# pg3 -> pg0: return traffic must be translated back
3213 pkts = self.create_stream_out(self.pg3)
3214 self.pg3.add_stream(pkts)
3215 self.pg_enable_capture(self.pg_interfaces)
3217 capture = self.pg0.get_capture(len(pkts))
3218 self.verify_capture_in(capture, self.pg0)
3220 # from non-NAT interface to NAT inside interface
3221 pkts = self.create_stream_in(self.pg2, self.pg0)
3222 self.pg2.add_stream(pkts)
3223 self.pg_enable_capture(self.pg_interfaces)
3225 capture = self.pg0.get_capture(len(pkts))
# pg2 has no NAT feature configured in this test, so no translation expected
3226 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
# Verifies the output feature with one NAT pool address per VRF: traffic
# entering on pg4 is expected to leave pg3 translated to nat_ip_vrf10, and
# traffic entering on pg6 translated to nat_ip_vrf20; replies sent to those
# addresses must come back on pg4 / pg6 respectively.
# NOTE(review): the embedded numbering has gaps; the last argument(s) of both
# ip_add_del_route calls and of the pg3 output-feature call are not shown in
# this listing (presumably the per-VRF table selection - confirm).
3228 def test_output_feature_vrf_aware(self):
3229 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3230 nat_ip_vrf10 = "10.0.0.10"
3231 nat_ip_vrf20 = "10.0.0.20"
3233 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3234 dst_address_length=32,
3235 next_hop_address=self.pg3.remote_ip4n,
3236 next_hop_sw_if_index=self.pg3.sw_if_index,
3238 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3239 dst_address_length=32,
3240 next_hop_address=self.pg3.remote_ip4n,
3241 next_hop_sw_if_index=self.pg3.sw_if_index,
# one pool address bound to each VRF id
3244 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3245 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3246 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3247 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3248 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected source address nat_ip_vrf10
3252 pkts = self.create_stream_in(self.pg4, self.pg3)
3253 self.pg4.add_stream(pkts)
3254 self.pg_enable_capture(self.pg_interfaces)
3256 capture = self.pg3.get_capture(len(pkts))
3257 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> nat_ip_vrf10, expected back on pg4
3260 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3261 self.pg3.add_stream(pkts)
3262 self.pg_enable_capture(self.pg_interfaces)
3264 capture = self.pg4.get_capture(len(pkts))
3265 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected source address nat_ip_vrf20
3268 pkts = self.create_stream_in(self.pg6, self.pg3)
3269 self.pg6.add_stream(pkts)
3270 self.pg_enable_capture(self.pg_interfaces)
3272 capture = self.pg3.get_capture(len(pkts))
3273 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> nat_ip_vrf20, expected back on pg6
3276 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3277 self.pg3.add_stream(pkts)
3278 self.pg_enable_capture(self.pg_interfaces)
3280 capture = self.pg6.get_capture(len(pkts))
3281 self.verify_capture_in(capture, self.pg6)
# Hairpinning through the output feature: host and server are both remote
# hosts on pg0; the host reaches the server via the NAT address and the
# translated packet is captured on pg0 again.
# NOTE(review): the embedded numbering has gaps. host_in_port, ip and tcp are
# used below but assigned on lines absent from this listing, as are the
# try/except lines around the assertions - confirm against the full file.
3283 def test_output_feature_hairpinning(self):
3284 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3285 host = self.pg0.remote_hosts[0]
3286 server = self.pg0.remote_hosts[1]
3289 server_in_port = 5678
3290 server_out_port = 8765
3292 self.nat44_add_address(self.nat_addr)
3293 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3294 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3297 # add static mapping for server
3298 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3299 server_in_port, server_out_port,
3300 proto=IP_PROTOS.tcp)
3302 # send packet from host to server
3303 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3304 IP(src=host.ip4, dst=self.nat_addr) /
3305 TCP(sport=host_in_port, dport=server_out_port))
3306 self.pg0.add_stream(p)
3307 self.pg_enable_capture(self.pg_interfaces)
3309 capture = self.pg0.get_capture(1)
# expected after translation: source rewritten to the NAT address with a new
# port, destination rewritten to the server's real address and port
3314 self.assertEqual(ip.src, self.nat_addr)
3315 self.assertEqual(ip.dst, server.ip4)
3316 self.assertNotEqual(tcp.sport, host_in_port)
3317 self.assertEqual(tcp.dport, server_in_port)
3318 self.check_tcp_checksum(p)
# remember the port NAT picked for the host; the reply is addressed to it
3319 host_out_port = tcp.sport
3321 self.logger.error(ppp("Unexpected or invalid packet:", p))
3324 # send reply from server to host
3325 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3326 IP(src=server.ip4, dst=self.nat_addr) /
3327 TCP(sport=server_in_port, dport=host_out_port))
3328 self.pg0.add_stream(p)
3329 self.pg_enable_capture(self.pg_interfaces)
3331 capture = self.pg0.get_capture(1)
# expected on the way back: server shown under its external port, host's
# original address and port restored
3336 self.assertEqual(ip.src, self.nat_addr)
3337 self.assertEqual(ip.dst, host.ip4)
3338 self.assertEqual(tcp.sport, server_out_port)
3339 self.assertEqual(tcp.dport, host_in_port)
3340 self.check_tcp_checksum(p)
3342 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Combines the output feature on pg1 with forwarding enabled, an identity
# mapping for pg1.remote_ip4 and an out2in-only TCP static mapping that
# exposes pg0.remote_ip4:local_port as external_addr:external_port.
# NOTE(review): the embedded numbering has gaps. local_port, external_port,
# ip and tcp are assigned on lines absent from this listing, and the trailing
# arguments of two feature calls are not shown - confirm against the full file.
3345 def test_output_feature_and_service(self):
3346 """ NAT44 interface output feature and services """
3347 external_addr = '1.2.3.4'
3351 self.vapi.nat44_forwarding_enable_disable(1)
3352 self.nat44_add_address(self.nat_addr)
3353 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3354 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3355 local_port, external_port,
3356 proto=IP_PROTOS.tcp, out2in_only=1)
3357 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3358 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3360 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3363 # from client to service
3364 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3365 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3366 TCP(sport=12345, dport=external_port))
3367 self.pg1.add_stream(p)
3368 self.pg_enable_capture(self.pg_interfaces)
3370 capture = self.pg0.get_capture(1)
# destination must be rewritten to the service's local address and port
3376 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3377 self.assertEqual(tcp.dport, local_port)
3378 self.check_tcp_checksum(p)
3379 self.check_ip_checksum(p)
3381 self.logger.error(ppp("Unexpected or invalid packet:", p))
3384 # from service back to client
3385 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3386 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3387 TCP(sport=local_port, dport=12345))
3388 self.pg0.add_stream(p)
3389 self.pg_enable_capture(self.pg_interfaces)
3391 capture = self.pg1.get_capture(1)
# source must be rewritten back to the external address and port
3396 self.assertEqual(ip.src, external_addr)
3397 self.assertEqual(tcp.sport, external_port)
3398 self.check_tcp_checksum(p)
3399 self.check_ip_checksum(p)
3401 self.logger.error(ppp("Unexpected or invalid packet:", p))
3404 # from local network host to external network
3405 pkts = self.create_stream_in(self.pg0, self.pg1)
3406 self.pg0.add_stream(pkts)
3407 self.pg_enable_capture(self.pg_interfaces)
3409 capture = self.pg1.get_capture(len(pkts))
3410 self.verify_capture_out(capture)
# the same in2out stream is sent a second time; presumably this exercises
# the sessions created by the first pass - confirm intent
3411 pkts = self.create_stream_in(self.pg0, self.pg1)
3412 self.pg0.add_stream(pkts)
3413 self.pg_enable_capture(self.pg_interfaces)
3415 capture = self.pg1.get_capture(len(pkts))
3416 self.verify_capture_out(capture)
3418 # from external network back to local network host
3419 pkts = self.create_stream_out(self.pg1)
3420 self.pg1.add_stream(pkts)
3421 self.pg_enable_capture(self.pg_interfaces)
3423 capture = self.pg0.get_capture(len(pkts))
3424 self.verify_capture_in(capture, self.pg0)
# With forwarding enabled and the output feature on pg1, checks two cases:
# sessions opened from the inside host are translated, while sessions opened
# from the remote host directly towards pg0.remote_ip4 are passed through
# without translation (the final capture is verified with
# nat_ip=self.pg0.remote_ip4).
# NOTE(review): the embedded numbering has gaps; trailing call arguments and
# any statements that consume the saved *_port_out locals are not shown in
# this listing - confirm against the full file.
3426 def test_output_feature_and_service2(self):
3427 """ NAT44 interface output feature and service host direct access """
3428 self.vapi.nat44_forwarding_enable_disable(1)
3429 self.nat44_add_address(self.nat_addr)
3430 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3433 # session initiated from service host - translate
3434 pkts = self.create_stream_in(self.pg0, self.pg1)
3435 self.pg0.add_stream(pkts)
3436 self.pg_enable_capture(self.pg_interfaces)
3438 capture = self.pg1.get_capture(len(pkts))
3439 self.verify_capture_out(capture)
3441 pkts = self.create_stream_out(self.pg1)
3442 self.pg1.add_stream(pkts)
3443 self.pg_enable_capture(self.pg_interfaces)
3445 capture = self.pg0.get_capture(len(pkts))
3446 self.verify_capture_in(capture, self.pg0)
# snapshot of the outside ports/id currently stored on the test instance
3448 tcp_port_out = self.tcp_port_out
3449 udp_port_out = self.udp_port_out
3450 icmp_id_out = self.icmp_id_out
3452 # session initiated from remote host - do not translate
3453 pkts = self.create_stream_out(self.pg1,
3454 self.pg0.remote_ip4,
3455 use_inside_ports=True)
3456 self.pg1.add_stream(pkts)
3457 self.pg_enable_capture(self.pg_interfaces)
3459 capture = self.pg0.get_capture(len(pkts))
3460 self.verify_capture_in(capture, self.pg0)
3462 pkts = self.create_stream_in(self.pg0, self.pg1)
3463 self.pg0.add_stream(pkts)
3464 self.pg_enable_capture(self.pg_interfaces)
3466 capture = self.pg1.get_capture(len(pkts))
3467 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Destination NAT together with the output feature: an out2in-only TCP static
# mapping exposes pg1.remote_ip4:local_port as external_addr:external_port.
# A packet from pg0 to the external address must leave pg1 with only its
# destination rewritten; the reply must arrive on pg0 with the external
# address and port as its source.
# NOTE(review): the embedded numbering has gaps. local_port, external_port,
# ip and tcp are assigned on lines absent from this listing - confirm.
3470 def test_output_feature_and_service3(self):
3471 """ NAT44 interface output feature and DST NAT """
3472 external_addr = '1.2.3.4'
3476 self.vapi.nat44_forwarding_enable_disable(1)
3477 self.nat44_add_address(self.nat_addr)
3478 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3479 local_port, external_port,
3480 proto=IP_PROTOS.tcp, out2in_only=1)
3481 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3482 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3484 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 host -> external address of the mapping
3487 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3488 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3489 TCP(sport=12345, dport=external_port))
3490 self.pg0.add_stream(p)
3491 self.pg_enable_capture(self.pg_interfaces)
3493 capture = self.pg1.get_capture(1)
# source side untouched, destination mapped to the local address and port
3498 self.assertEqual(ip.src, self.pg0.remote_ip4)
3499 self.assertEqual(tcp.sport, 12345)
3500 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3501 self.assertEqual(tcp.dport, local_port)
3502 self.check_tcp_checksum(p)
3503 self.check_ip_checksum(p)
3505 self.logger.error(ppp("Unexpected or invalid packet:", p))
# reply from the mapped host back to the pg0 host
3508 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3509 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3510 TCP(sport=local_port, dport=12345))
3511 self.pg1.add_stream(p)
3512 self.pg_enable_capture(self.pg_interfaces)
3514 capture = self.pg0.get_capture(1)
3519 self.assertEqual(ip.src, external_addr)
3520 self.assertEqual(tcp.sport, external_port)
3521 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3522 self.assertEqual(tcp.dport, 12345)
3523 self.check_tcp_checksum(p)
3524 self.check_ip_checksum(p)
3526 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One-armed NAT: pg9 is registered with the NAT feature twice (the second
# call has an extra argument not shown here), and both the local and the
# remote host live on pg9, so every packet enters and leaves on pg9.
# NOTE(review): the embedded numbering has gaps. ip and tcp are assigned on
# lines absent from this listing, as are the try/except lines - confirm.
3529 def test_one_armed_nat44(self):
3530 """ One armed NAT44 """
3531 remote_host = self.pg9.remote_hosts[0]
3532 local_host = self.pg9.remote_hosts[1]
3535 self.nat44_add_address(self.nat_addr)
3536 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3537 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local host -> remote host
3541 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3542 IP(src=local_host.ip4, dst=remote_host.ip4) /
3543 TCP(sport=12345, dport=80))
3544 self.pg9.add_stream(p)
3545 self.pg_enable_capture(self.pg_interfaces)
3547 capture = self.pg9.get_capture(1)
3552 self.assertEqual(ip.src, self.nat_addr)
3553 self.assertEqual(ip.dst, remote_host.ip4)
3554 self.assertNotEqual(tcp.sport, 12345)
# port chosen by NAT; the reply below is addressed to it
3555 external_port = tcp.sport
3556 self.assertEqual(tcp.dport, 80)
3557 self.check_tcp_checksum(p)
3558 self.check_ip_checksum(p)
3560 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote host -> NAT address, expected to be delivered to the local host
3564 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3565 IP(src=remote_host.ip4, dst=self.nat_addr) /
3566 TCP(sport=80, dport=external_port))
3567 self.pg9.add_stream(p)
3568 self.pg_enable_capture(self.pg_interfaces)
3570 capture = self.pg9.get_capture(1)
3575 self.assertEqual(ip.src, remote_host.ip4)
3576 self.assertEqual(ip.dst, local_host.ip4)
3577 self.assertEqual(tcp.sport, 80)
3578 self.assertEqual(tcp.dport, 12345)
3579 self.check_tcp_checksum(p)
3580 self.check_ip_checksum(p)
3582 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One-armed NAT with a static TCP mapping for local_host and a twice-NAT pool
# address: the client's packet towards nat_addr:external_port must reach the
# local host with BOTH addresses rewritten (source becomes nat_addr with a new
# port), and the reply must be restored to nat_addr:external_port -> client.
# NOTE(review): the embedded numbering has gaps. local_port, external_port,
# ip and tcp are assigned on lines absent from this listing, and the static
# mapping call has a trailing argument that is not shown - confirm.
3585 def test_one_armed_nat44_static(self):
3586 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3587 remote_host = self.pg9.remote_hosts[0]
3588 local_host = self.pg9.remote_hosts[1]
3593 self.vapi.nat44_forwarding_enable_disable(1)
3594 self.nat44_add_address(self.nat_addr, twice_nat=1)
3595 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3596 local_port, external_port,
3597 proto=IP_PROTOS.tcp, out2in_only=1,
3599 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3600 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3603 # from client to service
3604 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3605 IP(src=remote_host.ip4, dst=self.nat_addr) /
3606 TCP(sport=12345, dport=external_port))
3607 self.pg9.add_stream(p)
3608 self.pg_enable_capture(self.pg_interfaces)
3610 capture = self.pg9.get_capture(1)
3616 self.assertEqual(ip.dst, local_host.ip4)
3617 self.assertEqual(ip.src, self.nat_addr)
3618 self.assertEqual(tcp.dport, local_port)
3619 self.assertNotEqual(tcp.sport, 12345)
# translated client port as seen by the service; used for the reply below
3620 eh_port_in = tcp.sport
3621 self.check_tcp_checksum(p)
3622 self.check_ip_checksum(p)
3624 self.logger.error(ppp("Unexpected or invalid packet:", p))
3627 # from service back to client
3628 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3629 IP(src=local_host.ip4, dst=self.nat_addr) /
3630 TCP(sport=local_port, dport=eh_port_in))
3631 self.pg9.add_stream(p)
3632 self.pg_enable_capture(self.pg_interfaces)
3634 capture = self.pg9.get_capture(1)
3639 self.assertEqual(ip.src, self.nat_addr)
3640 self.assertEqual(ip.dst, remote_host.ip4)
3641 self.assertEqual(tcp.sport, external_port)
3642 self.assertEqual(tcp.dport, 12345)
3643 self.check_tcp_checksum(p)
3644 self.check_ip_checksum(p)
3646 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Creates sessions by sending an in2out stream, then deletes two of them via
# nat44_del_session - one addressed by its inside tuple, one by its outside
# tuple - and checks that the user's session count dropped by exactly 2.
# NOTE(review): the embedded numbering has gaps; the trailing argument(s) of
# the second nat44_del_session call are not shown in this listing (presumably
# the flag selecting the outside-tuple lookup - confirm).
3649 def test_del_session(self):
3650 """ Delete NAT44 session """
3651 self.nat44_add_address(self.nat_addr)
3652 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3653 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3656 pkts = self.create_stream_in(self.pg0, self.pg1)
3657 self.pg0.add_stream(pkts)
3658 self.pg_enable_capture(self.pg_interfaces)
# the capture is only taken to make sure the stream went through
3660 capture = self.pg1.get_capture(len(pkts))
# baseline session count for pg0.remote_ip4n
3662 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3663 nsessions = len(sessions)
3665 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3666 sessions[0].inside_port,
3667 sessions[0].protocol)
3668 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3669 sessions[1].outside_port,
3670 sessions[1].protocol,
3673 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3674 self.assertEqual(nsessions - len(sessions), 2)
# Round-trips the virtual fragmentation reassembly settings: reads the current
# IPv4 values, writes timeout + 5 and doubled max_reass / max_frag, reads them
# back and compares; finally sets drop_frag=1 and checks it is reported.
3676 def test_set_get_reass(self):
3677 """ NAT44 set/get virtual fragmentation reassembly """
# configuration before any change - the expected values are derived from it
3678 reas_cfg1 = self.vapi.nat_get_reass()
3680 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3681 max_reass=reas_cfg1.ip4_max_reass * 2,
3682 max_frag=reas_cfg1.ip4_max_frag * 2)
3684 reas_cfg2 = self.vapi.nat_get_reass()
3686 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3687 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3688 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# NOTE(review): no visible statement in this method restores the original
# values or drop_frag afterwards - confirm cleanup happens elsewhere
3690 self.vapi.nat_set_reass(drop_frag=1)
3691 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
# Sends a fragmented TCP payload pg0 -> pg1 and a fragmented reply pg1 -> pg0,
# reassembles each capture with reass_frags_and_verify and checks ports and
# payload; finally expects the reassembly dump to have grown by 2 entries.
# NOTE(review): the embedded numbering has gaps; most arguments of the
# create_stream_frag / reass_frags_and_verify calls are not shown in this
# listing - confirm against the full file.
3693 def test_frag_in_order(self):
3694 """ NAT44 translate fragments arriving in order """
3695 self.nat44_add_address(self.nat_addr)
3696 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3697 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload
3700 data = "A" * 4 + "B" * 16 + "C" * 3
# inside source port is randomised per run
3701 self.tcp_port_in = random.randint(1025, 65535)
# number of reassembly entries before the test traffic
3703 reass = self.vapi.nat_reass_dump()
3704 reass_n_start = len(reass)
3707 pkts = self.create_stream_frag(self.pg0,
3708 self.pg1.remote_ip4,
3712 self.pg0.add_stream(pkts)
3713 self.pg_enable_capture(self.pg_interfaces)
3715 frags = self.pg1.get_capture(len(pkts))
3716 p = self.reass_frags_and_verify(frags,
3718 self.pg1.remote_ip4)
3719 self.assertEqual(p[TCP].dport, 20)
3720 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# remember the translated port for the reply direction
3721 self.tcp_port_out = p[TCP].sport
3722 self.assertEqual(data, p[Raw].load)
3725 pkts = self.create_stream_frag(self.pg1,
3730 self.pg1.add_stream(pkts)
3731 self.pg_enable_capture(self.pg_interfaces)
3733 frags = self.pg0.get_capture(len(pkts))
3734 p = self.reass_frags_and_verify(frags,
3735 self.pg1.remote_ip4,
3736 self.pg0.remote_ip4)
3737 self.assertEqual(p[TCP].sport, 20)
3738 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3739 self.assertEqual(data, p[Raw].load)
3741 reass = self.vapi.nat_reass_dump()
3742 reass_n_end = len(reass)
# one new entry is expected per direction
3744 self.assertEqual(reass_n_end - reass_n_start, 2)
# Hairpinning with fragmented traffic: host and server are both on pg0, the
# server is published through a TCP static mapping on the NAT address, and the
# reassembled capture on pg0 must show a translated source port, the server's
# inside port as destination and an intact payload.
# NOTE(review): the embedded numbering has gaps; the arguments of the
# create_stream_frag / reass_frags_and_verify calls are not shown in this
# listing - confirm against the full file.
3746 def test_reass_hairpinning(self):
3747 """ NAT44 fragments hairpinning """
3748 host = self.pg0.remote_hosts[0]
3749 server = self.pg0.remote_hosts[1]
# all ports are randomised per run
3750 host_in_port = random.randint(1025, 65535)
3752 server_in_port = random.randint(1025, 65535)
3753 server_out_port = random.randint(1025, 65535)
3754 data = "A" * 4 + "B" * 16 + "C" * 3
3756 self.nat44_add_address(self.nat_addr)
3757 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3758 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3760 # add static mapping for server
3761 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3762 server_in_port, server_out_port,
3763 proto=IP_PROTOS.tcp)
3765 # send packet from host to server
3766 pkts = self.create_stream_frag(self.pg0,
3771 self.pg0.add_stream(pkts)
3772 self.pg_enable_capture(self.pg_interfaces)
# hairpinned traffic comes back out of pg0
3774 frags = self.pg0.get_capture(len(pkts))
3775 p = self.reass_frags_and_verify(frags,
3778 self.assertNotEqual(p[TCP].sport, host_in_port)
3779 self.assertEqual(p[TCP].dport, server_in_port)
3780 self.assertEqual(data, p[Raw].load)
3782 def test_frag_out_of_order(self):
3783 """ NAT44 translate fragments arriving out of order """
3784 self.nat44_add_address(self.nat_addr)
3785 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3786 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3789 data = "A" * 4 + "B" * 16 + "C" * 3
3790 random.randint(1025, 65535)
3793 pkts = self.create_stream_frag(self.pg0,
3794 self.pg1.remote_ip4,
3799 self.pg0.add_stream(pkts)
3800 self.pg_enable_capture(self.pg_interfaces)
3802 frags = self.pg1.get_capture(len(pkts))
3803 p = self.reass_frags_and_verify(frags,
3805 self.pg1.remote_ip4)
3806 self.assertEqual(p[TCP].dport, 20)
3807 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3808 self.tcp_port_out = p[TCP].sport
3809 self.assertEqual(data, p[Raw].load)
3812 pkts = self.create_stream_frag(self.pg1,
3818 self.pg1.add_stream(pkts)
3819 self.pg_enable_capture(self.pg_interfaces)
3821 frags = self.pg0.get_capture(len(pkts))
3822 p = self.reass_frags_and_verify(frags,
3823 self.pg1.remote_ip4,
3824 self.pg0.remote_ip4)
3825 self.assertEqual(p[TCP].sport, 20)
3826 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3827 self.assertEqual(data, p[Raw].load)
# Switches the address/port assignment algorithm to MAP-E via CLI (psid 10,
# psid-offset 6, psid-len 6) and checks that the translated source port of an
# in2out TCP packet carries the value 10 in bits 6..11.
# NOTE(review): the embedded numbering has gaps. ip and tcp are assigned on
# lines absent from this listing, as are the try/except lines - confirm.
3829 def test_port_restricted(self):
3830 """ Port restricted NAT44 (MAP-E CE) """
3831 self.nat44_add_address(self.nat_addr)
3832 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3833 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3835 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3836 "psid-offset 6 psid-len 6")
3838 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3839 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3840 TCP(sport=4567, dport=22))
3841 self.pg0.add_stream(p)
3842 self.pg_enable_capture(self.pg_interfaces)
3844 capture = self.pg1.get_capture(1)
3849 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3850 self.assertEqual(ip.src, self.nat_addr)
3851 self.assertEqual(tcp.dport, 22)
3852 self.assertNotEqual(tcp.sport, 4567)
# shift by the configured offset (6) and mask 6 bits (63) to extract the
# psid field, which must equal the configured psid (10)
3853 self.assertEqual((tcp.sport >> 6) & 63, 10)
3854 self.check_tcp_checksum(p)
3855 self.check_ip_checksum(p)
3857 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Shared scenario behind the twice-NAT tests. Visible parameters:
# self_twice_nat, same_pg and lb (the rest of the signature is not shown).
# It adds a regular and a twice-NAT pool address, installs either a plain or
# a load-balanced static mapping, sends one TCP packet from a client to the
# NAT address and one reply from the selected server, and checks both
# captures.
# NOTE(review): this listing is heavily gapped - the remainder of the
# signature, the assignments of pg0, pg1, port_in, port_out, eh_port_out,
# eh_addr_in, server, ip and tcp, and all if/else/try/except lines are absent.
# Do not infer control flow from the statement order below; consult the full
# file.
# NOTE(review): the local name `locals` shadows the builtin of the same name.
3860 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3862 twice_nat_addr = '10.0.1.3'
3870 port_in1 = port_in+1
3871 port_in2 = port_in+2
3876 server1 = self.pg0.remote_hosts[0]
3877 server2 = self.pg0.remote_hosts[1]
# whether the external host's source is expected to be rewritten; the rest
# of this expression is on a line not shown here
3889 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
3892 self.nat44_add_address(self.nat_addr)
3893 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3895 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3897 proto=IP_PROTOS.tcp,
3898 twice_nat=int(not self_twice_nat),
3899 self_twice_nat=int(self_twice_nat))
3901 locals = [{'addr': server1.ip4n,
3904 {'addr': server2.ip4n,
3907 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3908 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3912 not self_twice_nat),
3915 local_num=len(locals),
3917 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3918 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
3925 assert client_id is not None
3927 client = self.pg0.remote_hosts[0]
3928 elif client_id == 2:
3929 client = self.pg0.remote_hosts[1]
3931 client = pg1.remote_hosts[0]
# client -> NAT address
3932 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3933 IP(src=client.ip4, dst=self.nat_addr) /
3934 TCP(sport=eh_port_out, dport=port_out))
3936 self.pg_enable_capture(self.pg_interfaces)
3938 capture = pg0.get_capture(1)
3944 if ip.dst == server1.ip4:
3950 self.assertEqual(ip.dst, server.ip4)
3952 self.assertIn(tcp.dport, [port_in1, port_in2])
3954 self.assertEqual(tcp.dport, port_in)
3956 self.assertEqual(ip.src, twice_nat_addr)
3957 self.assertNotEqual(tcp.sport, eh_port_out)
3959 self.assertEqual(ip.src, client.ip4)
3960 self.assertEqual(tcp.sport, eh_port_out)
# ports observed on the server side; reused to build the reply
3962 eh_port_in = tcp.sport
3963 saved_port_in = tcp.dport
3964 self.check_tcp_checksum(p)
3965 self.check_ip_checksum(p)
3967 self.logger.error(ppp("Unexpected or invalid packet:", p))
# server -> client reply
3970 p = (Ether(src=server.mac, dst=pg0.local_mac) /
3971 IP(src=server.ip4, dst=eh_addr_in) /
3972 TCP(sport=saved_port_in, dport=eh_port_in))
3974 self.pg_enable_capture(self.pg_interfaces)
3976 capture = pg1.get_capture(1)
3981 self.assertEqual(ip.dst, client.ip4)
3982 self.assertEqual(ip.src, self.nat_addr)
3983 self.assertEqual(tcp.dport, eh_port_out)
3984 self.assertEqual(tcp.sport, port_out)
3985 self.check_tcp_checksum(p)
3986 self.check_ip_checksum(p)
3988 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Runs the shared twice_nat_common scenario with all of its default arguments.
# NOTE(review): embedded line 3992 is absent from this listing.
3991 def test_twice_nat(self):
3993 self.twice_nat_common()
# Runs twice_nat_common with self_twice_nat=True and same_pg=True.
3995 def test_self_twice_nat_positive(self):
3996 """ Self Twice NAT44 (positive test) """
3997 self.twice_nat_common(self_twice_nat=True, same_pg=True)
# Runs twice_nat_common with self_twice_nat=True and same_pg left at its
# default (False).
3999 def test_self_twice_nat_negative(self):
4000 """ Self Twice NAT44 (negative test) """
4001 self.twice_nat_common(self_twice_nat=True)
# Runs twice_nat_common with lb=True (load-balanced static mapping variant).
4003 def test_twice_nat_lb(self):
4004 """ Twice NAT44 local service load balancing """
4005 self.twice_nat_common(lb=True)
# Runs twice_nat_common with lb, self_twice_nat and same_pg all enabled.
# NOTE(review): the call continues on a line absent from this listing
# (presumably the client selection argument - confirm).
4007 def test_self_twice_nat_lb_positive(self):
4008 """ Self Twice NAT44 local service load balancing (positive test) """
4009 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
# Runs twice_nat_common with lb, self_twice_nat and same_pg all enabled; the
# visible arguments are identical to the positive variant, so the difference
# must be in the continuation line.
# NOTE(review): that continuation line is absent from this listing
# (presumably a different client selection - confirm).
4012 def test_self_twice_nat_lb_negative(self):
4013 """ Self Twice NAT44 local service load balancing (negative test) """
4014 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
# Registers pg7 as the source of a twice-NAT pool address and checks that the
# NAT address dump follows the interface: empty before pg7 has an IPv4
# address, exactly one entry (pg7's address, twice_nat flag set) after
# config_ip4(), and empty again after unconfig_ip4().
4017 def test_twice_nat_interface_addr(self):
4018 """ Acquire twice NAT44 addresses from interface """
4019 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
4021 # no address in NAT pool
4022 adresses = self.vapi.nat44_address_dump()
4023 self.assertEqual(0, len(adresses))
4025 # configure interface address and check NAT address pool
4026 self.pg7.config_ip4()
4027 adresses = self.vapi.nat44_address_dump()
4028 self.assertEqual(1, len(adresses))
# only the first 4 bytes of the dumped address are compared with pg7's
# packed IPv4 address
4029 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
4030 self.assertEqual(adresses[0].twice_nat, 1)
4032 # remove interface address and check NAT address pool
4033 self.pg7.unconfig_ip4()
4034 adresses = self.vapi.nat44_address_dump()
4035 self.assertEqual(0, len(adresses))
# Sets max_frag=0, configures an IPFIX exporter towards pg3, sends only the
# last fragment of a fragmented stream on pg0, and expects nothing on pg1 and
# 9 IPFIX packets on pg3 whose decoded data records are checked by
# verify_ipfix_max_fragments_ip4.
# NOTE(review): the embedded numbering has gaps; the remaining arguments of
# set_ipfix_exporter / create_stream_frag and the two `for p in ...` loop
# headers are absent from this listing - confirm against the full file.
4037 def test_ipfix_max_frags(self):
4038 """ IPFIX logging maximum fragments pending reassembly exceeded """
4039 self.nat44_add_address(self.nat_addr)
4040 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4041 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4043 self.vapi.nat_set_reass(max_frag=0)
4044 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
4045 src_address=self.pg3.local_ip4n,
4047 template_interval=10)
4048 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
4049 src_port=self.ipfix_src_port)
4051 data = "A" * 4 + "B" * 16 + "C" * 3
4052 self.tcp_port_in = random.randint(1025, 65535)
4053 pkts = self.create_stream_frag(self.pg0,
4054 self.pg1.remote_ip4,
# only the final fragment is injected
4058 self.pg0.add_stream(pkts[-1])
4059 self.pg_enable_capture(self.pg_interfaces)
# no packet may be forwarded to pg1
4061 frags = self.pg1.get_capture(0)
4062 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4063 capture = self.pg3.get_capture(9)
4064 ipfix = IPFIXDecoder()
4065 # first load template
4067 self.assertTrue(p.haslayer(IPFIX))
4068 self.assertEqual(p[IP].src, self.pg3.local_ip4)
4069 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
4070 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
4071 self.assertEqual(p[UDP].dport, 4739)
4072 self.assertEqual(p[IPFIX].observationDomainID,
4073 self.ipfix_domain_id)
4074 if p.haslayer(Template):
4075 ipfix.add_template(p.getlayer(Template))
4076 # verify events in data set
4078 if p.haslayer(Data):
# note: `data` is rebound here from the payload string to the decoded records
4079 data = ipfix.decode_data_set(p.getlayer(Set))
4080 self.verify_ipfix_max_fragments_ip4(data, 0,
4081 self.pg0.remote_ip4n)
# Per-test teardown for TestNAT44: after the parent tearDown, if VPP is still
# alive, logs the NAT44 state via CLI and resets the address/port assignment
# algorithm to its default (test_port_restricted switches it to map-e).
# NOTE(review): the `def` line of this method (embedded line 4083) and any
# lines after 4092 are absent from this listing - confirm against the full
# file.
4084 super(TestNAT44, self).tearDown()
4085 if not self.vpp_dead:
4086 self.logger.info(self.vapi.cli("show nat44 addresses"))
4087 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4088 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4089 self.logger.info(self.vapi.cli("show nat44 interface address"))
4090 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4091 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4092 self.vapi.cli("nat addr-port-assignment-alg default")
# Test class that starts VPP with the extra command-line section
# `nat { out2in dpo }` and exercises NAT44 together with a MAP translation
# domain (464XLAT CE): pg0 is the IPv4 side, pg1 the IPv6 side.
# NOTE(review): the embedded numbering has gaps; decorators, try/except lines
# and several continuation lines are absent from this listing - confirm
# against the full file.
4096 class TestNAT44Out2InDPO(MethodHolder):
4097 """ NAT44 Test Cases using out2in DPO """
# appends the NAT startup options to the VPP command line
4100 def setUpConstants(cls):
4101 super(TestNAT44Out2InDPO, cls).setUpConstants()
4102 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# class-wide fixture: fixed ports/ids (inside == outside), NAT address,
# two pg interfaces, IPv4 on pg0, IPv6 on pg1 and an IPv6 default route
# (all-zero prefix, length 0) via pg1
4105 def setUpClass(cls):
4106 super(TestNAT44Out2InDPO, cls).setUpClass()
4109 cls.tcp_port_in = 6303
4110 cls.tcp_port_out = 6303
4111 cls.udp_port_in = 6304
4112 cls.udp_port_out = 6304
4113 cls.icmp_id_in = 6305
4114 cls.icmp_id_out = 6305
4115 cls.nat_addr = '10.0.0.3'
4116 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4117 cls.dst_ip4 = '192.168.70.1'
4119 cls.create_pg_interfaces(range(2))
4122 cls.pg0.config_ip4()
4123 cls.pg0.resolve_arp()
4126 cls.pg1.config_ip6()
4127 cls.pg1.resolve_ndp()
4129 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4130 dst_address_length=0,
4131 next_hop_address=cls.pg1.remote_ip6n,
4132 next_hop_sw_if_index=cls.pg1.sw_if_index)
# presumably the error-path cleanup of setUpClass; the enclosing
# except/raise lines are not shown in this listing - confirm
4135 super(TestNAT44Out2InDPO, cls).tearDownClass()
# stores the /96 IPv6 prefixes used for address synthesis and creates the
# MAP translation domain from them
4138 def configure_xlat(self):
4139 self.dst_ip6_pfx = '1:2:3::'
4140 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4142 self.dst_ip6_pfx_len = 96
4143 self.src_ip6_pfx = '4:5:6::'
4144 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4146 self.src_ip6_pfx_len = 96
4147 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4148 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4149 '\x00\x00\x00\x00', 0, is_translation=1,
# NAT44 on pg0 plus translation: the IPv6 capture on pg1 is expected to
# carry the NAT address embedded in the source prefix (out_dst_ip6)
4152 def test_464xlat_ce(self):
4153 """ Test 464XLAT CE with NAT44 """
4155 self.configure_xlat()
4157 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4158 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4160 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4161 self.dst_ip6_pfx_len)
4162 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4163 self.src_ip6_pfx_len)
4166 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4167 self.pg0.add_stream(pkts)
4168 self.pg_enable_capture(self.pg_interfaces)
4170 capture = self.pg1.get_capture(len(pkts))
4171 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4174 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4176 self.pg1.add_stream(pkts)
4177 self.pg_enable_capture(self.pg_interfaces)
4179 capture = self.pg0.get_capture(len(pkts))
4180 self.verify_capture_in(capture, self.pg0)
# the test removes its own NAT configuration again
4182 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4184 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4185 self.nat_addr_n, is_add=0)
# translation only: the IPv6 source is derived from pg0.remote_ip4 and the
# ports are expected to be unchanged (same_port=True)
4187 def test_464xlat_ce_no_nat(self):
4188 """ Test 464XLAT CE without NAT44 """
4190 self.configure_xlat()
4192 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4193 self.dst_ip6_pfx_len)
4194 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4195 self.src_ip6_pfx_len)
4197 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4198 self.pg0.add_stream(pkts)
4199 self.pg_enable_capture(self.pg_interfaces)
4201 capture = self.pg1.get_capture(len(pkts))
4202 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4203 nat_ip=out_dst_ip6, same_port=True)
4205 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4206 self.pg1.add_stream(pkts)
4207 self.pg_enable_capture(self.pg_interfaces)
4209 capture = self.pg0.get_capture(len(pkts))
4210 self.verify_capture_in(capture, self.pg0)
4213 class TestDeterministicNAT(MethodHolder):
4214 """ Deterministic NAT Test Cases """
# Appends the `nat { deterministic }` section to the VPP command line after
# the parent class has built its own constants.
# NOTE(review): the decorator line preceding this def is absent from this
# listing.
4217 def setUpConstants(cls):
4218 super(TestDeterministicNAT, cls).setUpConstants()
4219 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-wide fixture for the deterministic NAT tests: fixed inside/external
# ports and ICMP id, the NAT address, three pg interfaces, and two remote
# hosts with IPv4 neighbours on pg0.
# NOTE(review): the embedded numbering has gaps; the body of the
# `for i in cls.interfaces` loop and the try/except lines are absent from this
# listing - confirm against the full file.
4222 def setUpClass(cls):
4223 super(TestDeterministicNAT, cls).setUpClass()
4226 cls.tcp_port_in = 6303
4227 cls.tcp_external_port = 6303
4228 cls.udp_port_in = 6304
4229 cls.udp_external_port = 6304
4230 cls.icmp_id_in = 6305
4231 cls.nat_addr = '10.0.0.3'
4233 cls.create_pg_interfaces(range(3))
4234 cls.interfaces = list(cls.pg_interfaces)
4236 for i in cls.interfaces:
4241 cls.pg0.generate_remote_hosts(2)
4242 cls.pg0.configure_ipv4_neighbors()
# presumably the error-path cleanup; the enclosing except/raise lines are
# not shown in this listing - confirm
4245 super(TestDeterministicNAT, cls).tearDownClass()
# Builds one TCP, one UDP and one ICMP echo-request packet from
# in_if.remote_ip4 to out_if.remote_ip4 using the class-level inside ports/id
# and the given TTL.
# NOTE(review): the embedded numbering has gaps; the list initialisation, the
# append calls and the return statement are absent from this listing -
# confirm against the full file.
4248 def create_stream_in(self, in_if, out_if, ttl=64):
4250 Create packet stream for inside network
4252 :param in_if: Inside interface
4253 :param out_if: Outside interface
4254 :param ttl: TTL of generated packets
# TCP packet
4258 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4259 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4260 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP packet
4264 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4265 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4266 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request
4270 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4271 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4272 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds one TCP, one UDP and one ICMP echo-reply packet from
# out_if.remote_ip4 towards dst_ip, which falls back to self.nat_addr.
# The packets use self.tcp_port_out, self.udp_port_out and
# self.icmp_external_id; within the visible code these are assigned by
# verify_capture_out (and tcp_port_out also by initiate_tcp_session), so this
# presumably has to be called after one of them - confirm.
# NOTE(review): the embedded numbering has gaps; the `if` guarding the dst_ip
# default, the list handling and the return statement are absent from this
# listing.
4277 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4279 Create packet stream for outside network
4281 :param out_if: Outside interface
4282 :param dst_ip: Destination IP address (Default use global NAT address)
4283 :param ttl: TTL of generated packets
4286 dst_ip = self.nat_addr
# TCP packet
4289 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4290 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4291 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP packet
4295 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4296 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4297 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply
4301 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4302 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4303 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks that `capture` holds exactly packet_num packets whose IPv4 source is
# nat_ip (self.nat_addr when not given) and records the translated TCP/UDP
# source port or ICMP id on the instance for later use.
# NOTE(review): the docstring below documents a `same_port` parameter that is
# not part of this method's signature; the entry looks stale.
# NOTE(review): the embedded numbering has gaps; the `if`/`else`/`try`/
# `except`/`raise` lines are absent from this listing.
4308 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4310 Verify captured packets on outside network
4312 :param capture: Captured packets
4313 :param nat_ip: Translated IP address (Default use global NAT address)
4314 :param same_port: Sorce port number is not translated (Default False)
4315 :param packet_num: Expected number of packets (Default 3)
4318 nat_ip = self.nat_addr
4319 self.assertEqual(packet_num, len(capture))
4320 for packet in capture:
4322 self.assertEqual(packet[IP].src, nat_ip)
# remember what NAT chose so that reply packets can be addressed to it
4323 if packet.haslayer(TCP):
4324 self.tcp_port_out = packet[TCP].sport
4325 elif packet.haslayer(UDP):
4326 self.udp_port_out = packet[UDP].sport
4328 self.icmp_external_id = packet[ICMP].id
4330 self.logger.error(ppp("Unexpected or invalid packet "
4331 "(outside network):", packet))
4334 def initiate_tcp_session(self, in_if, out_if):
4336 Initiates TCP session
4338 :param in_if: Inside interface
4339 :param out_if: Outside interface
4342 # SYN packet in->out
4343 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4344 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4345 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4348 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on the outside interface.
4350 capture = out_if.get_capture(1)
# Remember the source port seen on the outside; the out->in packet below
# is addressed to it.
# NOTE(review): `p` presumably refers to the captured packet here (the
# reassignment is not shown in this listing) -- confirm.
4352 self.tcp_port_out = p[TCP].sport
4354 # SYN + ACK packet out->in
4355 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4356 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4357 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4359 out_if.add_stream(p)
4360 self.pg_enable_capture(self.pg_interfaces)
# The reply must be delivered on the inside interface.
4362 in_if.get_capture(1)
4364 # ACK packet in->out
4365 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4366 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4367 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4370 self.pg_enable_capture(self.pg_interfaces)
4372 out_if.get_capture(1)
# Logged on the failure path (handler not shown in this listing).
4375 self.logger.error("TCP 3 way handshake failed")
4378 def verify_ipfix_max_entries_per_user(self, data):
4380 Verify IPFIX maximum entries per user exceeded event
4382 :param data: Decoded IPFIX data records
# Exactly one data record is expected.
4384 self.assertEqual(1, len(data))
# Records are indexed by IPFIX information element id. Element 230 is
# presumably natEvent with 13 meaning "quota exceeded" -- confirm against
# the IANA IPFIX registry.
4387 self.assertEqual(ord(record[230]), 13)
4388 # natQuotaExceededEvent
# '\x03\x00\x00\x00' is 3 as a little-endian 32-bit value.
4389 self.assertEqual('\x03\x00\x00\x00', record[466])
# '\xe8\x03\x00\x00' is 1000 as a little-endian 32-bit value, matching the
# 1000 sessions opened by test_session_limit_per_user.
4391 self.assertEqual('\xe8\x03\x00\x00', record[473])
# Element 8 must equal pg0's remote IPv4 address (the `remote_ip4n` form).
4393 self.assertEqual(self.pg0.remote_ip4n, record[8])
4395 def test_deterministic_mode(self):
4396 """ NAT plugin run deterministic mode """
4397 in_addr = '172.16.255.0'
4398 out_addr = '172.17.255.50'
4399 in_addr_t = '172.16.255.20'
# Packed 4-byte forms of the addresses, as passed to the API calls below.
4400 in_addr_n = socket.inet_aton(in_addr)
4401 out_addr_n = socket.inet_aton(out_addr)
4402 in_addr_t_n = socket.inet_aton(in_addr_t)
# The plugin must report that it runs in deterministic mode.
4406 nat_config = self.vapi.nat_show_config()
4407 self.assertEqual(1, nat_config.deterministic)
# Install the deterministic mapping from the inside to the outside prefix.
4409 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup: the test inside address must map to the outside address.
4411 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4412 self.assertEqual(rep1.out_addr[:4], out_addr_n)
# Reverse lookup: the outside address plus the returned high port must map
# back to the same inside address.
4413 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4414 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# The dump must list exactly the mapping added above.
4416 deterministic_mappings = self.vapi.nat_det_map_dump()
4417 self.assertEqual(len(deterministic_mappings), 1)
4418 dsm = deterministic_mappings[0]
4419 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4420 self.assertEqual(in_plen, dsm.in_plen)
4421 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4422 self.assertEqual(out_plen, dsm.out_plen)
# Clearing the configuration must leave no mapping behind.
4424 self.clear_nat_det()
4425 deterministic_mappings = self.vapi.nat_det_map_dump()
4426 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    before = self.vapi.nat_det_get_timeouts()

    # Raise every timeout by 10 so that each value differs from its
    # previous setting.
    self.vapi.nat_det_set_timeouts(before.udp + 10,
                                   before.tcp_established + 10,
                                   before.tcp_transitory + 10,
                                   before.icmp + 10)

    after = self.vapi.nat_det_get_timeouts()

    # Each timeout reported after the update must differ from the value
    # read before it.
    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
4446 def test_det_in(self):
4447 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4449 nat_ip = "10.0.0.10"
# Map pg0's remote host to nat_ip and enable the NAT44 feature on pg0 and
# pg1.
4451 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4453 socket.inet_aton(nat_ip),
4455 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4456 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# in2out: traffic sent from pg0 must leave pg1 with nat_ip as its source.
4460 pkts = self.create_stream_in(self.pg0, self.pg1)
4461 self.pg0.add_stream(pkts)
4462 self.pg_enable_capture(self.pg_interfaces)
4464 capture = self.pg1.get_capture(len(pkts))
4465 self.verify_capture_out(capture, nat_ip)
# out2in: packets sent to nat_ip on pg1 must be delivered on pg0.
4468 pkts = self.create_stream_out(self.pg1, nat_ip)
4469 self.pg1.add_stream(pkts)
4470 self.pg_enable_capture(self.pg_interfaces)
4472 capture = self.pg0.get_capture(len(pkts))
4473 self.verify_capture_in(capture, self.pg0)
# Three sessions are expected for the inside host; the checks below compare
# them against the TCP, UDP and ICMP values used by the streams.
4476 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4477 self.assertEqual(len(sessions), 3)
# TCP session
4481 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4482 self.assertEqual(s.in_port, self.tcp_port_in)
4483 self.assertEqual(s.out_port, self.tcp_port_out)
4484 self.assertEqual(s.ext_port, self.tcp_external_port)
# UDP session
4488 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4489 self.assertEqual(s.in_port, self.udp_port_in)
4490 self.assertEqual(s.out_port, self.udp_port_out)
4491 self.assertEqual(s.ext_port, self.udp_external_port)
# ICMP session: the ICMP ids take the place of the ports.
4495 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4496 self.assertEqual(s.in_port, self.icmp_id_in)
4497 self.assertEqual(s.out_port, self.icmp_external_id)
4499 def test_multiple_users(self):
4500 """ Deterministic NAT multiple users """
4502 nat_ip = "10.0.0.10"
4504 external_port = 6303
# Two distinct inside hosts reachable through pg0.
4506 host0 = self.pg0.remote_hosts[0]
4507 host1 = self.pg0.remote_hosts[1]
4509 self.vapi.nat_det_add_del_map(host0.ip4n,
4511 socket.inet_aton(nat_ip),
4513 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4514 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside host
4518 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4519 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4520 TCP(sport=port_in, dport=external_port))
4521 self.pg0.add_stream(p)
4522 self.pg_enable_capture(self.pg_interfaces)
4524 capture = self.pg1.get_capture(1)
# The packet must leave with nat_ip as source; remember host0's outside
# port for the return traffic.
4529 self.assertEqual(ip.src, nat_ip)
4530 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4531 self.assertEqual(tcp.dport, external_port)
4532 port_out0 = tcp.sport
4534 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside host, using the same inside port as host0
4538 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4539 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4540 TCP(sport=port_in, dport=external_port))
4541 self.pg0.add_stream(p)
4542 self.pg_enable_capture(self.pg_interfaces)
4544 capture = self.pg1.get_capture(1)
4549 self.assertEqual(ip.src, nat_ip)
4550 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4551 self.assertEqual(tcp.dport, external_port)
4552 port_out1 = tcp.sport
4554 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Both hosts share the single mapping, which must now hold two sessions.
4557 dms = self.vapi.nat_det_map_dump()
4558 self.assertEqual(1, len(dms))
4559 self.assertEqual(2, dms[0].ses_num)
# outside host -> host0: addressed to host0's outside port
4562 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4563 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4564 TCP(sport=external_port, dport=port_out0))
4565 self.pg1.add_stream(p)
4566 self.pg_enable_capture(self.pg_interfaces)
4568 capture = self.pg0.get_capture(1)
4573 self.assertEqual(ip.src, self.pg1.remote_ip4)
4574 self.assertEqual(ip.dst, host0.ip4)
4575 self.assertEqual(tcp.dport, port_in)
4576 self.assertEqual(tcp.sport, external_port)
4578 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside host -> host1: addressed to host1's outside port
4582 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4583 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4584 TCP(sport=external_port, dport=port_out1))
4585 self.pg1.add_stream(p)
4586 self.pg_enable_capture(self.pg_interfaces)
4588 capture = self.pg0.get_capture(1)
4593 self.assertEqual(ip.src, self.pg1.remote_ip4)
4594 self.assertEqual(ip.dst, host1.ip4)
4595 self.assertEqual(tcp.dport, port_in)
4596 self.assertEqual(tcp.sport, external_port)
4598 self.logger.error(ppp("Unexpected or invalid packet", p))
4601 # session close api test
# Close one session by its outside address; one session must remain.
4602 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4604 self.pg1.remote_ip4n,
4606 dms = self.vapi.nat_det_map_dump()
4607 self.assertEqual(dms[0].ses_num, 1)
# Close the other session by host0's inside address; none may remain.
4609 self.vapi.nat_det_close_session_in(host0.ip4n,
4611 self.pg1.remote_ip4n,
4613 dms = self.vapi.nat_det_map_dump()
4614 self.assertEqual(dms[0].ses_num, 0)
4616 def test_tcp_session_close_detection_in(self):
4617 """ Deterministic NAT TCP session close from inside network """
# Map pg0's remote host to the NAT address, enable NAT44 on pg0 and pg1,
# then establish a TCP session through the NAT.
4618 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4620 socket.inet_aton(self.nat_addr),
4622 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4623 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4626 self.initiate_tcp_session(self.pg0, self.pg1)
4628 # close the session from inside
4630 # FIN packet in -> out
4631 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4632 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4633 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4635 self.pg0.add_stream(p)
4636 self.pg_enable_capture(self.pg_interfaces)
4638 self.pg1.get_capture(1)
4642 # ACK packet out -> in
4643 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4644 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4645 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4649 # FIN packet out -> in
4650 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4651 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4652 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# The two outside packets are sent as one stream, so two packets are
# expected on pg0.
4656 self.pg1.add_stream(pkts)
4657 self.pg_enable_capture(self.pg_interfaces)
4659 self.pg0.get_capture(2)
4661 # ACK packet in -> out
4662 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4663 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4664 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4666 self.pg0.add_stream(p)
4667 self.pg_enable_capture(self.pg_interfaces)
4669 self.pg1.get_capture(1)
4671 # Check if deterministic NAT44 closed the session
4672 dms = self.vapi.nat_det_map_dump()
4673 self.assertEqual(0, dms[0].ses_num)
# Logged on the failure path (handler not shown in this listing).
4675 self.logger.error("TCP session termination failed")
4678 def test_tcp_session_close_detection_out(self):
4679 """ Deterministic NAT TCP session close from outside network """
# Map pg0's remote host to the NAT address, enable NAT44 on pg0 and pg1,
# then establish a TCP session through the NAT.
4680 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4682 socket.inet_aton(self.nat_addr),
4684 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4685 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4688 self.initiate_tcp_session(self.pg0, self.pg1)
4690 # close the session from outside
4692 # FIN packet out -> in
4693 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4694 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4695 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4697 self.pg1.add_stream(p)
4698 self.pg_enable_capture(self.pg_interfaces)
4700 self.pg0.get_capture(1)
4704 # ACK packet in -> out
4705 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4706 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4707 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment below repeats the one above. By symmetry with
# the close-from-inside test this second packet is presumably the FIN
# (its flags are not shown in this listing) -- confirm and fix the comment.
4711 # ACK packet in -> out
4712 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4713 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4714 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# The two inside packets are sent as one stream, so two packets are
# expected on pg1.
4718 self.pg0.add_stream(pkts)
4719 self.pg_enable_capture(self.pg_interfaces)
4721 self.pg1.get_capture(2)
4723 # ACK packet out -> in
4724 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4725 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4726 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4728 self.pg1.add_stream(p)
4729 self.pg_enable_capture(self.pg_interfaces)
4731 self.pg0.get_capture(1)
4733 # Check if deterministic NAT44 closed the session
4734 dms = self.vapi.nat_det_map_dump()
4735 self.assertEqual(0, dms[0].ses_num)
# Logged on the failure path (handler not shown in this listing).
4737 self.logger.error("TCP session termination failed")
4740 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4741 def test_session_timeout(self):
4742 """ Deterministic NAT session timeouts """
4743 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4745 socket.inet_aton(self.nat_addr),
4747 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4748 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4751 self.initiate_tcp_session(self.pg0, self.pg1)
# Set all four timeouts to 5 (positional order as used in test_set_timeouts:
# udp, tcp_established, tcp_transitory, icmp).
4752 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
# Send the TCP, UDP and ICMP inside stream and wait for it on pg1.
4753 pkts = self.create_stream_in(self.pg0, self.pg1)
4754 self.pg0.add_stream(pkts)
4755 self.pg_enable_capture(self.pg_interfaces)
4757 capture = self.pg1.get_capture(len(pkts))
# No session may remain on the mapping afterwards (the wait for the
# timeouts to expire is not shown in this listing).
4760 dms = self.vapi.nat_det_map_dump()
4761 self.assertEqual(0, dms[0].ses_num)
4763 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4764 def test_session_limit_per_user(self):
4765 """ Deterministic NAT maximum sessions per user limit """
4766 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4768 socket.inet_aton(self.nat_addr),
4770 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4771 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Point the IPFIX exporter at the collector behind pg2 and call
# nat_ipfix() with its defaults (presumably enabling NAT logging).
4773 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4774 src_address=self.pg2.local_ip4n,
4776 template_interval=10)
4777 self.vapi.nat_ipfix()
# 1000 UDP flows (ports 1025..2024) from the same inside host.
4780 for port in range(1025, 2025):
4781 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4782 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4783 UDP(sport=port, dport=port))
4786 self.pg0.add_stream(pkts)
4787 self.pg_enable_capture(self.pg_interfaces)
4789 capture = self.pg1.get_capture(len(pkts))
# One more flow from the same host must not reach the outside interface.
4791 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4792 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4793 UDP(sport=3001, dport=3002))
4794 self.pg0.add_stream(p)
4795 self.pg_enable_capture(self.pg_interfaces)
4797 capture = self.pg1.assert_nothing_captured()
4799 # verify ICMP error packet
4800 capture = self.pg0.get_capture(1)
4802 self.assertTrue(p.haslayer(ICMP))
# Type 3 / code 1 is ICMP "destination unreachable / host unreachable";
# the quoted packet must be the rejected UDP flow (ports 3001 -> 3002).
4804 self.assertEqual(icmp.type, 3)
4805 self.assertEqual(icmp.code, 1)
4806 self.assertTrue(icmp.haslayer(IPerror))
4807 inner_ip = icmp[IPerror]
4808 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4809 self.assertEqual(inner_ip[UDPerror].dport, 3002)
# Only the first 1000 sessions may exist on the mapping.
4811 dms = self.vapi.nat_det_map_dump()
4813 self.assertEqual(1000, dms[0].ses_num)
4815 # verify IPFIX logging
4816 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Two IPFIX packets are expected on pg2.
4818 capture = self.pg2.get_capture(2)
4819 ipfix = IPFIXDecoder()
4820 # first load template
4822 self.assertTrue(p.haslayer(IPFIX))
4823 if p.haslayer(Template):
4824 ipfix.add_template(p.getlayer(Template))
4825 # verify events in data set
4827 if p.haslayer(Data):
4828 data = ipfix.decode_data_set(p.getlayer(Set))
4829 self.verify_ipfix_max_entries_per_user(data)
4831 def clear_nat_det(self):
4833 Clear deterministic NAT configuration.
# Turn NAT IPFIX logging off and call the timeout setter without arguments
# (i.e. with whatever defaults the API wrapper supplies).
4835 self.vapi.nat_ipfix(enable=0)
4836 self.vapi.nat_det_set_timeouts()
# Walk every configured deterministic mapping; test_deterministic_mode
# relies on the mapping dump being empty after this method has run.
4837 deterministic_mappings = self.vapi.nat_det_map_dump()
4838 for dsm in deterministic_mappings:
4839 self.vapi.nat_det_add_del_map(dsm.in_addr,
# Walk every interface that has the NAT44 feature (presumably to disable
# it; the remaining call arguments are not shown in this listing).
4845 interfaces = self.vapi.nat44_interface_dump()
4846 for intf in interfaces:
4847 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4852 super(TestDeterministicNAT, self).tearDown()
# NOTE(review): the `def` line of this method is not part of this listing;
# the super() call above indicates it is TestDeterministicNAT.tearDown.
# Only query VPP if it is still alive.
4853 if not self.vpp_dead:
# Collect the NAT44 interface list and the deterministic mappings,
# timeouts and sessions via the CLI, then clear the deterministic NAT
# configuration.
4854 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4856 self.vapi.cli("show nat44 deterministic mappings"))
4858 self.vapi.cli("show nat44 deterministic timeouts"))
4860 self.vapi.cli("show nat44 deterministic sessions"))
4861 self.clear_nat_det()
4864 class TestNAT64(MethodHolder):
4865 """ NAT64 Test Cases """
def setUpConstants(cls):
    """ Append a NAT64 `nat { ... }` section to the VPP command line """
    super(TestNAT64, cls).setUpConstants()
    # Sets the NAT64 "bib" and "st" hash bucket counts for this test class.
    nat64_section = ["nat", "{",
                     "nat64 bib hash buckets 128",
                     "nat64 st hash buckets 256",
                     "}"]
    cls.vpp_cmdline.extend(nat64_section)
4874 def setUpClass(cls):
4875 super(TestNAT64, cls).setUpClass()
# Default inside/outside ports and ICMP ids used by the tests.
4878 cls.tcp_port_in = 6303
4879 cls.tcp_port_out = 6303
4880 cls.udp_port_in = 6304
4881 cls.udp_port_out = 6304
4882 cls.icmp_id_in = 6305
4883 cls.icmp_id_out = 6305
# NAT64 pool addresses in text and packed form: one for the default table
# and one that the tests use together with vrf1_id.
4884 cls.nat_addr = '10.0.0.3'
4885 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4887 cls.vrf1_nat_addr = '10.0.10.3'
4888 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4890 cls.ipfix_src_port = 4739
4891 cls.ipfix_domain_id = 1
# Five packet-generator interfaces; entries 0 and 2 of pg_interfaces are
# handled as IPv6 interfaces and entry 1 as an IPv4 interface.
4893 cls.create_pg_interfaces(range(5))
4894 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4895 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4896 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# Create the IPv6 table for vrf1 and bind pg_interfaces[2] to it.
4898 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4900 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# Two remote hosts on pg0 (test_hairpinning uses them as client and server).
4902 cls.pg0.generate_remote_hosts(2)
4904 for i in cls.ip6_interfaces:
4907 i.configure_ipv6_neighbors()
4909 for i in cls.ip4_interfaces:
# pg3 is configured for both IPv4 and IPv6.
4915 cls.pg3.config_ip4()
4916 cls.pg3.resolve_arp()
4917 cls.pg3.config_ip6()
4918 cls.pg3.configure_ipv6_neighbors()
# Class-level teardown, presumably run when the setup above fails (the
# handler itself is not shown in this listing).
4921 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # Add a single-address range; it must be the only entry in the dump.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Delete the same range; the dump must be empty again.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
4939 def test_interface(self):
4940 """ Enable/disable NAT64 feature on the interface """
# Enable NAT64 on pg0 (checked below as inside) and on pg1 as outside.
4941 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4942 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Both interfaces must be listed, each with the matching inside flag.
4944 interfaces = self.vapi.nat64_interface_dump()
4945 self.assertEqual(len(interfaces), 2)
4948 for intf in interfaces:
4949 if intf.sw_if_index == self.pg0.sw_if_index:
4950 self.assertEqual(intf.is_inside, 1)
4952 elif intf.sw_if_index == self.pg1.sw_if_index:
4953 self.assertEqual(intf.is_inside, 0)
4955 self.assertTrue(pg0_found)
4956 self.assertTrue(pg1_found)
# The CLI feature listing must mention nat64-in2out on pg0 and
# nat64-out2in on pg1.
4958 features = self.vapi.cli("show interface features pg0")
4959 self.assertNotEqual(features.find('nat64-in2out'), -1)
4960 features = self.vapi.cli("show interface features pg1")
4961 self.assertNotEqual(features.find('nat64-out2in'), -1)
# Disable NAT64 on both interfaces; the dump must then be empty.
4963 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4964 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4966 interfaces = self.vapi.nat64_interface_dump()
4967 self.assertEqual(len(interfaces), 0)
4969 def test_static_bib(self):
4970 """ Add/delete static BIB entry """
# Inside IPv6 and outside IPv4 address of the entry, in packed form.
4971 in_addr = socket.inet_pton(socket.AF_INET6,
4972 '2001:db8:85a3::8a2e:370:7334')
4973 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4976 proto = IP_PROTOS.tcp
# First call: add the static entry (remaining arguments are not shown in
# this listing).
4978 self.vapi.nat64_add_del_static_bib(in_addr,
# The TCP BIB dump must contain exactly one static entry and it must match
# the addresses and ports that were configured.
4983 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4988 self.assertEqual(bibe.i_addr, in_addr)
4989 self.assertEqual(bibe.o_addr, out_addr)
4990 self.assertEqual(bibe.i_port, in_port)
4991 self.assertEqual(bibe.o_port, out_port)
4992 self.assertEqual(static_bib_num, 1)
# Second call: per the docstring this deletes the entry; afterwards no
# static entry may remain in the TCP BIB dump.
4994 self.vapi.nat64_add_del_static_bib(in_addr,
5000 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5005 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # verify default values
    expected_defaults = (('udp', 300), ('icmp', 60), ('tcp_trans', 240),
                         ('tcp_est', 7440), ('tcp_incoming_syn', 6))
    reported = self.vapi.nat64_get_timeouts()
    for field, value in expected_defaults:
        self.assertEqual(getattr(reported, field), value)

    # set and verify custom values
    custom = (('udp', 200), ('icmp', 30), ('tcp_trans', 250),
              ('tcp_est', 7450), ('tcp_incoming_syn', 10))
    self.vapi.nat64_set_timeouts(**dict(custom))
    reported = self.vapi.nat64_get_timeouts()
    for field, value in custom:
        self.assertEqual(getattr(reported, field), value)
5027 def test_dynamic(self):
5028 """ NAT64 dynamic translation test """
5029 self.tcp_port_in = 6303
5030 self.udp_port_in = 6304
5031 self.icmp_id_in = 6305
# Session count before the test; baseline for the check further down.
5033 ses_num_start = self.nat64_get_ses_num()
# Add the NAT address to the pool and enable NAT64 on pg0 and on pg1
# (pg1 as outside).
5035 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5037 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5038 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# in2out: IPv6 traffic from pg0 must leave pg1 sourced from the NAT address.
5041 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5042 self.pg0.add_stream(pkts)
5043 self.pg_enable_capture(self.pg_interfaces)
5045 capture = self.pg1.get_capture(len(pkts))
5046 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5047 dst_ip=self.pg1.remote_ip4)
# out2in: IPv4 traffic to the NAT address must be delivered on pg0.
5050 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5051 self.pg1.add_stream(pkts)
5052 self.pg_enable_capture(self.pg_interfaces)
5054 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: the outside host's IPv4 address appended to the
# 64:ff9b:: prefix, normalised by scapy.
5055 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5056 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# in2out, second round
5059 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5060 self.pg0.add_stream(pkts)
5061 self.pg_enable_capture(self.pg_interfaces)
5063 capture = self.pg1.get_capture(len(pkts))
5064 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5065 dst_ip=self.pg1.remote_ip4)
# out2in, second round
5068 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5069 self.pg1.add_stream(pkts)
5070 self.pg_enable_capture(self.pg_interfaces)
5072 capture = self.pg0.get_capture(len(pkts))
5073 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Both rounds together must have created exactly three sessions.
5075 ses_num_end = self.nat64_get_ses_num()
5077 self.assertEqual(ses_num_end - ses_num_start, 3)
5079 # tenant with specific VRF
5080 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5081 self.vrf1_nat_addr_n,
5082 vrf_id=self.vrf1_id)
5083 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Traffic from pg2 must be translated to the vrf1 NAT address.
5085 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5086 self.pg2.add_stream(pkts)
5087 self.pg_enable_capture(self.pg_interfaces)
5089 capture = self.pg1.get_capture(len(pkts))
5090 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5091 dst_ip=self.pg1.remote_ip4)
# Return traffic to the vrf1 NAT address must be delivered on pg2.
5093 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5094 self.pg1.add_stream(pkts)
5095 self.pg_enable_capture(self.pg_interfaces)
5097 capture = self.pg2.get_capture(len(pkts))
5098 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5100 def test_static(self):
5101 """ NAT64 static translation test """
# Inside and outside ports/ids are set to the same values; the capture
# check below is called with same_port=True.
5102 self.tcp_port_in = 60303
5103 self.udp_port_in = 60304
5104 self.icmp_id_in = 60305
5105 self.tcp_port_out = 60303
5106 self.udp_port_out = 60304
5107 self.icmp_id_out = 60305
# Session count before the test; baseline for the check at the end.
5109 ses_num_start = self.nat64_get_ses_num()
5111 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5113 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5114 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB calls for pg0's remote host -- presumably one each for
# TCP, UDP and ICMP (the remaining arguments are not shown in this listing).
5116 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5121 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5126 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# in2out
5133 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5134 self.pg0.add_stream(pkts)
5135 self.pg_enable_capture(self.pg_interfaces)
5137 capture = self.pg1.get_capture(len(pkts))
5138 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5139 dst_ip=self.pg1.remote_ip4, same_port=True)
# out2in
5142 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5143 self.pg1.add_stream(pkts)
5144 self.pg_enable_capture(self.pg_interfaces)
5146 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: the outside host's IPv4 address appended to the
# 64:ff9b:: prefix, normalised by scapy.
5147 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5148 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Exactly three sessions must have been created.
5150 ses_num_end = self.nat64_get_ses_num()
5152 self.assertEqual(ses_num_end - ses_num_start, 3)
5154 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5155 def test_session_timeout(self):
5156 """ NAT64 session timeout """
5157 self.icmp_id_in = 1234
5158 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5160 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5161 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Only the ICMP timeout is overridden; the others keep their current values.
5162 self.vapi.nat64_set_timeouts(icmp=5)
# Create sessions with inside traffic and wait for it on pg1.
5164 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5165 self.pg0.add_stream(pkts)
5166 self.pg_enable_capture(self.pg_interfaces)
5168 capture = self.pg1.get_capture(len(pkts))
5170 ses_num_before_timeout = self.nat64_get_ses_num()
5174 # ICMP session after timeout
5175 ses_num_after_timeout = self.nat64_get_ses_num()
# The session count must have changed between the two readings (the wait
# in between is not shown in this listing).
5176 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5178 def test_icmp_error(self):
5179 """ NAT64 ICMP Error message translation """
5180 self.tcp_port_in = 6303
5181 self.udp_port_in = 6304
5182 self.icmp_id_in = 6305
5184 ses_num_start = self.nat64_get_ses_num()
5186 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5188 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5189 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5191 # send some packets to create sessions
5192 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5193 self.pg0.add_stream(pkts)
5194 self.pg_enable_capture(self.pg_interfaces)
# Keep the translated IPv4 packets; they are quoted in the out2in errors.
5196 capture_ip4 = self.pg1.get_capture(len(pkts))
5197 self.verify_capture_out(capture_ip4,
5198 nat_ip=self.nat_addr,
5199 dst_ip=self.pg1.remote_ip4)
5201 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5202 self.pg1.add_stream(pkts)
5203 self.pg_enable_capture(self.pg_interfaces)
# Keep the translated IPv6 packets; they are quoted in the in2out errors.
5205 capture_ip6 = self.pg0.get_capture(len(pkts))
5206 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5207 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5208 self.pg0.remote_ip6)
# in2out ICMP error: the inside host answers each packet it received with
# an ICMPv6 destination unreachable (code 1) quoting that packet.
5211 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5212 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5213 ICMPv6DestUnreach(code=1) /
5214 packet[IPv6] for packet in capture_ip6]
5215 self.pg0.add_stream(pkts)
5216 self.pg_enable_capture(self.pg_interfaces)
# On pg1 each error must be an ICMPv4 type 3 / code 13 message from the NAT
# address whose quoted packet goes from the outside host to the NAT address
# and carries the translated port or ICMP id.
5218 capture = self.pg1.get_capture(len(pkts))
5219 for packet in capture:
5221 self.assertEqual(packet[IP].src, self.nat_addr)
5222 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5223 self.assertEqual(packet[ICMP].type, 3)
5224 self.assertEqual(packet[ICMP].code, 13)
5225 inner = packet[IPerror]
5226 self.assertEqual(inner.src, self.pg1.remote_ip4)
5227 self.assertEqual(inner.dst, self.nat_addr)
5228 self.check_icmp_checksum(packet)
5229 if inner.haslayer(TCPerror):
5230 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5231 elif inner.haslayer(UDPerror):
5232 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5234 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5236 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# out2in ICMP error: the outside host answers each packet it received with
# an ICMPv4 type 3 / code 13 message quoting that packet.
5240 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5241 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5242 ICMP(type=3, code=13) /
5243 packet[IP] for packet in capture_ip4]
5244 self.pg1.add_stream(pkts)
5245 self.pg_enable_capture(self.pg_interfaces)
# On pg0 each error must be an ICMPv6 destination unreachable (code 1)
# whose quoted packet goes from the inside host to the IPv6 form of the
# outside host and carries the original inside port or ICMP id.
5247 capture = self.pg0.get_capture(len(pkts))
5248 for packet in capture:
5250 self.assertEqual(packet[IPv6].src, ip.src)
5251 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5252 icmp = packet[ICMPv6DestUnreach]
5253 self.assertEqual(icmp.code, 1)
5254 inner = icmp[IPerror6]
5255 self.assertEqual(inner.src, self.pg0.remote_ip6)
5256 self.assertEqual(inner.dst, ip.src)
5257 self.check_icmpv6_checksum(packet)
5258 if inner.haslayer(TCPerror):
5259 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5260 elif inner.haslayer(UDPerror):
5261 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5263 self.assertEqual(inner[ICMPv6EchoRequest].id,
5266 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5269 def test_hairpinning(self):
5270 """ NAT64 hairpinning """
# Client and server are two hosts behind the same interface (pg0).
5272 client = self.pg0.remote_hosts[0]
5273 server = self.pg0.remote_hosts[1]
5274 server_tcp_in_port = 22
5275 server_tcp_out_port = 4022
5276 server_udp_in_port = 23
5277 server_udp_out_port = 4023
5278 client_tcp_in_port = 1234
5279 client_udp_in_port = 1235
# Placeholders; overwritten with the translated client ports seen in the
# first capture below.
5280 client_tcp_out_port = 0
5281 client_udp_out_port = 0
# IPv6 form of the NAT address: appended to the 64:ff9b:: prefix and
# normalised by scapy.
5282 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5283 nat_addr_ip6 = ip.src
5285 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5287 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5288 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries for the server, one using the TCP and one the UDP
# outside port.
5290 self.vapi.nat64_add_del_static_bib(server.ip6n,
5293 server_tcp_out_port,
5295 self.vapi.nat64_add_del_static_bib(server.ip6n,
5298 server_udp_out_port,
# client -> server, addressed to the NAT address and the server's outside
# ports
5303 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5304 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5305 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5307 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5308 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5309 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5311 self.pg0.add_stream(pkts)
5312 self.pg_enable_capture(self.pg_interfaces)
# The packets must come back out of pg0, sourced from the NAT address and
# addressed to the server's inside ports; the client's source ports must
# have changed and are recorded for the reply direction.
5314 capture = self.pg0.get_capture(len(pkts))
5315 for packet in capture:
5317 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5318 self.assertEqual(packet[IPv6].dst, server.ip6)
5319 if packet.haslayer(TCP):
5320 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5321 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5322 self.check_tcp_checksum(packet)
5323 client_tcp_out_port = packet[TCP].sport
5325 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5326 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5327 self.check_udp_checksum(packet)
5328 client_udp_out_port = packet[UDP].sport
5330 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client, addressed to the NAT address and the client's
# translated ports
5335 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5336 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5337 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5339 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5340 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5341 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5343 self.pg0.add_stream(pkts)
5344 self.pg_enable_capture(self.pg_interfaces)
# The replies must reach the client from the NAT address, with the server's
# outside ports as source and the client's original ports as destination.
5346 capture = self.pg0.get_capture(len(pkts))
5347 for packet in capture:
5349 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5350 self.assertEqual(packet[IPv6].dst, client.ip6)
5351 if packet.haslayer(TCP):
5352 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5353 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5354 self.check_tcp_checksum(packet)
5356 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5357 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5358 self.check_udp_checksum(packet)
5360 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMP error: the client answers each reply it just received with an ICMPv6
# destination unreachable (code 1) quoting that packet.
5365 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5366 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5367 ICMPv6DestUnreach(code=1) /
5368 packet[IPv6] for packet in capture]
5369 self.pg0.add_stream(pkts)
5370 self.pg_enable_capture(self.pg_interfaces)
# The errors must reach the server from the NAT address; the quoted packet
# must show the server's own address and inside ports as source and the
# NAT address with the client's translated ports as destination.
5372 capture = self.pg0.get_capture(len(pkts))
5373 for packet in capture:
5375 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5376 self.assertEqual(packet[IPv6].dst, server.ip6)
5377 icmp = packet[ICMPv6DestUnreach]
5378 self.assertEqual(icmp.code, 1)
5379 inner = icmp[IPerror6]
5380 self.assertEqual(inner.src, server.ip6)
5381 self.assertEqual(inner.dst, nat_addr_ip6)
5382 self.check_icmpv6_checksum(packet)
5383 if inner.haslayer(TCPerror):
5384 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5385 self.assertEqual(inner[TCPerror].dport,
5386 client_tcp_out_port)
5388 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5389 self.assertEqual(inner[UDPerror].dport,
5390 client_udp_out_port)
5392 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5395 def test_prefix(self):
5396 """ NAT64 Network-Specific Prefix """
5398 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5400 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5401 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5402 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5403 self.vrf1_nat_addr_n,
5404 vrf_id=self.vrf1_id)
5405 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5408 global_pref64 = "2001:db8::"
5409 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5410 global_pref64_len = 32
5411 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5413 prefix = self.vapi.nat64_prefix_dump()
5414 self.assertEqual(len(prefix), 1)
5415 self.assertEqual(prefix[0].prefix, global_pref64_n)
5416 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5417 self.assertEqual(prefix[0].vrf_id, 0)
5419 # Add tenant specific prefix
5420 vrf1_pref64 = "2001:db8:122:300::"
5421 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5422 vrf1_pref64_len = 56
5423 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5425 vrf_id=self.vrf1_id)
5426 prefix = self.vapi.nat64_prefix_dump()
5427 self.assertEqual(len(prefix), 2)
5430 pkts = self.create_stream_in_ip6(self.pg0,
5433 plen=global_pref64_len)
5434 self.pg0.add_stream(pkts)
5435 self.pg_enable_capture(self.pg_interfaces)
5437 capture = self.pg1.get_capture(len(pkts))
5438 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5439 dst_ip=self.pg1.remote_ip4)
5441 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5442 self.pg1.add_stream(pkts)
5443 self.pg_enable_capture(self.pg_interfaces)
5445 capture = self.pg0.get_capture(len(pkts))
5446 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5449 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5451 # Tenant specific prefix
5452 pkts = self.create_stream_in_ip6(self.pg2,
5455 plen=vrf1_pref64_len)
5456 self.pg2.add_stream(pkts)
5457 self.pg_enable_capture(self.pg_interfaces)
5459 capture = self.pg1.get_capture(len(pkts))
5460 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5461 dst_ip=self.pg1.remote_ip4)
5463 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5464 self.pg1.add_stream(pkts)
5465 self.pg_enable_capture(self.pg_interfaces)
5467 capture = self.pg2.get_capture(len(pkts))
5468 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5471 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5473 def test_unknown_proto(self):
5474 """ NAT64 translate packet with unknown protocol """
5476 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5478 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5479 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5480 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5483 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5484 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5485 TCP(sport=self.tcp_port_in, dport=20))
5486 self.pg0.add_stream(p)
5487 self.pg_enable_capture(self.pg_interfaces)
5489 p = self.pg1.get_capture(1)
5491 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5492 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5494 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5495 TCP(sport=1234, dport=1234))
5496 self.pg0.add_stream(p)
5497 self.pg_enable_capture(self.pg_interfaces)
5499 p = self.pg1.get_capture(1)
5502 self.assertEqual(packet[IP].src, self.nat_addr)
5503 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5504 self.assertTrue(packet.haslayer(GRE))
5505 self.check_ip_checksum(packet)
5507 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5511 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5512 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5514 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5515 TCP(sport=1234, dport=1234))
5516 self.pg1.add_stream(p)
5517 self.pg_enable_capture(self.pg_interfaces)
5519 p = self.pg0.get_capture(1)
5522 self.assertEqual(packet[IPv6].src, remote_ip6)
5523 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5524 self.assertEqual(packet[IPv6].nh, 47)
5526 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5529 def test_hairpinning_unknown_proto(self):
5530 """ NAT64 translate packet with unknown protocol - hairpinning """
5532 client = self.pg0.remote_hosts[0]
5533 server = self.pg0.remote_hosts[1]
5534 server_tcp_in_port = 22
5535 server_tcp_out_port = 4022
5536 client_tcp_in_port = 1234
5537 client_tcp_out_port = 1235
5538 server_nat_ip = "10.0.0.100"
5539 client_nat_ip = "10.0.0.110"
5540 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5541 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5542 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5543 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5545 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5547 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5548 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5550 self.vapi.nat64_add_del_static_bib(server.ip6n,
5553 server_tcp_out_port,
5556 self.vapi.nat64_add_del_static_bib(server.ip6n,
5562 self.vapi.nat64_add_del_static_bib(client.ip6n,
5565 client_tcp_out_port,
5569 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5570 IPv6(src=client.ip6, dst=server_nat_ip6) /
5571 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5572 self.pg0.add_stream(p)
5573 self.pg_enable_capture(self.pg_interfaces)
5575 p = self.pg0.get_capture(1)
5577 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5578 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5580 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5581 TCP(sport=1234, dport=1234))
5582 self.pg0.add_stream(p)
5583 self.pg_enable_capture(self.pg_interfaces)
5585 p = self.pg0.get_capture(1)
5588 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5589 self.assertEqual(packet[IPv6].dst, server.ip6)
5590 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5592 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5596 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5597 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5599 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5600 TCP(sport=1234, dport=1234))
5601 self.pg0.add_stream(p)
5602 self.pg_enable_capture(self.pg_interfaces)
5604 p = self.pg0.get_capture(1)
5607 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5608 self.assertEqual(packet[IPv6].dst, client.ip6)
5609 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5611 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5614 def test_one_armed_nat64(self):
5615 """ One armed NAT64 """
5617 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5621 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5623 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5624 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5627 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5628 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5629 TCP(sport=12345, dport=80))
5630 self.pg3.add_stream(p)
5631 self.pg_enable_capture(self.pg_interfaces)
5633 capture = self.pg3.get_capture(1)
5638 self.assertEqual(ip.src, self.nat_addr)
5639 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5640 self.assertNotEqual(tcp.sport, 12345)
5641 external_port = tcp.sport
5642 self.assertEqual(tcp.dport, 80)
5643 self.check_tcp_checksum(p)
5644 self.check_ip_checksum(p)
5646 self.logger.error(ppp("Unexpected or invalid packet:", p))
5650 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5651 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5652 TCP(sport=80, dport=external_port))
5653 self.pg3.add_stream(p)
5654 self.pg_enable_capture(self.pg_interfaces)
5656 capture = self.pg3.get_capture(1)
5661 self.assertEqual(ip.src, remote_host_ip6)
5662 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5663 self.assertEqual(tcp.sport, 80)
5664 self.assertEqual(tcp.dport, 12345)
5665 self.check_tcp_checksum(p)
5667 self.logger.error(ppp("Unexpected or invalid packet:", p))
5670 def test_frag_in_order(self):
5671 """ NAT64 translate fragments arriving in order """
5672 self.tcp_port_in = random.randint(1025, 65535)
5674 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5676 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5677 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5679 reass = self.vapi.nat_reass_dump()
5680 reass_n_start = len(reass)
5684 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5685 self.tcp_port_in, 20, data)
5686 self.pg0.add_stream(pkts)
5687 self.pg_enable_capture(self.pg_interfaces)
5689 frags = self.pg1.get_capture(len(pkts))
5690 p = self.reass_frags_and_verify(frags,
5692 self.pg1.remote_ip4)
5693 self.assertEqual(p[TCP].dport, 20)
5694 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5695 self.tcp_port_out = p[TCP].sport
5696 self.assertEqual(data, p[Raw].load)
5699 data = "A" * 4 + "b" * 16 + "C" * 3
5700 pkts = self.create_stream_frag(self.pg1,
5705 self.pg1.add_stream(pkts)
5706 self.pg_enable_capture(self.pg_interfaces)
5708 frags = self.pg0.get_capture(len(pkts))
5709 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5710 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5711 self.assertEqual(p[TCP].sport, 20)
5712 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5713 self.assertEqual(data, p[Raw].load)
5715 reass = self.vapi.nat_reass_dump()
5716 reass_n_end = len(reass)
5718 self.assertEqual(reass_n_end - reass_n_start, 2)
5720 def test_reass_hairpinning(self):
5721 """ NAT64 fragments hairpinning """
5723 client = self.pg0.remote_hosts[0]
5724 server = self.pg0.remote_hosts[1]
5725 server_in_port = random.randint(1025, 65535)
5726 server_out_port = random.randint(1025, 65535)
5727 client_in_port = random.randint(1025, 65535)
5728 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5729 nat_addr_ip6 = ip.src
5731 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5733 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5734 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5736 # add static BIB entry for server
5737 self.vapi.nat64_add_del_static_bib(server.ip6n,
5743 # send packet from host to server
5744 pkts = self.create_stream_frag_ip6(self.pg0,
5749 self.pg0.add_stream(pkts)
5750 self.pg_enable_capture(self.pg_interfaces)
5752 frags = self.pg0.get_capture(len(pkts))
5753 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5754 self.assertNotEqual(p[TCP].sport, client_in_port)
5755 self.assertEqual(p[TCP].dport, server_in_port)
5756 self.assertEqual(data, p[Raw].load)
5758 def test_frag_out_of_order(self):
5759 """ NAT64 translate fragments arriving out of order """
5760 self.tcp_port_in = random.randint(1025, 65535)
5762 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5764 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5765 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5769 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5770 self.tcp_port_in, 20, data)
5772 self.pg0.add_stream(pkts)
5773 self.pg_enable_capture(self.pg_interfaces)
5775 frags = self.pg1.get_capture(len(pkts))
5776 p = self.reass_frags_and_verify(frags,
5778 self.pg1.remote_ip4)
5779 self.assertEqual(p[TCP].dport, 20)
5780 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5781 self.tcp_port_out = p[TCP].sport
5782 self.assertEqual(data, p[Raw].load)
5785 data = "A" * 4 + "B" * 16 + "C" * 3
5786 pkts = self.create_stream_frag(self.pg1,
5792 self.pg1.add_stream(pkts)
5793 self.pg_enable_capture(self.pg_interfaces)
5795 frags = self.pg0.get_capture(len(pkts))
5796 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5797 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5798 self.assertEqual(p[TCP].sport, 20)
5799 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5800 self.assertEqual(data, p[Raw].load)
5802 def test_interface_addr(self):
5803 """ Acquire NAT64 pool addresses from interface """
5804 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5806 # no address in NAT64 pool
5807 adresses = self.vapi.nat44_address_dump()
5808 self.assertEqual(0, len(adresses))
5810 # configure interface address and check NAT64 address pool
5811 self.pg4.config_ip4()
5812 addresses = self.vapi.nat64_pool_addr_dump()
5813 self.assertEqual(len(addresses), 1)
5814 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5816 # remove interface address and check NAT64 address pool
5817 self.pg4.unconfig_ip4()
5818 addresses = self.vapi.nat64_pool_addr_dump()
5819 self.assertEqual(0, len(adresses))
5821 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5822 def test_ipfix_max_bibs_sessions(self):
5823 """ IPFIX logging maximum session and BIB entries exceeded """
5826 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5830 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5832 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5833 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5837 for i in range(0, max_bibs):
5838 src = "fd01:aa::%x" % (i)
5839 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5840 IPv6(src=src, dst=remote_host_ip6) /
5841 TCP(sport=12345, dport=80))
5843 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5844 IPv6(src=src, dst=remote_host_ip6) /
5845 TCP(sport=12345, dport=22))
5847 self.pg0.add_stream(pkts)
5848 self.pg_enable_capture(self.pg_interfaces)
5850 self.pg1.get_capture(max_sessions)
5852 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5853 src_address=self.pg3.local_ip4n,
5855 template_interval=10)
5856 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5857 src_port=self.ipfix_src_port)
5859 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5860 IPv6(src=src, dst=remote_host_ip6) /
5861 TCP(sport=12345, dport=25))
5862 self.pg0.add_stream(p)
5863 self.pg_enable_capture(self.pg_interfaces)
5865 self.pg1.get_capture(0)
5866 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5867 capture = self.pg3.get_capture(9)
5868 ipfix = IPFIXDecoder()
5869 # first load template
5871 self.assertTrue(p.haslayer(IPFIX))
5872 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5873 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5874 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5875 self.assertEqual(p[UDP].dport, 4739)
5876 self.assertEqual(p[IPFIX].observationDomainID,
5877 self.ipfix_domain_id)
5878 if p.haslayer(Template):
5879 ipfix.add_template(p.getlayer(Template))
5880 # verify events in data set
5882 if p.haslayer(Data):
5883 data = ipfix.decode_data_set(p.getlayer(Set))
5884 self.verify_ipfix_max_sessions(data, max_sessions)
5886 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5887 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5888 TCP(sport=12345, dport=80))
5889 self.pg0.add_stream(p)
5890 self.pg_enable_capture(self.pg_interfaces)
5892 self.pg1.get_capture(0)
5893 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5894 capture = self.pg3.get_capture(1)
5895 # verify events in data set
5897 self.assertTrue(p.haslayer(IPFIX))
5898 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5899 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5900 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5901 self.assertEqual(p[UDP].dport, 4739)
5902 self.assertEqual(p[IPFIX].observationDomainID,
5903 self.ipfix_domain_id)
5904 if p.haslayer(Data):
5905 data = ipfix.decode_data_set(p.getlayer(Set))
5906 self.verify_ipfix_max_bibs(data, max_bibs)
5908 def test_ipfix_max_frags(self):
5909 """ IPFIX logging maximum fragments pending reassembly exceeded """
5910 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5912 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5913 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5914 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5915 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5916 src_address=self.pg3.local_ip4n,
5918 template_interval=10)
5919 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5920 src_port=self.ipfix_src_port)
5923 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5924 self.tcp_port_in, 20, data)
5925 self.pg0.add_stream(pkts[-1])
5926 self.pg_enable_capture(self.pg_interfaces)
5928 self.pg1.get_capture(0)
5929 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5930 capture = self.pg3.get_capture(9)
5931 ipfix = IPFIXDecoder()
5932 # first load template
5934 self.assertTrue(p.haslayer(IPFIX))
5935 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5936 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5937 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5938 self.assertEqual(p[UDP].dport, 4739)
5939 self.assertEqual(p[IPFIX].observationDomainID,
5940 self.ipfix_domain_id)
5941 if p.haslayer(Template):
5942 ipfix.add_template(p.getlayer(Template))
5943 # verify events in data set
5945 if p.haslayer(Data):
5946 data = ipfix.decode_data_set(p.getlayer(Set))
5947 self.verify_ipfix_max_fragments_ip6(data, 0,
5948 self.pg0.remote_ip6n)
5950 def test_ipfix_bib_ses(self):
5951 """ IPFIX logging NAT64 BIB/session create and delete events """
5952 self.tcp_port_in = random.randint(1025, 65535)
5953 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5957 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5959 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5960 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5961 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5962 src_address=self.pg3.local_ip4n,
5964 template_interval=10)
5965 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5966 src_port=self.ipfix_src_port)
5969 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5970 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5971 TCP(sport=self.tcp_port_in, dport=25))
5972 self.pg0.add_stream(p)
5973 self.pg_enable_capture(self.pg_interfaces)
5975 p = self.pg1.get_capture(1)
5976 self.tcp_port_out = p[0][TCP].sport
5977 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5978 capture = self.pg3.get_capture(10)
5979 ipfix = IPFIXDecoder()
5980 # first load template
5982 self.assertTrue(p.haslayer(IPFIX))
5983 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5984 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5985 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5986 self.assertEqual(p[UDP].dport, 4739)
5987 self.assertEqual(p[IPFIX].observationDomainID,
5988 self.ipfix_domain_id)
5989 if p.haslayer(Template):
5990 ipfix.add_template(p.getlayer(Template))
5991 # verify events in data set
5993 if p.haslayer(Data):
5994 data = ipfix.decode_data_set(p.getlayer(Set))
5995 if ord(data[0][230]) == 10:
5996 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5997 elif ord(data[0][230]) == 6:
5998 self.verify_ipfix_nat64_ses(data,
6000 self.pg0.remote_ip6n,
6001 self.pg1.remote_ip4,
6004 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6007 self.pg_enable_capture(self.pg_interfaces)
6008 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6011 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6012 capture = self.pg3.get_capture(2)
6013 # verify events in data set
6015 self.assertTrue(p.haslayer(IPFIX))
6016 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6017 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6018 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6019 self.assertEqual(p[UDP].dport, 4739)
6020 self.assertEqual(p[IPFIX].observationDomainID,
6021 self.ipfix_domain_id)
6022 if p.haslayer(Data):
6023 data = ipfix.decode_data_set(p.getlayer(Set))
6024 if ord(data[0][230]) == 11:
6025 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6026 elif ord(data[0][230]) == 7:
6027 self.verify_ipfix_nat64_ses(data,
6029 self.pg0.remote_ip6n,
6030 self.pg1.remote_ip4,
6033 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6035 def nat64_get_ses_num(self):
6037 Return number of active NAT64 sessions.
6039 st = self.vapi.nat64_st_dump()
6042 def clear_nat64(self):
6044 Clear NAT64 configuration.
6046 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6047 domain_id=self.ipfix_domain_id)
6048 self.ipfix_src_port = 4739
6049 self.ipfix_domain_id = 1
6051 self.vapi.nat64_set_timeouts()
6053 interfaces = self.vapi.nat64_interface_dump()
6054 for intf in interfaces:
6055 if intf.is_inside > 1:
6056 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6059 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6063 bib = self.vapi.nat64_bib_dump(255)
6066 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6074 adresses = self.vapi.nat64_pool_addr_dump()
6075 for addr in adresses:
6076 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6081 prefixes = self.vapi.nat64_prefix_dump()
6082 for prefix in prefixes:
6083 self.vapi.nat64_add_del_prefix(prefix.prefix,
6085 vrf_id=prefix.vrf_id,
6089 super(TestNAT64, self).tearDown()
6090 if not self.vpp_dead:
6091 self.logger.info(self.vapi.cli("show nat64 pool"))
6092 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6093 self.logger.info(self.vapi.cli("show nat64 prefix"))
6094 self.logger.info(self.vapi.cli("show nat64 bib all"))
6095 self.logger.info(self.vapi.cli("show nat64 session table all"))
6096 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6100 class TestDSlite(MethodHolder):
6101 """ DS-Lite Test Cases """
6104 def setUpClass(cls):
6105 super(TestDSlite, cls).setUpClass()
6108 cls.nat_addr = '10.0.0.3'
6109 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6111 cls.create_pg_interfaces(range(2))
6113 cls.pg0.config_ip4()
6114 cls.pg0.resolve_arp()
6116 cls.pg1.config_ip6()
6117 cls.pg1.generate_remote_hosts(2)
6118 cls.pg1.configure_ipv6_neighbors()
6121 super(TestDSlite, cls).tearDownClass()
6124 def test_dslite(self):
6125 """ Test DS-Lite """
6126 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6128 aftr_ip4 = '192.0.0.1'
6129 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6130 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6131 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6132 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6135 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6136 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6137 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6138 UDP(sport=20000, dport=10000))
6139 self.pg1.add_stream(p)
6140 self.pg_enable_capture(self.pg_interfaces)
6142 capture = self.pg0.get_capture(1)
6143 capture = capture[0]
6144 self.assertFalse(capture.haslayer(IPv6))
6145 self.assertEqual(capture[IP].src, self.nat_addr)
6146 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6147 self.assertNotEqual(capture[UDP].sport, 20000)
6148 self.assertEqual(capture[UDP].dport, 10000)
6149 self.check_ip_checksum(capture)
6150 out_port = capture[UDP].sport
6152 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6153 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6154 UDP(sport=10000, dport=out_port))
6155 self.pg0.add_stream(p)
6156 self.pg_enable_capture(self.pg_interfaces)
6158 capture = self.pg1.get_capture(1)
6159 capture = capture[0]
6160 self.assertEqual(capture[IPv6].src, aftr_ip6)
6161 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6162 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6163 self.assertEqual(capture[IP].dst, '192.168.1.1')
6164 self.assertEqual(capture[UDP].sport, 10000)
6165 self.assertEqual(capture[UDP].dport, 20000)
6166 self.check_ip_checksum(capture)
6169 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6170 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6171 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6172 TCP(sport=20001, dport=10001))
6173 self.pg1.add_stream(p)
6174 self.pg_enable_capture(self.pg_interfaces)
6176 capture = self.pg0.get_capture(1)
6177 capture = capture[0]
6178 self.assertFalse(capture.haslayer(IPv6))
6179 self.assertEqual(capture[IP].src, self.nat_addr)
6180 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6181 self.assertNotEqual(capture[TCP].sport, 20001)
6182 self.assertEqual(capture[TCP].dport, 10001)
6183 self.check_ip_checksum(capture)
6184 self.check_tcp_checksum(capture)
6185 out_port = capture[TCP].sport
6187 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6188 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6189 TCP(sport=10001, dport=out_port))
6190 self.pg0.add_stream(p)
6191 self.pg_enable_capture(self.pg_interfaces)
6193 capture = self.pg1.get_capture(1)
6194 capture = capture[0]
6195 self.assertEqual(capture[IPv6].src, aftr_ip6)
6196 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6197 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6198 self.assertEqual(capture[IP].dst, '192.168.1.1')
6199 self.assertEqual(capture[TCP].sport, 10001)
6200 self.assertEqual(capture[TCP].dport, 20001)
6201 self.check_ip_checksum(capture)
6202 self.check_tcp_checksum(capture)
6205 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6206 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6207 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6208 ICMP(id=4000, type='echo-request'))
6209 self.pg1.add_stream(p)
6210 self.pg_enable_capture(self.pg_interfaces)
6212 capture = self.pg0.get_capture(1)
6213 capture = capture[0]
6214 self.assertFalse(capture.haslayer(IPv6))
6215 self.assertEqual(capture[IP].src, self.nat_addr)
6216 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6217 self.assertNotEqual(capture[ICMP].id, 4000)
6218 self.check_ip_checksum(capture)
6219 self.check_icmp_checksum(capture)
6220 out_id = capture[ICMP].id
6222 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6223 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6224 ICMP(id=out_id, type='echo-reply'))
6225 self.pg0.add_stream(p)
6226 self.pg_enable_capture(self.pg_interfaces)
6228 capture = self.pg1.get_capture(1)
6229 capture = capture[0]
6230 self.assertEqual(capture[IPv6].src, aftr_ip6)
6231 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6232 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6233 self.assertEqual(capture[IP].dst, '192.168.1.1')
6234 self.assertEqual(capture[ICMP].id, 4000)
6235 self.check_ip_checksum(capture)
6236 self.check_icmp_checksum(capture)
6238 # ping DS-Lite AFTR tunnel endpoint address
6239 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6240 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6241 ICMPv6EchoRequest())
6242 self.pg1.add_stream(p)
6243 self.pg_enable_capture(self.pg_interfaces)
6245 capture = self.pg1.get_capture(1)
6246 self.assertEqual(1, len(capture))
6247 capture = capture[0]
6248 self.assertEqual(capture[IPv6].src, aftr_ip6)
6249 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6250 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6253 super(TestDSlite, self).tearDown()
6254 if not self.vpp_dead:
6255 self.logger.info(self.vapi.cli("show dslite pool"))
6257 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6258 self.logger.info(self.vapi.cli("show dslite sessions"))
6261 class TestDSliteCE(MethodHolder):
6262 """ DS-Lite CE Test Cases """
6265 def setUpConstants(cls):
6266 super(TestDSliteCE, cls).setUpConstants()
6267 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6270 def setUpClass(cls):
6271 super(TestDSliteCE, cls).setUpClass()
6274 cls.create_pg_interfaces(range(2))
6276 cls.pg0.config_ip4()
6277 cls.pg0.resolve_arp()
6279 cls.pg1.config_ip6()
6280 cls.pg1.generate_remote_hosts(1)
6281 cls.pg1.configure_ipv6_neighbors()
6284 super(TestDSliteCE, cls).tearDownClass()
6287 def test_dslite_ce(self):
6288 """ Test DS-Lite CE """
6290 b4_ip4 = '192.0.0.2'
6291 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6292 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6293 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6294 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6296 aftr_ip4 = '192.0.0.1'
6297 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6298 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6299 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6300 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6302 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6303 dst_address_length=128,
6304 next_hop_address=self.pg1.remote_ip6n,
6305 next_hop_sw_if_index=self.pg1.sw_if_index,
6309 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6310 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6311 UDP(sport=10000, dport=20000))
6312 self.pg0.add_stream(p)
6313 self.pg_enable_capture(self.pg_interfaces)
6315 capture = self.pg1.get_capture(1)
6316 capture = capture[0]
6317 self.assertEqual(capture[IPv6].src, b4_ip6)
6318 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6319 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6320 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6321 self.assertEqual(capture[UDP].sport, 10000)
6322 self.assertEqual(capture[UDP].dport, 20000)
6323 self.check_ip_checksum(capture)
6326 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6327 IPv6(dst=b4_ip6, src=aftr_ip6) /
6328 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6329 UDP(sport=20000, dport=10000))
6330 self.pg1.add_stream(p)
6331 self.pg_enable_capture(self.pg_interfaces)
6333 capture = self.pg0.get_capture(1)
6334 capture = capture[0]
6335 self.assertFalse(capture.haslayer(IPv6))
6336 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6337 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6338 self.assertEqual(capture[UDP].sport, 20000)
6339 self.assertEqual(capture[UDP].dport, 10000)
6340 self.check_ip_checksum(capture)
6342 # ping DS-Lite B4 tunnel endpoint address
6343 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6344 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6345 ICMPv6EchoRequest())
6346 self.pg1.add_stream(p)
6347 self.pg_enable_capture(self.pg_interfaces)
6349 capture = self.pg1.get_capture(1)
6350 self.assertEqual(1, len(capture))
6351 capture = capture[0]
6352 self.assertEqual(capture[IPv6].src, b4_ip6)
6353 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6354 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6357 super(TestDSliteCE, self).tearDown()
6358 if not self.vpp_dead:
6360 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6362 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# Test cases for NAT66, built on MethodHolder; the tests below exercise it
# with IPv6 packets sent between two packet-generator (pg) interfaces.
# NOTE(review): the numbers embedded at the start of each line skip values
# (e.g. 6392 -> 6397), so some statements of this class are not shown in this
# listing. The comments below describe only what is visible and mark
# everything else as an assumption.
6365 class TestNAT66(MethodHolder):
6366 """ NAT66 Test Cases """
# Class-level fixture: chain to the parent setUpClass, then prepare the NAT
# address and the interfaces shared by all tests in this class.
6369 def setUpClass(cls):
6370 super(TestNAT66, cls).setUpClass()
# External (translated) address in text form and in packed binary form
# (socket.inet_pton with AF_INET6).
6373 cls.nat_addr = 'fd01:ff::2'
6374 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
# Create pg interfaces for indexes 0 and 1 and keep a separate list copy.
6376 cls.create_pg_interfaces(range(2))
6377 cls.interfaces = list(cls.pg_interfaces)
# Per-interface setup; only the neighbor configuration call is visible here.
6379 for i in cls.interfaces:
6382 i.configure_ipv6_neighbors()
# NOTE(review): presumably the failure path of setUpClass (tear the class
# down if setup raised); the surrounding try/except is not visible — confirm.
6385 super(TestNAT66, cls).tearDownClass()
# Static 1:1 mapping test: traffic from pg0 towards pg1 must leave with the
# NAT address as source, and traffic from pg1 to the NAT address must reach
# pg0's host address.
6388 def test_static(self):
6389 """ 1:1 NAT66 test """
# Register pg0 with default arguments and pg1 with is_inside=0, then add a
# static mapping whose first argument is pg0's remote IPv6 address in binary
# form. The remaining mapping arguments are not visible in this listing.
6390 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6391 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6392 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# Packets from the pg0 host to the pg1 host. Four packets are built; the
# payload layers of the first two are not visible (presumably TCP and UDP,
# matching the checks below — confirm), the last two carry an ICMPv6 echo
# request and GRE / IP / TCP.
6397 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6398 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6401 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6402 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6405 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6406 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6407 ICMPv6EchoRequest())
6409 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6410 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6411 GRE() / IP() / TCP())
# Queue `pkts` on pg0 and enable capture on all pg interfaces.
# NOTE(review): where `pkts` is created and filled is not visible here.
6413 self.pg0.add_stream(pkts)
6414 self.pg_enable_capture(self.pg_interfaces)
# Expect as many packets on pg1 as were sent; each must carry the NAT
# address as source and pg1's host as destination.
6416 capture = self.pg1.get_capture(len(pkts))
6417 for packet in capture:
6419 self.assertEqual(packet[IPv6].src, self.nat_addr)
6420 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
# Run the checksum check that matches the layer found in the packet.
6421 if packet.haslayer(TCP):
6422 self.check_tcp_checksum(packet)
6423 elif packet.haslayer(UDP):
6424 self.check_udp_checksum(packet)
6425 elif packet.haslayer(ICMPv6EchoRequest):
6426 self.check_icmpv6_checksum(packet)
# NOTE(review): presumably inside an exception handler that logs the
# offending packet before re-raising; the handler lines are not visible.
6428 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: packets from the pg1 host addressed to the NAT address.
# Payload layers of the first three packets are not visible; the fourth is
# GRE / IP / TCP.
6433 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6434 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6437 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6438 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6441 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6442 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6445 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6446 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6447 GRE() / IP() / TCP())
# Queue on pg1 this time and capture on pg0.
6449 self.pg1.add_stream(pkts)
6450 self.pg_enable_capture(self.pg_interfaces)
# Source must stay pg1's host; destination must be rewritten from the NAT
# address back to pg0's host address.
6452 capture = self.pg0.get_capture(len(pkts))
6453 for packet in capture:
6455 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6456 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6457 if packet.haslayer(TCP):
6458 self.check_tcp_checksum(packet)
6459 elif packet.haslayer(UDP):
6460 self.check_udp_checksum(packet)
6461 elif packet.haslayer(ICMPv6EchoReply):
6462 self.check_icmpv6_checksum(packet)
6464 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exactly one static mapping is expected, with a packet counter of 8
# (presumably the four packets sent in each direction above — confirm).
6467 sm = self.vapi.nat66_static_mapping_dump()
6468 self.assertEqual(len(sm), 1)
6469 self.assertEqual(sm[0].total_pkts, 8)
# Negative test: here both interfaces are registered with default arguments
# (no is_inside=0 on pg1), and the packet is expected to arrive untranslated.
6471 def test_check_no_translate(self):
6472 """ NAT66 translate only when egress interface is outside interface """
6473 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6474 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
# Static mapping for pg0's host; remaining arguments are not visible.
6475 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# Single packet from the pg0 host to the pg1 host (payload layer not visible).
6479 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6480 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6482 self.pg0.add_stream([p])
6483 self.pg_enable_capture(self.pg_interfaces)
# One packet is expected on pg1 with its original source address intact.
# NOTE(review): the assignment of `packet` is not visible in this listing.
6485 capture = self.pg1.get_capture(1)
6488 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6489 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6491 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Helper that walks the NAT66 interface dump and the static-mapping dump and
# calls the corresponding add/del API for every entry.
6494 def clear_nat66(self):
6496 Clear NAT66 configuration.
6498 interfaces = self.vapi.nat66_interface_dump()
6499 for intf in interfaces:
# NOTE(review): trailing arguments are not visible; presumably they select
# deletion — confirm.
6500 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6504 static_mappings = self.vapi.nat66_static_mapping_dump()
6505 for sm in static_mappings:
# Each mapping is addressed by its local and external address; any further
# arguments are not visible in this listing.
6506 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6507 sm.external_ip_address,
# Per-test cleanup: run the parent tearDown, then, unless VPP is flagged as
# dead, log the output of the NAT66 "show" commands.
# NOTE(review): the enclosing `def` line is not visible; the method name is
# taken from the super() call — confirm.
6512 super(TestNAT66, self).tearDown()
6513 if not self.vpp_dead:
6514 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6515 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when the file is executed directly (not imported), run
# the tests through unittest using the VPP-specific test runner class.
6518 if __name__ == '__main__':
6519 unittest.main(testRunner=VppTestRunner)