9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
13 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
14 from scapy.layers.l2 import Ether, ARP, GRE
15 from scapy.data import IP_PROTOS
16 from scapy.packet import bind_layers, Raw
17 from scapy.all import fragment6
19 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
20 from time import sleep
21 from util import ip4_range
22 from util import mactobinary
25 class MethodHolder(VppTestCase):
26 """ NAT create capture and verify method holder """
28 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
30 Create packet stream for inside network
32 :param in_if: Inside interface
33 :param out_if: Outside interface
34 :param dst_ip: Destination address
35 :param ttl: TTL of generated packets
38 dst_ip = out_if.remote_ip4
42 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
43 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
44 TCP(sport=self.tcp_port_in, dport=20))
48 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
49 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
50 UDP(sport=self.udp_port_in, dport=20))
54 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
55 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
56 ICMP(id=self.icmp_id_in, type='echo-request'))
61 def compose_ip6(self, ip4, pref, plen):
63 Compose IPv4-embedded IPv6 addresses
65 :param ip4: IPv4 address
66 :param pref: IPv6 prefix
67 :param plen: IPv6 prefix length
68 :returns: IPv4-embedded IPv6 addresses
70 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
71 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
100 pref_n[14] = ip4_n[2]
101 pref_n[15] = ip4_n[3]
102 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
104 def extract_ip4(self, ip6, plen):
106 Extract IPv4 address embedded in IPv6 addresses
108 :param ip6: IPv6 address
109 :param plen: IPv6 prefix length
110 :returns: extracted IPv4 address
112 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
144 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
146 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
148 Create IPv6 packet stream for inside network
150 :param in_if: Inside interface
151 :param out_if: Outside interface
152 :param ttl: Hop Limit of generated packets
153 :param pref: NAT64 prefix
154 :param plen: NAT64 prefix length
158 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
160 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
165 TCP(sport=self.tcp_port_in, dport=20))
169 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
170 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
171 UDP(sport=self.udp_port_in, dport=20))
175 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
176 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
177 ICMPv6EchoRequest(id=self.icmp_id_in))
182 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
183 use_inside_ports=False):
185 Create packet stream for outside network
187 :param out_if: Outside interface
188 :param dst_ip: Destination IP address (Default use global NAT address)
189 :param ttl: TTL of generated packets
190 :param use_inside_ports: Use inside NAT ports as destination ports
191 instead of outside ports
194 dst_ip = self.nat_addr
195 if not use_inside_ports:
196 tcp_port = self.tcp_port_out
197 udp_port = self.udp_port_out
198 icmp_id = self.icmp_id_out
200 tcp_port = self.tcp_port_in
201 udp_port = self.udp_port_in
202 icmp_id = self.icmp_id_in
205 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
206 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
207 TCP(dport=tcp_port, sport=20))
211 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
212 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
213 UDP(dport=udp_port, sport=20))
217 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
218 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
219 ICMP(id=icmp_id, type='echo-reply'))
224 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
226 Create packet stream for outside network
228 :param out_if: Outside interface
229 :param dst_ip: Destination IP address (Default use global NAT address)
230 :param hl: HL of generated packets
234 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
235 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
236 TCP(dport=self.tcp_port_out, sport=20))
240 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
241 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
242 UDP(dport=self.udp_port_out, sport=20))
246 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
247 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
248 ICMPv6EchoReply(id=self.icmp_id_out))
253 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
254 packet_num=3, dst_ip=None, is_ip6=False):
256 Verify captured packets on outside network
258 :param capture: Captured packets
259 :param nat_ip: Translated IP address (Default use global NAT address)
260 :param same_port: Sorce port number is not translated (Default False)
261 :param packet_num: Expected number of packets (Default 3)
262 :param dst_ip: Destination IP address (Default do not verify)
263 :param is_ip6: If L3 protocol is IPv6 (Default False)
267 ICMP46 = ICMPv6EchoRequest
272 nat_ip = self.nat_addr
273 self.assertEqual(packet_num, len(capture))
274 for packet in capture:
277 self.assert_packet_checksums_valid(packet)
278 self.assertEqual(packet[IP46].src, nat_ip)
279 if dst_ip is not None:
280 self.assertEqual(packet[IP46].dst, dst_ip)
281 if packet.haslayer(TCP):
283 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
286 packet[TCP].sport, self.tcp_port_in)
287 self.tcp_port_out = packet[TCP].sport
288 self.assert_packet_checksums_valid(packet)
289 elif packet.haslayer(UDP):
291 self.assertEqual(packet[UDP].sport, self.udp_port_in)
294 packet[UDP].sport, self.udp_port_in)
295 self.udp_port_out = packet[UDP].sport
298 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
300 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
301 self.icmp_id_out = packet[ICMP46].id
302 self.assert_packet_checksums_valid(packet)
304 self.logger.error(ppp("Unexpected or invalid packet "
305 "(outside network):", packet))
308 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
309 packet_num=3, dst_ip=None):
311 Verify captured packets on outside network
313 :param capture: Captured packets
314 :param nat_ip: Translated IP address
315 :param same_port: Sorce port number is not translated (Default False)
316 :param packet_num: Expected number of packets (Default 3)
317 :param dst_ip: Destination IP address (Default do not verify)
319 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
322 def verify_capture_in(self, capture, in_if, packet_num=3):
324 Verify captured packets on inside network
326 :param capture: Captured packets
327 :param in_if: Inside interface
328 :param packet_num: Expected number of packets (Default 3)
330 self.assertEqual(packet_num, len(capture))
331 for packet in capture:
333 self.assert_packet_checksums_valid(packet)
334 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
335 if packet.haslayer(TCP):
336 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
337 elif packet.haslayer(UDP):
338 self.assertEqual(packet[UDP].dport, self.udp_port_in)
340 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
342 self.logger.error(ppp("Unexpected or invalid packet "
343 "(inside network):", packet))
346 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
348 Verify captured IPv6 packets on inside network
350 :param capture: Captured packets
351 :param src_ip: Source IP
352 :param dst_ip: Destination IP address
353 :param packet_num: Expected number of packets (Default 3)
355 self.assertEqual(packet_num, len(capture))
356 for packet in capture:
358 self.assertEqual(packet[IPv6].src, src_ip)
359 self.assertEqual(packet[IPv6].dst, dst_ip)
360 self.assert_packet_checksums_valid(packet)
361 if packet.haslayer(TCP):
362 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
363 elif packet.haslayer(UDP):
364 self.assertEqual(packet[UDP].dport, self.udp_port_in)
366 self.assertEqual(packet[ICMPv6EchoReply].id,
369 self.logger.error(ppp("Unexpected or invalid packet "
370 "(inside network):", packet))
373 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
375 Verify captured packet that don't have to be translated
377 :param capture: Captured packets
378 :param ingress_if: Ingress interface
379 :param egress_if: Egress interface
381 for packet in capture:
383 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385 if packet.haslayer(TCP):
386 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387 elif packet.haslayer(UDP):
388 self.assertEqual(packet[UDP].sport, self.udp_port_in)
390 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
392 self.logger.error(ppp("Unexpected or invalid packet "
393 "(inside network):", packet))
396 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397 packet_num=3, icmp_type=11):
399 Verify captured packets with ICMP errors on outside network
401 :param capture: Captured packets
402 :param src_ip: Translated IP address or IP address of VPP
403 (Default use global NAT address)
404 :param packet_num: Expected number of packets (Default 3)
405 :param icmp_type: Type of error ICMP packet
406 we are expecting (Default 11)
409 src_ip = self.nat_addr
410 self.assertEqual(packet_num, len(capture))
411 for packet in capture:
413 self.assertEqual(packet[IP].src, src_ip)
414 self.assertTrue(packet.haslayer(ICMP))
416 self.assertEqual(icmp.type, icmp_type)
417 self.assertTrue(icmp.haslayer(IPerror))
418 inner_ip = icmp[IPerror]
419 if inner_ip.haslayer(TCPerror):
420 self.assertEqual(inner_ip[TCPerror].dport,
422 elif inner_ip.haslayer(UDPerror):
423 self.assertEqual(inner_ip[UDPerror].dport,
426 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
428 self.logger.error(ppp("Unexpected or invalid packet "
429 "(outside network):", packet))
432 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
435 Verify captured packets with ICMP errors on inside network
437 :param capture: Captured packets
438 :param in_if: Inside interface
439 :param packet_num: Expected number of packets (Default 3)
440 :param icmp_type: Type of error ICMP packet
441 we are expecting (Default 11)
443 self.assertEqual(packet_num, len(capture))
444 for packet in capture:
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 self.assertTrue(packet.haslayer(ICMP))
449 self.assertEqual(icmp.type, icmp_type)
450 self.assertTrue(icmp.haslayer(IPerror))
451 inner_ip = icmp[IPerror]
452 if inner_ip.haslayer(TCPerror):
453 self.assertEqual(inner_ip[TCPerror].sport,
455 elif inner_ip.haslayer(UDPerror):
456 self.assertEqual(inner_ip[UDPerror].sport,
459 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
461 self.logger.error(ppp("Unexpected or invalid packet "
462 "(inside network):", packet))
465 def create_stream_frag(self, src_if, dst, sport, dport, data):
467 Create fragmented packet stream
469 :param src_if: Source interface
470 :param dst: Destination IPv4 address
471 :param sport: Source TCP port
472 :param dport: Destination TCP port
473 :param data: Payload data
476 id = random.randint(0, 65535)
477 p = (IP(src=src_if.remote_ip4, dst=dst) /
478 TCP(sport=sport, dport=dport) /
480 p = p.__class__(str(p))
481 chksum = p['TCP'].chksum
483 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
484 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
485 TCP(sport=sport, dport=dport, chksum=chksum) /
488 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
490 proto=IP_PROTOS.tcp) /
493 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
500 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
501 pref=None, plen=0, frag_size=128):
503 Create fragmented packet stream
505 :param src_if: Source interface
506 :param dst: Destination IPv4 address
507 :param sport: Source TCP port
508 :param dport: Destination TCP port
509 :param data: Payload data
510 :param pref: NAT64 prefix
511 :param plen: NAT64 prefix length
512 :param fragsize: size of fragments
516 dst_ip6 = ''.join(['64:ff9b::', dst])
518 dst_ip6 = self.compose_ip6(dst, pref, plen)
520 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
521 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
522 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
523 TCP(sport=sport, dport=dport) /
526 return fragment6(p, frag_size)
528 def reass_frags_and_verify(self, frags, src, dst):
530 Reassemble and verify fragmented packet
532 :param frags: Captured fragments
533 :param src: Source IPv4 address to verify
534 :param dst: Destination IPv4 address to verify
536 :returns: Reassembled IPv4 packet
538 buffer = StringIO.StringIO()
540 self.assertEqual(p[IP].src, src)
541 self.assertEqual(p[IP].dst, dst)
542 self.assert_ip_checksum_valid(p)
543 buffer.seek(p[IP].frag * 8)
544 buffer.write(p[IP].payload)
545 ip = frags[0].getlayer(IP)
546 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
547 proto=frags[0][IP].proto)
548 if ip.proto == IP_PROTOS.tcp:
549 p = (ip / TCP(buffer.getvalue()))
550 self.assert_tcp_checksum_valid(p)
551 elif ip.proto == IP_PROTOS.udp:
552 p = (ip / UDP(buffer.getvalue()))
555 def reass_frags_and_verify_ip6(self, frags, src, dst):
557 Reassemble and verify fragmented packet
559 :param frags: Captured fragments
560 :param src: Source IPv6 address to verify
561 :param dst: Destination IPv6 address to verify
563 :returns: Reassembled IPv6 packet
565 buffer = StringIO.StringIO()
567 self.assertEqual(p[IPv6].src, src)
568 self.assertEqual(p[IPv6].dst, dst)
569 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
570 buffer.write(p[IPv6ExtHdrFragment].payload)
571 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
572 nh=frags[0][IPv6ExtHdrFragment].nh)
573 if ip.nh == IP_PROTOS.tcp:
574 p = (ip / TCP(buffer.getvalue()))
575 elif ip.nh == IP_PROTOS.udp:
576 p = (ip / UDP(buffer.getvalue()))
577 self.assert_packet_checksums_valid(p)
580 def initiate_tcp_session(self, in_if, out_if):
582 Initiates TCP session
584 :param in_if: Inside interface
585 :param out_if: Outside interface
589 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
590 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
591 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
594 self.pg_enable_capture(self.pg_interfaces)
596 capture = out_if.get_capture(1)
598 self.tcp_port_out = p[TCP].sport
600 # SYN + ACK packet out->in
601 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
602 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
603 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
606 self.pg_enable_capture(self.pg_interfaces)
611 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
612 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
613 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
616 self.pg_enable_capture(self.pg_interfaces)
618 out_if.get_capture(1)
621 self.logger.error("TCP 3 way handshake failed")
624 def verify_ipfix_nat44_ses(self, data):
626 Verify IPFIX NAT44 session create/delete event
628 :param data: Decoded IPFIX data records
630 nat44_ses_create_num = 0
631 nat44_ses_delete_num = 0
632 self.assertEqual(6, len(data))
635 self.assertIn(ord(record[230]), [4, 5])
636 if ord(record[230]) == 4:
637 nat44_ses_create_num += 1
639 nat44_ses_delete_num += 1
641 self.assertEqual(self.pg0.remote_ip4n, record[8])
642 # postNATSourceIPv4Address
643 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
646 self.assertEqual(struct.pack("!I", 0), record[234])
647 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
648 if IP_PROTOS.icmp == ord(record[4]):
649 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
650 self.assertEqual(struct.pack("!H", self.icmp_id_out),
652 elif IP_PROTOS.tcp == ord(record[4]):
653 self.assertEqual(struct.pack("!H", self.tcp_port_in),
655 self.assertEqual(struct.pack("!H", self.tcp_port_out),
657 elif IP_PROTOS.udp == ord(record[4]):
658 self.assertEqual(struct.pack("!H", self.udp_port_in),
660 self.assertEqual(struct.pack("!H", self.udp_port_out),
663 self.fail("Invalid protocol")
664 self.assertEqual(3, nat44_ses_create_num)
665 self.assertEqual(3, nat44_ses_delete_num)
667 def verify_ipfix_addr_exhausted(self, data):
669 Verify IPFIX NAT addresses event
671 :param data: Decoded IPFIX data records
673 self.assertEqual(1, len(data))
676 self.assertEqual(ord(record[230]), 3)
678 self.assertEqual(struct.pack("!I", 0), record[283])
680 def verify_ipfix_max_sessions(self, data, limit):
682 Verify IPFIX maximum session entries exceeded event
684 :param data: Decoded IPFIX data records
685 :param limit: Number of maximum session entries that can be created.
687 self.assertEqual(1, len(data))
690 self.assertEqual(ord(record[230]), 13)
691 # natQuotaExceededEvent
692 self.assertEqual(struct.pack("I", 1), record[466])
694 self.assertEqual(struct.pack("I", limit), record[471])
696 def verify_ipfix_max_bibs(self, data, limit):
698 Verify IPFIX maximum BIB entries exceeded event
700 :param data: Decoded IPFIX data records
701 :param limit: Number of maximum BIB entries that can be created.
703 self.assertEqual(1, len(data))
706 self.assertEqual(ord(record[230]), 13)
707 # natQuotaExceededEvent
708 self.assertEqual(struct.pack("I", 2), record[466])
710 self.assertEqual(struct.pack("I", limit), record[472])
712 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
714 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
716 :param data: Decoded IPFIX data records
717 :param limit: Number of maximum fragments pending reassembly
718 :param src_addr: IPv6 source address
720 self.assertEqual(1, len(data))
723 self.assertEqual(ord(record[230]), 13)
724 # natQuotaExceededEvent
725 self.assertEqual(struct.pack("I", 5), record[466])
726 # maxFragmentsPendingReassembly
727 self.assertEqual(struct.pack("I", limit), record[475])
729 self.assertEqual(src_addr, record[27])
731 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
733 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
735 :param data: Decoded IPFIX data records
736 :param limit: Number of maximum fragments pending reassembly
737 :param src_addr: IPv4 source address
739 self.assertEqual(1, len(data))
742 self.assertEqual(ord(record[230]), 13)
743 # natQuotaExceededEvent
744 self.assertEqual(struct.pack("I", 5), record[466])
745 # maxFragmentsPendingReassembly
746 self.assertEqual(struct.pack("I", limit), record[475])
748 self.assertEqual(src_addr, record[8])
750 def verify_ipfix_bib(self, data, is_create, src_addr):
752 Verify IPFIX NAT64 BIB create and delete events
754 :param data: Decoded IPFIX data records
755 :param is_create: Create event if nonzero value otherwise delete event
756 :param src_addr: IPv6 source address
758 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 10)
764 self.assertEqual(ord(record[230]), 11)
766 self.assertEqual(src_addr, record[27])
767 # postNATSourceIPv4Address
768 self.assertEqual(self.nat_addr_n, record[225])
770 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
772 self.assertEqual(struct.pack("!I", 0), record[234])
773 # sourceTransportPort
774 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
775 # postNAPTSourceTransportPort
776 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
778 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
781 Verify IPFIX NAT64 session create and delete events
783 :param data: Decoded IPFIX data records
784 :param is_create: Create event if nonzero value otherwise delete event
785 :param src_addr: IPv6 source address
786 :param dst_addr: IPv4 destination address
787 :param dst_port: destination TCP port
789 self.assertEqual(1, len(data))
793 self.assertEqual(ord(record[230]), 6)
795 self.assertEqual(ord(record[230]), 7)
797 self.assertEqual(src_addr, record[27])
798 # destinationIPv6Address
799 self.assertEqual(socket.inet_pton(socket.AF_INET6,
800 self.compose_ip6(dst_addr,
804 # postNATSourceIPv4Address
805 self.assertEqual(self.nat_addr_n, record[225])
806 # postNATDestinationIPv4Address
807 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
810 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
812 self.assertEqual(struct.pack("!I", 0), record[234])
813 # sourceTransportPort
814 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
815 # postNAPTSourceTransportPort
816 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
817 # destinationTransportPort
818 self.assertEqual(struct.pack("!H", dst_port), record[11])
819 # postNAPTDestinationTransportPort
820 self.assertEqual(struct.pack("!H", dst_port), record[228])
823 class TestNAT44(MethodHolder):
824 """ NAT44 Test Cases """
828 super(TestNAT44, cls).setUpClass()
829 cls.vapi.cli("set log class nat level debug")
832 cls.tcp_port_in = 6303
833 cls.tcp_port_out = 6303
834 cls.udp_port_in = 6304
835 cls.udp_port_out = 6304
836 cls.icmp_id_in = 6305
837 cls.icmp_id_out = 6305
838 cls.nat_addr = '10.0.0.3'
839 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
840 cls.ipfix_src_port = 4739
841 cls.ipfix_domain_id = 1
842 cls.tcp_external_port = 80
844 cls.create_pg_interfaces(range(10))
845 cls.interfaces = list(cls.pg_interfaces[0:4])
847 for i in cls.interfaces:
852 cls.pg0.generate_remote_hosts(3)
853 cls.pg0.configure_ipv4_neighbors()
855 cls.pg1.generate_remote_hosts(1)
856 cls.pg1.configure_ipv4_neighbors()
858 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
859 cls.vapi.ip_table_add_del(10, is_add=1)
860 cls.vapi.ip_table_add_del(20, is_add=1)
862 cls.pg4._local_ip4 = "172.16.255.1"
863 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
864 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
865 cls.pg4.set_table_ip4(10)
866 cls.pg5._local_ip4 = "172.17.255.3"
867 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
868 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
869 cls.pg5.set_table_ip4(10)
870 cls.pg6._local_ip4 = "172.16.255.1"
871 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
872 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
873 cls.pg6.set_table_ip4(20)
874 for i in cls.overlapping_interfaces:
882 cls.pg9.generate_remote_hosts(2)
884 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
885 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
889 cls.pg9.resolve_arp()
890 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
891 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
892 cls.pg9.resolve_arp()
895 super(TestNAT44, cls).tearDownClass()
898 def clear_nat44(self):
900 Clear NAT44 configuration.
902 # I found no elegant way to do this
903 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
904 dst_address_length=32,
905 next_hop_address=self.pg7.remote_ip4n,
906 next_hop_sw_if_index=self.pg7.sw_if_index,
908 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
909 dst_address_length=32,
910 next_hop_address=self.pg8.remote_ip4n,
911 next_hop_sw_if_index=self.pg8.sw_if_index,
914 for intf in [self.pg7, self.pg8]:
915 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
917 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
922 if self.pg7.has_ip4_config:
923 self.pg7.unconfig_ip4()
925 self.vapi.nat44_forwarding_enable_disable(0)
927 interfaces = self.vapi.nat44_interface_addr_dump()
928 for intf in interfaces:
929 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
930 twice_nat=intf.twice_nat,
933 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
934 domain_id=self.ipfix_domain_id)
935 self.ipfix_src_port = 4739
936 self.ipfix_domain_id = 1
938 interfaces = self.vapi.nat44_interface_dump()
939 for intf in interfaces:
940 if intf.is_inside > 1:
941 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
944 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
948 interfaces = self.vapi.nat44_interface_output_feature_dump()
949 for intf in interfaces:
950 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
954 static_mappings = self.vapi.nat44_static_mapping_dump()
955 for sm in static_mappings:
956 self.vapi.nat44_add_del_static_mapping(
958 sm.external_ip_address,
959 local_port=sm.local_port,
960 external_port=sm.external_port,
961 addr_only=sm.addr_only,
963 protocol=sm.protocol,
964 twice_nat=sm.twice_nat,
965 self_twice_nat=sm.self_twice_nat,
966 out2in_only=sm.out2in_only,
968 external_sw_if_index=sm.external_sw_if_index,
971 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
972 for lb_sm in lb_static_mappings:
973 self.vapi.nat44_add_del_lb_static_mapping(
978 twice_nat=lb_sm.twice_nat,
979 self_twice_nat=lb_sm.self_twice_nat,
980 out2in_only=lb_sm.out2in_only,
986 identity_mappings = self.vapi.nat44_identity_mapping_dump()
987 for id_m in identity_mappings:
988 self.vapi.nat44_add_del_identity_mapping(
989 addr_only=id_m.addr_only,
992 sw_if_index=id_m.sw_if_index,
994 protocol=id_m.protocol,
997 adresses = self.vapi.nat44_address_dump()
998 for addr in adresses:
999 self.vapi.nat44_add_del_address_range(addr.ip_address,
1001 twice_nat=addr.twice_nat,
1004 self.vapi.nat_set_reass()
1005 self.vapi.nat_set_reass(is_ip6=1)
1007 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1008 local_port=0, external_port=0, vrf_id=0,
1009 is_add=1, external_sw_if_index=0xFFFFFFFF,
1010 proto=0, twice_nat=0, self_twice_nat=0,
1011 out2in_only=0, tag=""):
1013 Add/delete NAT44 static mapping
1015 :param local_ip: Local IP address
1016 :param external_ip: External IP address
1017 :param local_port: Local port number (Optional)
1018 :param external_port: External port number (Optional)
1019 :param vrf_id: VRF ID (Default 0)
1020 :param is_add: 1 if add, 0 if delete (Default add)
1021 :param external_sw_if_index: External interface instead of IP address
1022 :param proto: IP protocol (Mandatory if port specified)
1023 :param twice_nat: 1 if translate external host address and port
1024 :param self_twice_nat: 1 if translate external host address and port
1025 whenever external host address equals
1026 local address of internal host
1027 :param out2in_only: if 1 rule is matching only out2in direction
1028 :param tag: Opaque string tag
1031 if local_port and external_port:
1033 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1034 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1035 self.vapi.nat44_add_del_static_mapping(
1038 external_sw_if_index,
1050 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1052 Add/delete NAT44 address
1054 :param ip: IP address
1055 :param is_add: 1 if add, 0 if delete (Default add)
1056 :param twice_nat: twice NAT address for extenal hosts
1058 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1059 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1061 twice_nat=twice_nat)
1063 def test_dynamic(self):
1064 """ NAT44 dynamic translation test """
1066 self.nat44_add_address(self.nat_addr)
1067 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1068 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1072 pkts = self.create_stream_in(self.pg0, self.pg1)
1073 self.pg0.add_stream(pkts)
1074 self.pg_enable_capture(self.pg_interfaces)
1076 capture = self.pg1.get_capture(len(pkts))
1077 self.verify_capture_out(capture)
1080 pkts = self.create_stream_out(self.pg1)
1081 self.pg1.add_stream(pkts)
1082 self.pg_enable_capture(self.pg_interfaces)
1084 capture = self.pg0.get_capture(len(pkts))
1085 self.verify_capture_in(capture, self.pg0)
1087 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1088 """ NAT44 handling of client packets with TTL=1 """
1090 self.nat44_add_address(self.nat_addr)
1091 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1092 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1095 # Client side - generate traffic
1096 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1097 self.pg0.add_stream(pkts)
1098 self.pg_enable_capture(self.pg_interfaces)
1101 # Client side - verify ICMP type 11 packets
1102 capture = self.pg0.get_capture(len(pkts))
1103 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1105 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1106 """ NAT44 handling of server packets with TTL=1 """
1108 self.nat44_add_address(self.nat_addr)
1109 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1110 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1113 # Client side - create sessions
1114 pkts = self.create_stream_in(self.pg0, self.pg1)
1115 self.pg0.add_stream(pkts)
1116 self.pg_enable_capture(self.pg_interfaces)
1119 # Server side - generate traffic
1120 capture = self.pg1.get_capture(len(pkts))
1121 self.verify_capture_out(capture)
1122 pkts = self.create_stream_out(self.pg1, ttl=1)
1123 self.pg1.add_stream(pkts)
1124 self.pg_enable_capture(self.pg_interfaces)
1127 # Server side - verify ICMP type 11 packets
1128 capture = self.pg1.get_capture(len(pkts))
1129 self.verify_capture_out_with_icmp_errors(capture,
1130 src_ip=self.pg1.local_ip4)
1132 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1133 """ NAT44 handling of error responses to client packets with TTL=2 """
1135 self.nat44_add_address(self.nat_addr)
1136 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1137 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1140 # Client side - generate traffic
1141 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1142 self.pg0.add_stream(pkts)
1143 self.pg_enable_capture(self.pg_interfaces)
1146 # Server side - simulate ICMP type 11 response
1147 capture = self.pg1.get_capture(len(pkts))
1148 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1149 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1150 ICMP(type=11) / packet[IP] for packet in capture]
1151 self.pg1.add_stream(pkts)
1152 self.pg_enable_capture(self.pg_interfaces)
1155 # Client side - verify ICMP type 11 packets
1156 capture = self.pg0.get_capture(len(pkts))
1157 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1159 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1160 """ NAT44 handling of error responses to server packets with TTL=2 """
1162 self.nat44_add_address(self.nat_addr)
1163 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1164 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1167 # Client side - create sessions
1168 pkts = self.create_stream_in(self.pg0, self.pg1)
1169 self.pg0.add_stream(pkts)
1170 self.pg_enable_capture(self.pg_interfaces)
1173 # Server side - generate traffic
1174 capture = self.pg1.get_capture(len(pkts))
1175 self.verify_capture_out(capture)
1176 pkts = self.create_stream_out(self.pg1, ttl=2)
1177 self.pg1.add_stream(pkts)
1178 self.pg_enable_capture(self.pg_interfaces)
1181 # Client side - simulate ICMP type 11 response
1182 capture = self.pg0.get_capture(len(pkts))
1183 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1184 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1185 ICMP(type=11) / packet[IP] for packet in capture]
1186 self.pg0.add_stream(pkts)
1187 self.pg_enable_capture(self.pg_interfaces)
1190 # Server side - verify ICMP type 11 packets
1191 capture = self.pg1.get_capture(len(pkts))
1192 self.verify_capture_out_with_icmp_errors(capture)
1194 def test_ping_out_interface_from_outside(self):
1195 """ Ping NAT44 out interface from outside network """
1197 self.nat44_add_address(self.nat_addr)
1198 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1199 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1202 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1203 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1204 ICMP(id=self.icmp_id_out, type='echo-request'))
1206 self.pg1.add_stream(pkts)
1207 self.pg_enable_capture(self.pg_interfaces)
1209 capture = self.pg1.get_capture(len(pkts))
1210 self.assertEqual(1, len(capture))
1213 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1214 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1215 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1216 self.assertEqual(packet[ICMP].type, 0) # echo reply
1218 self.logger.error(ppp("Unexpected or invalid packet "
1219 "(outside network):", packet))
1222 def test_ping_internal_host_from_outside(self):
1223 """ Ping internal host from outside network """
1225 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1226 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1227 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1231 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1232 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1233 ICMP(id=self.icmp_id_out, type='echo-request'))
1234 self.pg1.add_stream(pkt)
1235 self.pg_enable_capture(self.pg_interfaces)
1237 capture = self.pg0.get_capture(1)
1238 self.verify_capture_in(capture, self.pg0, packet_num=1)
1239 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1242 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1244 ICMP(id=self.icmp_id_in, type='echo-reply'))
1245 self.pg0.add_stream(pkt)
1246 self.pg_enable_capture(self.pg_interfaces)
1248 capture = self.pg1.get_capture(1)
1249 self.verify_capture_out(capture, same_port=True, packet_num=1)
1250 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1252 def test_forwarding(self):
1253 """ NAT44 forwarding test """
1255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1256 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1258 self.vapi.nat44_forwarding_enable_disable(1)
1260 real_ip = self.pg0.remote_ip4n
1261 alias_ip = self.nat_addr_n
1262 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1263 external_ip=alias_ip)
1266 # in2out - static mapping match
1268 pkts = self.create_stream_out(self.pg1)
1269 self.pg1.add_stream(pkts)
1270 self.pg_enable_capture(self.pg_interfaces)
1272 capture = self.pg0.get_capture(len(pkts))
1273 self.verify_capture_in(capture, self.pg0)
1275 pkts = self.create_stream_in(self.pg0, self.pg1)
1276 self.pg0.add_stream(pkts)
1277 self.pg_enable_capture(self.pg_interfaces)
1279 capture = self.pg1.get_capture(len(pkts))
1280 self.verify_capture_out(capture, same_port=True)
1282 # in2out - no static mapping match
1284 host0 = self.pg0.remote_hosts[0]
1285 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1287 pkts = self.create_stream_out(self.pg1,
1288 dst_ip=self.pg0.remote_ip4,
1289 use_inside_ports=True)
1290 self.pg1.add_stream(pkts)
1291 self.pg_enable_capture(self.pg_interfaces)
1293 capture = self.pg0.get_capture(len(pkts))
1294 self.verify_capture_in(capture, self.pg0)
1296 pkts = self.create_stream_in(self.pg0, self.pg1)
1297 self.pg0.add_stream(pkts)
1298 self.pg_enable_capture(self.pg_interfaces)
1300 capture = self.pg1.get_capture(len(pkts))
1301 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1304 self.pg0.remote_hosts[0] = host0
1306 user = self.pg0.remote_hosts[1]
1307 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
1308 self.assertEqual(len(sessions), 3)
1309 self.assertTrue(sessions[0].ext_host_valid)
1310 self.vapi.nat44_del_session(
1311 sessions[0].inside_ip_address,
1312 sessions[0].inside_port,
1313 sessions[0].protocol,
1314 ext_host_address=sessions[0].ext_host_address,
1315 ext_host_port=sessions[0].ext_host_port)
1316 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
1317 self.assertEqual(len(sessions), 2)
1320 self.vapi.nat44_forwarding_enable_disable(0)
1321 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322 external_ip=alias_ip,
1325 def test_static_in(self):
1326 """ 1:1 NAT initialized from inside network """
1328 nat_ip = "10.0.0.10"
1329 self.tcp_port_out = 6303
1330 self.udp_port_out = 6304
1331 self.icmp_id_out = 6305
1333 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1334 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1335 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1337 sm = self.vapi.nat44_static_mapping_dump()
1338 self.assertEqual(len(sm), 1)
1339 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1340 self.assertEqual(sm[0].protocol, 0)
1341 self.assertEqual(sm[0].local_port, 0)
1342 self.assertEqual(sm[0].external_port, 0)
1345 pkts = self.create_stream_in(self.pg0, self.pg1)
1346 self.pg0.add_stream(pkts)
1347 self.pg_enable_capture(self.pg_interfaces)
1349 capture = self.pg1.get_capture(len(pkts))
1350 self.verify_capture_out(capture, nat_ip, True)
1353 pkts = self.create_stream_out(self.pg1, nat_ip)
1354 self.pg1.add_stream(pkts)
1355 self.pg_enable_capture(self.pg_interfaces)
1357 capture = self.pg0.get_capture(len(pkts))
1358 self.verify_capture_in(capture, self.pg0)
1360 def test_static_out(self):
1361 """ 1:1 NAT initialized from outside network """
1363 nat_ip = "10.0.0.20"
1364 self.tcp_port_out = 6303
1365 self.udp_port_out = 6304
1366 self.icmp_id_out = 6305
1369 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1370 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1371 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1373 sm = self.vapi.nat44_static_mapping_dump()
1374 self.assertEqual(len(sm), 1)
1375 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1378 pkts = self.create_stream_out(self.pg1, nat_ip)
1379 self.pg1.add_stream(pkts)
1380 self.pg_enable_capture(self.pg_interfaces)
1382 capture = self.pg0.get_capture(len(pkts))
1383 self.verify_capture_in(capture, self.pg0)
1386 pkts = self.create_stream_in(self.pg0, self.pg1)
1387 self.pg0.add_stream(pkts)
1388 self.pg_enable_capture(self.pg_interfaces)
1390 capture = self.pg1.get_capture(len(pkts))
1391 self.verify_capture_out(capture, nat_ip, True)
1393 def test_static_with_port_in(self):
1394 """ 1:1 NAPT initialized from inside network """
1396 self.tcp_port_out = 3606
1397 self.udp_port_out = 3607
1398 self.icmp_id_out = 3608
1400 self.nat44_add_address(self.nat_addr)
1401 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1402 self.tcp_port_in, self.tcp_port_out,
1403 proto=IP_PROTOS.tcp)
1404 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1405 self.udp_port_in, self.udp_port_out,
1406 proto=IP_PROTOS.udp)
1407 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1408 self.icmp_id_in, self.icmp_id_out,
1409 proto=IP_PROTOS.icmp)
1410 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1411 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1415 pkts = self.create_stream_in(self.pg0, self.pg1)
1416 self.pg0.add_stream(pkts)
1417 self.pg_enable_capture(self.pg_interfaces)
1419 capture = self.pg1.get_capture(len(pkts))
1420 self.verify_capture_out(capture)
1423 pkts = self.create_stream_out(self.pg1)
1424 self.pg1.add_stream(pkts)
1425 self.pg_enable_capture(self.pg_interfaces)
1427 capture = self.pg0.get_capture(len(pkts))
1428 self.verify_capture_in(capture, self.pg0)
1430 def test_static_with_port_out(self):
1431 """ 1:1 NAPT initialized from outside network """
1433 self.tcp_port_out = 30606
1434 self.udp_port_out = 30607
1435 self.icmp_id_out = 30608
1437 self.nat44_add_address(self.nat_addr)
1438 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1439 self.tcp_port_in, self.tcp_port_out,
1440 proto=IP_PROTOS.tcp)
1441 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1442 self.udp_port_in, self.udp_port_out,
1443 proto=IP_PROTOS.udp)
1444 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1445 self.icmp_id_in, self.icmp_id_out,
1446 proto=IP_PROTOS.icmp)
1447 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1448 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1452 pkts = self.create_stream_out(self.pg1)
1453 self.pg1.add_stream(pkts)
1454 self.pg_enable_capture(self.pg_interfaces)
1456 capture = self.pg0.get_capture(len(pkts))
1457 self.verify_capture_in(capture, self.pg0)
1460 pkts = self.create_stream_in(self.pg0, self.pg1)
1461 self.pg0.add_stream(pkts)
1462 self.pg_enable_capture(self.pg_interfaces)
1464 capture = self.pg1.get_capture(len(pkts))
1465 self.verify_capture_out(capture)
1467 def test_static_with_port_out2(self):
1468 """ 1:1 NAPT symmetrical rule """
1473 self.vapi.nat44_forwarding_enable_disable(1)
1474 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1475 local_port, external_port,
1476 proto=IP_PROTOS.tcp, out2in_only=1)
1477 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1478 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1481 # from client to service
1482 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1483 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1484 TCP(sport=12345, dport=external_port))
1485 self.pg1.add_stream(p)
1486 self.pg_enable_capture(self.pg_interfaces)
1488 capture = self.pg0.get_capture(1)
1493 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1494 self.assertEqual(tcp.dport, local_port)
1495 self.assert_packet_checksums_valid(p)
1497 self.logger.error(ppp("Unexpected or invalid packet:", p))
1501 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1502 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1503 ICMP(type=11) / capture[0][IP])
1504 self.pg0.add_stream(p)
1505 self.pg_enable_capture(self.pg_interfaces)
1507 capture = self.pg1.get_capture(1)
1510 self.assertEqual(p[IP].src, self.nat_addr)
1512 self.assertEqual(inner.dst, self.nat_addr)
1513 self.assertEqual(inner[TCPerror].dport, external_port)
1515 self.logger.error(ppp("Unexpected or invalid packet:", p))
1518 # from service back to client
1519 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1520 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1521 TCP(sport=local_port, dport=12345))
1522 self.pg0.add_stream(p)
1523 self.pg_enable_capture(self.pg_interfaces)
1525 capture = self.pg1.get_capture(1)
1530 self.assertEqual(ip.src, self.nat_addr)
1531 self.assertEqual(tcp.sport, external_port)
1532 self.assert_packet_checksums_valid(p)
1534 self.logger.error(ppp("Unexpected or invalid packet:", p))
1538 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1539 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1540 ICMP(type=11) / capture[0][IP])
1541 self.pg1.add_stream(p)
1542 self.pg_enable_capture(self.pg_interfaces)
1544 capture = self.pg0.get_capture(1)
1547 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1549 self.assertEqual(inner.src, self.pg0.remote_ip4)
1550 self.assertEqual(inner[TCPerror].sport, local_port)
1552 self.logger.error(ppp("Unexpected or invalid packet:", p))
1555 # from client to server (no translation)
1556 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1557 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1558 TCP(sport=12346, dport=local_port))
1559 self.pg1.add_stream(p)
1560 self.pg_enable_capture(self.pg_interfaces)
1562 capture = self.pg0.get_capture(1)
1567 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568 self.assertEqual(tcp.dport, local_port)
1569 self.assert_packet_checksums_valid(p)
1571 self.logger.error(ppp("Unexpected or invalid packet:", p))
1574 # from service back to client (no translation)
1575 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1576 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1577 TCP(sport=local_port, dport=12346))
1578 self.pg0.add_stream(p)
1579 self.pg_enable_capture(self.pg_interfaces)
1581 capture = self.pg1.get_capture(1)
1586 self.assertEqual(ip.src, self.pg0.remote_ip4)
1587 self.assertEqual(tcp.sport, local_port)
1588 self.assert_packet_checksums_valid(p)
1590 self.logger.error(ppp("Unexpected or invalid packet:", p))
1593 def test_static_vrf_aware(self):
1594 """ 1:1 NAT VRF awareness """
1596 nat_ip1 = "10.0.0.30"
1597 nat_ip2 = "10.0.0.40"
1598 self.tcp_port_out = 6303
1599 self.udp_port_out = 6304
1600 self.icmp_id_out = 6305
1602 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1604 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1606 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1608 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1609 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1611 # inside interface VRF match NAT44 static mapping VRF
1612 pkts = self.create_stream_in(self.pg4, self.pg3)
1613 self.pg4.add_stream(pkts)
1614 self.pg_enable_capture(self.pg_interfaces)
1616 capture = self.pg3.get_capture(len(pkts))
1617 self.verify_capture_out(capture, nat_ip1, True)
1619 # inside interface VRF don't match NAT44 static mapping VRF (packets
1621 pkts = self.create_stream_in(self.pg0, self.pg3)
1622 self.pg0.add_stream(pkts)
1623 self.pg_enable_capture(self.pg_interfaces)
1625 self.pg3.assert_nothing_captured()
1627 def test_dynamic_to_static(self):
1628 """ Switch from dynamic translation to 1:1NAT """
1629 nat_ip = "10.0.0.10"
1630 self.tcp_port_out = 6303
1631 self.udp_port_out = 6304
1632 self.icmp_id_out = 6305
1634 self.nat44_add_address(self.nat_addr)
1635 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1636 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1640 pkts = self.create_stream_in(self.pg0, self.pg1)
1641 self.pg0.add_stream(pkts)
1642 self.pg_enable_capture(self.pg_interfaces)
1644 capture = self.pg1.get_capture(len(pkts))
1645 self.verify_capture_out(capture)
1648 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1649 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1650 self.assertEqual(len(sessions), 0)
1651 pkts = self.create_stream_in(self.pg0, self.pg1)
1652 self.pg0.add_stream(pkts)
1653 self.pg_enable_capture(self.pg_interfaces)
1655 capture = self.pg1.get_capture(len(pkts))
1656 self.verify_capture_out(capture, nat_ip, True)
1658 def test_identity_nat(self):
1659 """ Identity NAT """
1661 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1662 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1663 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1666 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1667 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1668 TCP(sport=12345, dport=56789))
1669 self.pg1.add_stream(p)
1670 self.pg_enable_capture(self.pg_interfaces)
1672 capture = self.pg0.get_capture(1)
1677 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1678 self.assertEqual(ip.src, self.pg1.remote_ip4)
1679 self.assertEqual(tcp.dport, 56789)
1680 self.assertEqual(tcp.sport, 12345)
1681 self.assert_packet_checksums_valid(p)
1683 self.logger.error(ppp("Unexpected or invalid packet:", p))
1686 def test_static_lb(self):
1687 """ NAT44 local service load balancing """
1688 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1691 server1 = self.pg0.remote_hosts[0]
1692 server2 = self.pg0.remote_hosts[1]
1694 locals = [{'addr': server1.ip4n,
1697 {'addr': server2.ip4n,
1701 self.nat44_add_address(self.nat_addr)
1702 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1705 local_num=len(locals),
1707 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1708 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1711 # from client to service
1712 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1713 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1714 TCP(sport=12345, dport=external_port))
1715 self.pg1.add_stream(p)
1716 self.pg_enable_capture(self.pg_interfaces)
1718 capture = self.pg0.get_capture(1)
1724 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1725 if ip.dst == server1.ip4:
1729 self.assertEqual(tcp.dport, local_port)
1730 self.assert_packet_checksums_valid(p)
1732 self.logger.error(ppp("Unexpected or invalid packet:", p))
1735 # from service back to client
1736 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1737 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1738 TCP(sport=local_port, dport=12345))
1739 self.pg0.add_stream(p)
1740 self.pg_enable_capture(self.pg_interfaces)
1742 capture = self.pg1.get_capture(1)
1747 self.assertEqual(ip.src, self.nat_addr)
1748 self.assertEqual(tcp.sport, external_port)
1749 self.assert_packet_checksums_valid(p)
1751 self.logger.error(ppp("Unexpected or invalid packet:", p))
1754 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
1755 self.assertEqual(len(sessions), 1)
1756 self.assertTrue(sessions[0].ext_host_valid)
1757 self.vapi.nat44_del_session(
1758 sessions[0].inside_ip_address,
1759 sessions[0].inside_port,
1760 sessions[0].protocol,
1761 ext_host_address=sessions[0].ext_host_address,
1762 ext_host_port=sessions[0].ext_host_port)
1763 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
1764 self.assertEqual(len(sessions), 0)
1766 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1767 def test_static_lb_multi_clients(self):
1768 """ NAT44 local service load balancing - multiple clients"""
1770 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1773 server1 = self.pg0.remote_hosts[0]
1774 server2 = self.pg0.remote_hosts[1]
1776 locals = [{'addr': server1.ip4n,
1779 {'addr': server2.ip4n,
1783 self.nat44_add_address(self.nat_addr)
1784 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1787 local_num=len(locals),
1789 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1790 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1795 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1797 for client in clients:
1798 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1799 IP(src=client, dst=self.nat_addr) /
1800 TCP(sport=12345, dport=external_port))
1802 self.pg1.add_stream(pkts)
1803 self.pg_enable_capture(self.pg_interfaces)
1805 capture = self.pg0.get_capture(len(pkts))
1807 if p[IP].dst == server1.ip4:
1811 self.assertTrue(server1_n > server2_n)
1813 def test_static_lb_2(self):
1814 """ NAT44 local service load balancing (asymmetrical rule) """
1815 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1818 server1 = self.pg0.remote_hosts[0]
1819 server2 = self.pg0.remote_hosts[1]
1821 locals = [{'addr': server1.ip4n,
1824 {'addr': server2.ip4n,
1828 self.vapi.nat44_forwarding_enable_disable(1)
1829 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1833 local_num=len(locals),
1835 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1836 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1839 # from client to service
1840 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1841 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1842 TCP(sport=12345, dport=external_port))
1843 self.pg1.add_stream(p)
1844 self.pg_enable_capture(self.pg_interfaces)
1846 capture = self.pg0.get_capture(1)
1852 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1853 if ip.dst == server1.ip4:
1857 self.assertEqual(tcp.dport, local_port)
1858 self.assert_packet_checksums_valid(p)
1860 self.logger.error(ppp("Unexpected or invalid packet:", p))
1863 # from service back to client
1864 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1865 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1866 TCP(sport=local_port, dport=12345))
1867 self.pg0.add_stream(p)
1868 self.pg_enable_capture(self.pg_interfaces)
1870 capture = self.pg1.get_capture(1)
1875 self.assertEqual(ip.src, self.nat_addr)
1876 self.assertEqual(tcp.sport, external_port)
1877 self.assert_packet_checksums_valid(p)
1879 self.logger.error(ppp("Unexpected or invalid packet:", p))
1882 # from client to server (no translation)
1883 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1884 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1885 TCP(sport=12346, dport=local_port))
1886 self.pg1.add_stream(p)
1887 self.pg_enable_capture(self.pg_interfaces)
1889 capture = self.pg0.get_capture(1)
1895 self.assertEqual(ip.dst, server1.ip4)
1896 self.assertEqual(tcp.dport, local_port)
1897 self.assert_packet_checksums_valid(p)
1899 self.logger.error(ppp("Unexpected or invalid packet:", p))
1902 # from service back to client (no translation)
1903 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1904 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1905 TCP(sport=local_port, dport=12346))
1906 self.pg0.add_stream(p)
1907 self.pg_enable_capture(self.pg_interfaces)
1909 capture = self.pg1.get_capture(1)
1914 self.assertEqual(ip.src, server1.ip4)
1915 self.assertEqual(tcp.sport, local_port)
1916 self.assert_packet_checksums_valid(p)
1918 self.logger.error(ppp("Unexpected or invalid packet:", p))
1921 def test_multiple_inside_interfaces(self):
1922 """ NAT44 multiple non-overlapping address space inside interfaces """
1924 self.nat44_add_address(self.nat_addr)
1925 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1926 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1927 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1930 # between two NAT44 inside interfaces (no translation)
1931 pkts = self.create_stream_in(self.pg0, self.pg1)
1932 self.pg0.add_stream(pkts)
1933 self.pg_enable_capture(self.pg_interfaces)
1935 capture = self.pg1.get_capture(len(pkts))
1936 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1938 # from NAT44 inside to interface without NAT44 feature (no translation)
1939 pkts = self.create_stream_in(self.pg0, self.pg2)
1940 self.pg0.add_stream(pkts)
1941 self.pg_enable_capture(self.pg_interfaces)
1943 capture = self.pg2.get_capture(len(pkts))
1944 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1946 # in2out 1st interface
1947 pkts = self.create_stream_in(self.pg0, self.pg3)
1948 self.pg0.add_stream(pkts)
1949 self.pg_enable_capture(self.pg_interfaces)
1951 capture = self.pg3.get_capture(len(pkts))
1952 self.verify_capture_out(capture)
1954 # out2in 1st interface
1955 pkts = self.create_stream_out(self.pg3)
1956 self.pg3.add_stream(pkts)
1957 self.pg_enable_capture(self.pg_interfaces)
1959 capture = self.pg0.get_capture(len(pkts))
1960 self.verify_capture_in(capture, self.pg0)
1962 # in2out 2nd interface
1963 pkts = self.create_stream_in(self.pg1, self.pg3)
1964 self.pg1.add_stream(pkts)
1965 self.pg_enable_capture(self.pg_interfaces)
1967 capture = self.pg3.get_capture(len(pkts))
1968 self.verify_capture_out(capture)
1970 # out2in 2nd interface
1971 pkts = self.create_stream_out(self.pg3)
1972 self.pg3.add_stream(pkts)
1973 self.pg_enable_capture(self.pg_interfaces)
1975 capture = self.pg1.get_capture(len(pkts))
1976 self.verify_capture_in(capture, self.pg1)
1978 def test_inside_overlapping_interfaces(self):
1979 """ NAT44 multiple inside interfaces with overlapping address space """
1981 static_nat_ip = "10.0.0.10"
1982 self.nat44_add_address(self.nat_addr)
1983 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1985 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1986 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1987 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1988 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1991 # between NAT44 inside interfaces with same VRF (no translation)
1992 pkts = self.create_stream_in(self.pg4, self.pg5)
1993 self.pg4.add_stream(pkts)
1994 self.pg_enable_capture(self.pg_interfaces)
1996 capture = self.pg5.get_capture(len(pkts))
1997 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1999 # between NAT44 inside interfaces with different VRF (hairpinning)
2000 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2001 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2002 TCP(sport=1234, dport=5678))
2003 self.pg4.add_stream(p)
2004 self.pg_enable_capture(self.pg_interfaces)
2006 capture = self.pg6.get_capture(1)
2011 self.assertEqual(ip.src, self.nat_addr)
2012 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2013 self.assertNotEqual(tcp.sport, 1234)
2014 self.assertEqual(tcp.dport, 5678)
2016 self.logger.error(ppp("Unexpected or invalid packet:", p))
2019 # in2out 1st interface
2020 pkts = self.create_stream_in(self.pg4, self.pg3)
2021 self.pg4.add_stream(pkts)
2022 self.pg_enable_capture(self.pg_interfaces)
2024 capture = self.pg3.get_capture(len(pkts))
2025 self.verify_capture_out(capture)
2027 # out2in 1st interface
2028 pkts = self.create_stream_out(self.pg3)
2029 self.pg3.add_stream(pkts)
2030 self.pg_enable_capture(self.pg_interfaces)
2032 capture = self.pg4.get_capture(len(pkts))
2033 self.verify_capture_in(capture, self.pg4)
2035 # in2out 2nd interface
2036 pkts = self.create_stream_in(self.pg5, self.pg3)
2037 self.pg5.add_stream(pkts)
2038 self.pg_enable_capture(self.pg_interfaces)
2040 capture = self.pg3.get_capture(len(pkts))
2041 self.verify_capture_out(capture)
2043 # out2in 2nd interface
2044 pkts = self.create_stream_out(self.pg3)
2045 self.pg3.add_stream(pkts)
2046 self.pg_enable_capture(self.pg_interfaces)
2048 capture = self.pg5.get_capture(len(pkts))
2049 self.verify_capture_in(capture, self.pg5)
2052 addresses = self.vapi.nat44_address_dump()
2053 self.assertEqual(len(addresses), 1)
2054 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2055 self.assertEqual(len(sessions), 3)
2056 for session in sessions:
2057 self.assertFalse(session.is_static)
2058 self.assertEqual(session.inside_ip_address[0:4],
2059 self.pg5.remote_ip4n)
2060 self.assertEqual(session.outside_ip_address,
2061 addresses[0].ip_address)
2062 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2063 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2064 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2065 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2066 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2067 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2068 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2069 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2070 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2072 # in2out 3rd interface
2073 pkts = self.create_stream_in(self.pg6, self.pg3)
2074 self.pg6.add_stream(pkts)
2075 self.pg_enable_capture(self.pg_interfaces)
2077 capture = self.pg3.get_capture(len(pkts))
2078 self.verify_capture_out(capture, static_nat_ip, True)
2080 # out2in 3rd interface
2081 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2082 self.pg3.add_stream(pkts)
2083 self.pg_enable_capture(self.pg_interfaces)
2085 capture = self.pg6.get_capture(len(pkts))
2086 self.verify_capture_in(capture, self.pg6)
2088 # general user and session dump verifications
2089 users = self.vapi.nat44_user_dump()
2090 self.assertTrue(len(users) >= 3)
2091 addresses = self.vapi.nat44_address_dump()
2092 self.assertEqual(len(addresses), 1)
2094 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2096 for session in sessions:
2097 self.assertEqual(user.ip_address, session.inside_ip_address)
2098 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2099 self.assertTrue(session.protocol in
2100 [IP_PROTOS.tcp, IP_PROTOS.udp,
2102 self.assertFalse(session.ext_host_valid)
2105 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2106 self.assertTrue(len(sessions) >= 4)
2107 for session in sessions:
2108 self.assertFalse(session.is_static)
2109 self.assertEqual(session.inside_ip_address[0:4],
2110 self.pg4.remote_ip4n)
2111 self.assertEqual(session.outside_ip_address,
2112 addresses[0].ip_address)
2115 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2116 self.assertTrue(len(sessions) >= 3)
2117 for session in sessions:
2118 self.assertTrue(session.is_static)
2119 self.assertEqual(session.inside_ip_address[0:4],
2120 self.pg6.remote_ip4n)
2121 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2122 map(int, static_nat_ip.split('.')))
2123 self.assertTrue(session.inside_port in
2124 [self.tcp_port_in, self.udp_port_in,
2127 def test_hairpinning(self):
2128 """ NAT44 hairpinning - 1:1 NAPT """
2130 host = self.pg0.remote_hosts[0]
2131 server = self.pg0.remote_hosts[1]
2134 server_in_port = 5678
2135 server_out_port = 8765
2137 self.nat44_add_address(self.nat_addr)
2138 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2139 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2141 # add static mapping for server
2142 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2143 server_in_port, server_out_port,
2144 proto=IP_PROTOS.tcp)
2146 # send packet from host to server
2147 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2148 IP(src=host.ip4, dst=self.nat_addr) /
2149 TCP(sport=host_in_port, dport=server_out_port))
2150 self.pg0.add_stream(p)
2151 self.pg_enable_capture(self.pg_interfaces)
2153 capture = self.pg0.get_capture(1)
2158 self.assertEqual(ip.src, self.nat_addr)
2159 self.assertEqual(ip.dst, server.ip4)
2160 self.assertNotEqual(tcp.sport, host_in_port)
2161 self.assertEqual(tcp.dport, server_in_port)
2162 self.assert_packet_checksums_valid(p)
2163 host_out_port = tcp.sport
2165 self.logger.error(ppp("Unexpected or invalid packet:", p))
2168 # send reply from server to host
2169 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2170 IP(src=server.ip4, dst=self.nat_addr) /
2171 TCP(sport=server_in_port, dport=host_out_port))
2172 self.pg0.add_stream(p)
2173 self.pg_enable_capture(self.pg_interfaces)
2175 capture = self.pg0.get_capture(1)
2180 self.assertEqual(ip.src, self.nat_addr)
2181 self.assertEqual(ip.dst, host.ip4)
2182 self.assertEqual(tcp.sport, server_out_port)
2183 self.assertEqual(tcp.dport, host_in_port)
2184 self.assert_packet_checksums_valid(p)
2186 self.logger.error(ppp("Unexpected or invalid packet:", p))
2189 def test_hairpinning2(self):
2190 """ NAT44 hairpinning - 1:1 NAT"""
2192 server1_nat_ip = "10.0.0.10"
2193 server2_nat_ip = "10.0.0.11"
2194 host = self.pg0.remote_hosts[0]
2195 server1 = self.pg0.remote_hosts[1]
2196 server2 = self.pg0.remote_hosts[2]
2197 server_tcp_port = 22
2198 server_udp_port = 20
2200 self.nat44_add_address(self.nat_addr)
2201 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2202 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2205 # add static mapping for servers
2206 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2207 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2211 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2212 IP(src=host.ip4, dst=server1_nat_ip) /
2213 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2215 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2216 IP(src=host.ip4, dst=server1_nat_ip) /
2217 UDP(sport=self.udp_port_in, dport=server_udp_port))
2219 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2220 IP(src=host.ip4, dst=server1_nat_ip) /
2221 ICMP(id=self.icmp_id_in, type='echo-request'))
2223 self.pg0.add_stream(pkts)
2224 self.pg_enable_capture(self.pg_interfaces)
2226 capture = self.pg0.get_capture(len(pkts))
2227 for packet in capture:
2229 self.assertEqual(packet[IP].src, self.nat_addr)
2230 self.assertEqual(packet[IP].dst, server1.ip4)
2231 if packet.haslayer(TCP):
2232 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2233 self.assertEqual(packet[TCP].dport, server_tcp_port)
2234 self.tcp_port_out = packet[TCP].sport
2235 self.assert_packet_checksums_valid(packet)
2236 elif packet.haslayer(UDP):
2237 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2238 self.assertEqual(packet[UDP].dport, server_udp_port)
2239 self.udp_port_out = packet[UDP].sport
2241 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2242 self.icmp_id_out = packet[ICMP].id
2244 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2249 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2250 IP(src=server1.ip4, dst=self.nat_addr) /
2251 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2253 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2254 IP(src=server1.ip4, dst=self.nat_addr) /
2255 UDP(sport=server_udp_port, dport=self.udp_port_out))
2257 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2258 IP(src=server1.ip4, dst=self.nat_addr) /
2259 ICMP(id=self.icmp_id_out, type='echo-reply'))
2261 self.pg0.add_stream(pkts)
2262 self.pg_enable_capture(self.pg_interfaces)
2264 capture = self.pg0.get_capture(len(pkts))
2265 for packet in capture:
2267 self.assertEqual(packet[IP].src, server1_nat_ip)
2268 self.assertEqual(packet[IP].dst, host.ip4)
2269 if packet.haslayer(TCP):
2270 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2271 self.assertEqual(packet[TCP].sport, server_tcp_port)
2272 self.assert_packet_checksums_valid(packet)
2273 elif packet.haslayer(UDP):
2274 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2275 self.assertEqual(packet[UDP].sport, server_udp_port)
2277 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2279 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2282 # server2 to server1
2284 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2285 IP(src=server2.ip4, dst=server1_nat_ip) /
2286 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2288 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2289 IP(src=server2.ip4, dst=server1_nat_ip) /
2290 UDP(sport=self.udp_port_in, dport=server_udp_port))
2292 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2293 IP(src=server2.ip4, dst=server1_nat_ip) /
2294 ICMP(id=self.icmp_id_in, type='echo-request'))
2296 self.pg0.add_stream(pkts)
2297 self.pg_enable_capture(self.pg_interfaces)
2299 capture = self.pg0.get_capture(len(pkts))
2300 for packet in capture:
2302 self.assertEqual(packet[IP].src, server2_nat_ip)
2303 self.assertEqual(packet[IP].dst, server1.ip4)
2304 if packet.haslayer(TCP):
2305 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2306 self.assertEqual(packet[TCP].dport, server_tcp_port)
2307 self.tcp_port_out = packet[TCP].sport
2308 self.assert_packet_checksums_valid(packet)
2309 elif packet.haslayer(UDP):
2310 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2311 self.assertEqual(packet[UDP].dport, server_udp_port)
2312 self.udp_port_out = packet[UDP].sport
2314 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2315 self.icmp_id_out = packet[ICMP].id
2317 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2320 # server1 to server2
2322 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2323 IP(src=server1.ip4, dst=server2_nat_ip) /
2324 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2326 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2327 IP(src=server1.ip4, dst=server2_nat_ip) /
2328 UDP(sport=server_udp_port, dport=self.udp_port_out))
2330 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2331 IP(src=server1.ip4, dst=server2_nat_ip) /
2332 ICMP(id=self.icmp_id_out, type='echo-reply'))
2334 self.pg0.add_stream(pkts)
2335 self.pg_enable_capture(self.pg_interfaces)
2337 capture = self.pg0.get_capture(len(pkts))
2338 for packet in capture:
2340 self.assertEqual(packet[IP].src, server1_nat_ip)
2341 self.assertEqual(packet[IP].dst, server2.ip4)
2342 if packet.haslayer(TCP):
2343 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2344 self.assertEqual(packet[TCP].sport, server_tcp_port)
2345 self.assert_packet_checksums_valid(packet)
2346 elif packet.haslayer(UDP):
2347 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2348 self.assertEqual(packet[UDP].sport, server_udp_port)
2350 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2352 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2355 def test_max_translations_per_user(self):
2356 """ MAX translations per user - recycle the least recently used """
2358 self.nat44_add_address(self.nat_addr)
2359 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2360 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2363 # get maximum number of translations per user
2364 nat44_config = self.vapi.nat_show_config()
2366 # send more than maximum number of translations per user packets
2367 pkts_num = nat44_config.max_translations_per_user + 5
2369 for port in range(0, pkts_num):
2370 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2371 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2372 TCP(sport=1025 + port))
2374 self.pg0.add_stream(pkts)
2375 self.pg_enable_capture(self.pg_interfaces)
2378 # verify number of translated packet
2379 self.pg1.get_capture(pkts_num)
2381 users = self.vapi.nat44_user_dump()
2383 if user.ip_address == self.pg0.remote_ip4n:
2384 self.assertEqual(user.nsessions,
2385 nat44_config.max_translations_per_user)
2386 self.assertEqual(user.nstaticsessions, 0)
2389 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2391 proto=IP_PROTOS.tcp)
2392 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2393 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2394 TCP(sport=tcp_port))
2395 self.pg0.add_stream(p)
2396 self.pg_enable_capture(self.pg_interfaces)
2398 self.pg1.get_capture(1)
2399 users = self.vapi.nat44_user_dump()
2401 if user.ip_address == self.pg0.remote_ip4n:
2402 self.assertEqual(user.nsessions,
2403 nat44_config.max_translations_per_user - 1)
2404 self.assertEqual(user.nstaticsessions, 1)
2406 def test_interface_addr(self):
2407 """ Acquire NAT44 addresses from interface """
2408 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2410 # no address in NAT pool
2411 adresses = self.vapi.nat44_address_dump()
2412 self.assertEqual(0, len(adresses))
2414 # configure interface address and check NAT address pool
2415 self.pg7.config_ip4()
2416 adresses = self.vapi.nat44_address_dump()
2417 self.assertEqual(1, len(adresses))
2418 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2420 # remove interface address and check NAT address pool
2421 self.pg7.unconfig_ip4()
2422 adresses = self.vapi.nat44_address_dump()
2423 self.assertEqual(0, len(adresses))
2425 def test_interface_addr_static_mapping(self):
2426 """ Static mapping with addresses from interface """
2429 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2430 self.nat44_add_static_mapping(
2432 external_sw_if_index=self.pg7.sw_if_index,
2435 # static mappings with external interface
2436 static_mappings = self.vapi.nat44_static_mapping_dump()
2437 self.assertEqual(1, len(static_mappings))
2438 self.assertEqual(self.pg7.sw_if_index,
2439 static_mappings[0].external_sw_if_index)
2440 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2442 # configure interface address and check static mappings
2443 self.pg7.config_ip4()
2444 static_mappings = self.vapi.nat44_static_mapping_dump()
2445 self.assertEqual(2, len(static_mappings))
2447 for sm in static_mappings:
2448 if sm.external_sw_if_index == 0xFFFFFFFF:
2449 self.assertEqual(sm.external_ip_address[0:4],
2450 self.pg7.local_ip4n)
2451 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2453 self.assertTrue(resolved)
2455 # remove interface address and check static mappings
2456 self.pg7.unconfig_ip4()
2457 static_mappings = self.vapi.nat44_static_mapping_dump()
2458 self.assertEqual(1, len(static_mappings))
2459 self.assertEqual(self.pg7.sw_if_index,
2460 static_mappings[0].external_sw_if_index)
2461 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2463 # configure interface address again and check static mappings
2464 self.pg7.config_ip4()
2465 static_mappings = self.vapi.nat44_static_mapping_dump()
2466 self.assertEqual(2, len(static_mappings))
2468 for sm in static_mappings:
2469 if sm.external_sw_if_index == 0xFFFFFFFF:
2470 self.assertEqual(sm.external_ip_address[0:4],
2471 self.pg7.local_ip4n)
2472 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2474 self.assertTrue(resolved)
2476 # remove static mapping
2477 self.nat44_add_static_mapping(
2479 external_sw_if_index=self.pg7.sw_if_index,
2482 static_mappings = self.vapi.nat44_static_mapping_dump()
2483 self.assertEqual(0, len(static_mappings))
2485 def test_interface_addr_identity_nat(self):
2486 """ Identity NAT with addresses from interface """
2489 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2490 self.vapi.nat44_add_del_identity_mapping(
2491 sw_if_index=self.pg7.sw_if_index,
2493 protocol=IP_PROTOS.tcp,
2496 # identity mappings with external interface
2497 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2498 self.assertEqual(1, len(identity_mappings))
2499 self.assertEqual(self.pg7.sw_if_index,
2500 identity_mappings[0].sw_if_index)
2502 # configure interface address and check identity mappings
2503 self.pg7.config_ip4()
2504 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2506 self.assertEqual(2, len(identity_mappings))
2507 for sm in identity_mappings:
2508 if sm.sw_if_index == 0xFFFFFFFF:
2509 self.assertEqual(identity_mappings[0].ip_address,
2510 self.pg7.local_ip4n)
2511 self.assertEqual(port, identity_mappings[0].port)
2512 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2514 self.assertTrue(resolved)
2516 # remove interface address and check identity mappings
2517 self.pg7.unconfig_ip4()
2518 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2519 self.assertEqual(1, len(identity_mappings))
2520 self.assertEqual(self.pg7.sw_if_index,
2521 identity_mappings[0].sw_if_index)
2523 def test_ipfix_nat44_sess(self):
2524 """ IPFIX logging NAT44 session created/delted """
2525 self.ipfix_domain_id = 10
2526 self.ipfix_src_port = 20202
2527 colector_port = 30303
2528 bind_layers(UDP, IPFIX, dport=30303)
2529 self.nat44_add_address(self.nat_addr)
2530 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2531 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2533 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2534 src_address=self.pg3.local_ip4n,
2536 template_interval=10,
2537 collector_port=colector_port)
2538 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2539 src_port=self.ipfix_src_port)
2541 pkts = self.create_stream_in(self.pg0, self.pg1)
2542 self.pg0.add_stream(pkts)
2543 self.pg_enable_capture(self.pg_interfaces)
2545 capture = self.pg1.get_capture(len(pkts))
2546 self.verify_capture_out(capture)
2547 self.nat44_add_address(self.nat_addr, is_add=0)
2548 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2549 capture = self.pg3.get_capture(9)
2550 ipfix = IPFIXDecoder()
2551 # first load template
2553 self.assertTrue(p.haslayer(IPFIX))
2554 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2555 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2556 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2557 self.assertEqual(p[UDP].dport, colector_port)
2558 self.assertEqual(p[IPFIX].observationDomainID,
2559 self.ipfix_domain_id)
2560 if p.haslayer(Template):
2561 ipfix.add_template(p.getlayer(Template))
2562 # verify events in data set
2564 if p.haslayer(Data):
2565 data = ipfix.decode_data_set(p.getlayer(Set))
2566 self.verify_ipfix_nat44_ses(data)
2568 def test_ipfix_addr_exhausted(self):
2569 """ IPFIX logging NAT addresses exhausted """
2570 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2571 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2573 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2574 src_address=self.pg3.local_ip4n,
2576 template_interval=10)
2577 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2578 src_port=self.ipfix_src_port)
2580 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2581 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2583 self.pg0.add_stream(p)
2584 self.pg_enable_capture(self.pg_interfaces)
2586 self.pg1.assert_nothing_captured()
2588 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2589 capture = self.pg3.get_capture(9)
2590 ipfix = IPFIXDecoder()
2591 # first load template
2593 self.assertTrue(p.haslayer(IPFIX))
2594 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2595 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2596 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2597 self.assertEqual(p[UDP].dport, 4739)
2598 self.assertEqual(p[IPFIX].observationDomainID,
2599 self.ipfix_domain_id)
2600 if p.haslayer(Template):
2601 ipfix.add_template(p.getlayer(Template))
2602 # verify events in data set
2604 if p.haslayer(Data):
2605 data = ipfix.decode_data_set(p.getlayer(Set))
2606 self.verify_ipfix_addr_exhausted(data)
2608 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2609 def test_ipfix_max_sessions(self):
2610 """ IPFIX logging maximum session entries exceeded """
2611 self.nat44_add_address(self.nat_addr)
2612 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2613 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2616 nat44_config = self.vapi.nat_show_config()
2617 max_sessions = 10 * nat44_config.translation_buckets
2620 for i in range(0, max_sessions):
2621 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2622 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2623 IP(src=src, dst=self.pg1.remote_ip4) /
2626 self.pg0.add_stream(pkts)
2627 self.pg_enable_capture(self.pg_interfaces)
2630 self.pg1.get_capture(max_sessions)
2631 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2632 src_address=self.pg3.local_ip4n,
2634 template_interval=10)
2635 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2636 src_port=self.ipfix_src_port)
2638 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2639 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2641 self.pg0.add_stream(p)
2642 self.pg_enable_capture(self.pg_interfaces)
2644 self.pg1.assert_nothing_captured()
2646 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2647 capture = self.pg3.get_capture(9)
2648 ipfix = IPFIXDecoder()
2649 # first load template
2651 self.assertTrue(p.haslayer(IPFIX))
2652 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2653 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2654 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2655 self.assertEqual(p[UDP].dport, 4739)
2656 self.assertEqual(p[IPFIX].observationDomainID,
2657 self.ipfix_domain_id)
2658 if p.haslayer(Template):
2659 ipfix.add_template(p.getlayer(Template))
2660 # verify events in data set
2662 if p.haslayer(Data):
2663 data = ipfix.decode_data_set(p.getlayer(Set))
2664 self.verify_ipfix_max_sessions(data, max_sessions)
2666 def test_pool_addr_fib(self):
2667 """ NAT44 add pool addresses to FIB """
2668 static_addr = '10.0.0.10'
2669 self.nat44_add_address(self.nat_addr)
2670 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2671 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2673 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2676 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2677 ARP(op=ARP.who_has, pdst=self.nat_addr,
2678 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2679 self.pg1.add_stream(p)
2680 self.pg_enable_capture(self.pg_interfaces)
2682 capture = self.pg1.get_capture(1)
2683 self.assertTrue(capture[0].haslayer(ARP))
2684 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2687 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2688 ARP(op=ARP.who_has, pdst=static_addr,
2689 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2690 self.pg1.add_stream(p)
2691 self.pg_enable_capture(self.pg_interfaces)
2693 capture = self.pg1.get_capture(1)
2694 self.assertTrue(capture[0].haslayer(ARP))
2695 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2697 # send ARP to non-NAT44 interface
2698 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2699 ARP(op=ARP.who_has, pdst=self.nat_addr,
2700 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2701 self.pg2.add_stream(p)
2702 self.pg_enable_capture(self.pg_interfaces)
2704 self.pg1.assert_nothing_captured()
2706 # remove addresses and verify
2707 self.nat44_add_address(self.nat_addr, is_add=0)
2708 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2711 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2712 ARP(op=ARP.who_has, pdst=self.nat_addr,
2713 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2714 self.pg1.add_stream(p)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 self.pg1.assert_nothing_captured()
2719 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2720 ARP(op=ARP.who_has, pdst=static_addr,
2721 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2722 self.pg1.add_stream(p)
2723 self.pg_enable_capture(self.pg_interfaces)
2725 self.pg1.assert_nothing_captured()
2727 def test_vrf_mode(self):
2728 """ NAT44 tenant VRF aware address pool mode """
2732 nat_ip1 = "10.0.0.10"
2733 nat_ip2 = "10.0.0.11"
2735 self.pg0.unconfig_ip4()
2736 self.pg1.unconfig_ip4()
2737 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2738 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2739 self.pg0.set_table_ip4(vrf_id1)
2740 self.pg1.set_table_ip4(vrf_id2)
2741 self.pg0.config_ip4()
2742 self.pg1.config_ip4()
2744 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2745 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2746 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2747 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2748 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2752 pkts = self.create_stream_in(self.pg0, self.pg2)
2753 self.pg0.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg2.get_capture(len(pkts))
2757 self.verify_capture_out(capture, nat_ip1)
2760 pkts = self.create_stream_in(self.pg1, self.pg2)
2761 self.pg1.add_stream(pkts)
2762 self.pg_enable_capture(self.pg_interfaces)
2764 capture = self.pg2.get_capture(len(pkts))
2765 self.verify_capture_out(capture, nat_ip2)
2767 self.pg0.unconfig_ip4()
2768 self.pg1.unconfig_ip4()
2769 self.pg0.set_table_ip4(0)
2770 self.pg1.set_table_ip4(0)
2771 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2772 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2774 def test_vrf_feature_independent(self):
2775 """ NAT44 tenant VRF independent address pool mode """
2777 nat_ip1 = "10.0.0.10"
2778 nat_ip2 = "10.0.0.11"
2780 self.nat44_add_address(nat_ip1)
2781 self.nat44_add_address(nat_ip2, vrf_id=99)
2782 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2783 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2784 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2788 pkts = self.create_stream_in(self.pg0, self.pg2)
2789 self.pg0.add_stream(pkts)
2790 self.pg_enable_capture(self.pg_interfaces)
2792 capture = self.pg2.get_capture(len(pkts))
2793 self.verify_capture_out(capture, nat_ip1)
2796 pkts = self.create_stream_in(self.pg1, self.pg2)
2797 self.pg1.add_stream(pkts)
2798 self.pg_enable_capture(self.pg_interfaces)
2800 capture = self.pg2.get_capture(len(pkts))
2801 self.verify_capture_out(capture, nat_ip1)
2803 def test_dynamic_ipless_interfaces(self):
2804 """ NAT44 interfaces without configured IP address """
2806 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2807 mactobinary(self.pg7.remote_mac),
2808 self.pg7.remote_ip4n,
2810 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2811 mactobinary(self.pg8.remote_mac),
2812 self.pg8.remote_ip4n,
2815 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2816 dst_address_length=32,
2817 next_hop_address=self.pg7.remote_ip4n,
2818 next_hop_sw_if_index=self.pg7.sw_if_index)
2819 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2820 dst_address_length=32,
2821 next_hop_address=self.pg8.remote_ip4n,
2822 next_hop_sw_if_index=self.pg8.sw_if_index)
2824 self.nat44_add_address(self.nat_addr)
2825 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2826 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2830 pkts = self.create_stream_in(self.pg7, self.pg8)
2831 self.pg7.add_stream(pkts)
2832 self.pg_enable_capture(self.pg_interfaces)
2834 capture = self.pg8.get_capture(len(pkts))
2835 self.verify_capture_out(capture)
2838 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2839 self.pg8.add_stream(pkts)
2840 self.pg_enable_capture(self.pg_interfaces)
2842 capture = self.pg7.get_capture(len(pkts))
2843 self.verify_capture_in(capture, self.pg7)
2845 def test_static_ipless_interfaces(self):
2846 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2848 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2849 mactobinary(self.pg7.remote_mac),
2850 self.pg7.remote_ip4n,
2852 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2853 mactobinary(self.pg8.remote_mac),
2854 self.pg8.remote_ip4n,
2857 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2858 dst_address_length=32,
2859 next_hop_address=self.pg7.remote_ip4n,
2860 next_hop_sw_if_index=self.pg7.sw_if_index)
2861 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2862 dst_address_length=32,
2863 next_hop_address=self.pg8.remote_ip4n,
2864 next_hop_sw_if_index=self.pg8.sw_if_index)
2866 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2867 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2868 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2872 pkts = self.create_stream_out(self.pg8)
2873 self.pg8.add_stream(pkts)
2874 self.pg_enable_capture(self.pg_interfaces)
2876 capture = self.pg7.get_capture(len(pkts))
2877 self.verify_capture_in(capture, self.pg7)
2880 pkts = self.create_stream_in(self.pg7, self.pg8)
2881 self.pg7.add_stream(pkts)
2882 self.pg_enable_capture(self.pg_interfaces)
2884 capture = self.pg8.get_capture(len(pkts))
2885 self.verify_capture_out(capture, self.nat_addr, True)
2887 def test_static_with_port_ipless_interfaces(self):
2888 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2890 self.tcp_port_out = 30606
2891 self.udp_port_out = 30607
2892 self.icmp_id_out = 30608
2894 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2895 mactobinary(self.pg7.remote_mac),
2896 self.pg7.remote_ip4n,
2898 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2899 mactobinary(self.pg8.remote_mac),
2900 self.pg8.remote_ip4n,
2903 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2904 dst_address_length=32,
2905 next_hop_address=self.pg7.remote_ip4n,
2906 next_hop_sw_if_index=self.pg7.sw_if_index)
2907 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2908 dst_address_length=32,
2909 next_hop_address=self.pg8.remote_ip4n,
2910 next_hop_sw_if_index=self.pg8.sw_if_index)
2912 self.nat44_add_address(self.nat_addr)
2913 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2914 self.tcp_port_in, self.tcp_port_out,
2915 proto=IP_PROTOS.tcp)
2916 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2917 self.udp_port_in, self.udp_port_out,
2918 proto=IP_PROTOS.udp)
2919 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2920 self.icmp_id_in, self.icmp_id_out,
2921 proto=IP_PROTOS.icmp)
2922 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2923 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2927 pkts = self.create_stream_out(self.pg8)
2928 self.pg8.add_stream(pkts)
2929 self.pg_enable_capture(self.pg_interfaces)
2931 capture = self.pg7.get_capture(len(pkts))
2932 self.verify_capture_in(capture, self.pg7)
2935 pkts = self.create_stream_in(self.pg7, self.pg8)
2936 self.pg7.add_stream(pkts)
2937 self.pg_enable_capture(self.pg_interfaces)
2939 capture = self.pg8.get_capture(len(pkts))
2940 self.verify_capture_out(capture)
2942 def test_static_unknown_proto(self):
2943 """ 1:1 NAT translate packet with unknown protocol """
2944 nat_ip = "10.0.0.10"
2945 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2946 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2947 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2951 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2952 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2954 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2955 TCP(sport=1234, dport=1234))
2956 self.pg0.add_stream(p)
2957 self.pg_enable_capture(self.pg_interfaces)
2959 p = self.pg1.get_capture(1)
2962 self.assertEqual(packet[IP].src, nat_ip)
2963 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2964 self.assertTrue(packet.haslayer(GRE))
2965 self.assert_packet_checksums_valid(packet)
2967 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2971 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2972 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2974 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2975 TCP(sport=1234, dport=1234))
2976 self.pg1.add_stream(p)
2977 self.pg_enable_capture(self.pg_interfaces)
2979 p = self.pg0.get_capture(1)
2982 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2983 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2984 self.assertTrue(packet.haslayer(GRE))
2985 self.assert_packet_checksums_valid(packet)
2987 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2990 def test_hairpinning_static_unknown_proto(self):
2991 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2993 host = self.pg0.remote_hosts[0]
2994 server = self.pg0.remote_hosts[1]
2996 host_nat_ip = "10.0.0.10"
2997 server_nat_ip = "10.0.0.11"
2999 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3000 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3001 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3002 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3006 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3007 IP(src=host.ip4, dst=server_nat_ip) /
3009 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3010 TCP(sport=1234, dport=1234))
3011 self.pg0.add_stream(p)
3012 self.pg_enable_capture(self.pg_interfaces)
3014 p = self.pg0.get_capture(1)
3017 self.assertEqual(packet[IP].src, host_nat_ip)
3018 self.assertEqual(packet[IP].dst, server.ip4)
3019 self.assertTrue(packet.haslayer(GRE))
3020 self.assert_packet_checksums_valid(packet)
3022 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3026 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3027 IP(src=server.ip4, dst=host_nat_ip) /
3029 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3030 TCP(sport=1234, dport=1234))
3031 self.pg0.add_stream(p)
3032 self.pg_enable_capture(self.pg_interfaces)
3034 p = self.pg0.get_capture(1)
3037 self.assertEqual(packet[IP].src, server_nat_ip)
3038 self.assertEqual(packet[IP].dst, host.ip4)
3039 self.assertTrue(packet.haslayer(GRE))
3040 self.assert_packet_checksums_valid(packet)
3042 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3045 def test_unknown_proto(self):
3046 """ NAT44 translate packet with unknown protocol """
3047 self.nat44_add_address(self.nat_addr)
3048 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3049 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3053 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3054 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3055 TCP(sport=self.tcp_port_in, dport=20))
3056 self.pg0.add_stream(p)
3057 self.pg_enable_capture(self.pg_interfaces)
3059 p = self.pg1.get_capture(1)
3061 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3062 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3064 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3065 TCP(sport=1234, dport=1234))
3066 self.pg0.add_stream(p)
3067 self.pg_enable_capture(self.pg_interfaces)
3069 p = self.pg1.get_capture(1)
3072 self.assertEqual(packet[IP].src, self.nat_addr)
3073 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3074 self.assertTrue(packet.haslayer(GRE))
3075 self.assert_packet_checksums_valid(packet)
3077 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3081 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3082 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3084 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3085 TCP(sport=1234, dport=1234))
3086 self.pg1.add_stream(p)
3087 self.pg_enable_capture(self.pg_interfaces)
3089 p = self.pg0.get_capture(1)
3092 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3093 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3094 self.assertTrue(packet.haslayer(GRE))
3095 self.assert_packet_checksums_valid(packet)
3097 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3100 def test_hairpinning_unknown_proto(self):
3101 """ NAT44 translate packet with unknown protocol - hairpinning """
3102 host = self.pg0.remote_hosts[0]
3103 server = self.pg0.remote_hosts[1]
3105 server_out_port = 8765
3106 server_nat_ip = "10.0.0.11"
3108 self.nat44_add_address(self.nat_addr)
3109 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3110 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3113 # add static mapping for server
3114 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3117 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3118 IP(src=host.ip4, dst=server_nat_ip) /
3119 TCP(sport=host_in_port, dport=server_out_port))
3120 self.pg0.add_stream(p)
3121 self.pg_enable_capture(self.pg_interfaces)
3123 self.pg0.get_capture(1)
3125 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3126 IP(src=host.ip4, dst=server_nat_ip) /
3128 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3129 TCP(sport=1234, dport=1234))
3130 self.pg0.add_stream(p)
3131 self.pg_enable_capture(self.pg_interfaces)
3133 p = self.pg0.get_capture(1)
3136 self.assertEqual(packet[IP].src, self.nat_addr)
3137 self.assertEqual(packet[IP].dst, server.ip4)
3138 self.assertTrue(packet.haslayer(GRE))
3139 self.assert_packet_checksums_valid(packet)
3141 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3145 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3146 IP(src=server.ip4, dst=self.nat_addr) /
3148 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3149 TCP(sport=1234, dport=1234))
3150 self.pg0.add_stream(p)
3151 self.pg_enable_capture(self.pg_interfaces)
3153 p = self.pg0.get_capture(1)
3156 self.assertEqual(packet[IP].src, server_nat_ip)
3157 self.assertEqual(packet[IP].dst, host.ip4)
3158 self.assertTrue(packet.haslayer(GRE))
3159 self.assert_packet_checksums_valid(packet)
3161 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3164 def test_output_feature(self):
3165 """ NAT44 interface output feature (in2out postrouting) """
3166 self.nat44_add_address(self.nat_addr)
3167 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3168 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3169 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3173 pkts = self.create_stream_in(self.pg0, self.pg3)
3174 self.pg0.add_stream(pkts)
3175 self.pg_enable_capture(self.pg_interfaces)
3177 capture = self.pg3.get_capture(len(pkts))
3178 self.verify_capture_out(capture)
3181 pkts = self.create_stream_out(self.pg3)
3182 self.pg3.add_stream(pkts)
3183 self.pg_enable_capture(self.pg_interfaces)
3185 capture = self.pg0.get_capture(len(pkts))
3186 self.verify_capture_in(capture, self.pg0)
3188 # from non-NAT interface to NAT inside interface
3189 pkts = self.create_stream_in(self.pg2, self.pg0)
3190 self.pg2.add_stream(pkts)
3191 self.pg_enable_capture(self.pg_interfaces)
3193 capture = self.pg0.get_capture(len(pkts))
3194 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3196 def test_output_feature_vrf_aware(self):
3197 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3198 nat_ip_vrf10 = "10.0.0.10"
3199 nat_ip_vrf20 = "10.0.0.20"
3201 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3202 dst_address_length=32,
3203 next_hop_address=self.pg3.remote_ip4n,
3204 next_hop_sw_if_index=self.pg3.sw_if_index,
3206 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3207 dst_address_length=32,
3208 next_hop_address=self.pg3.remote_ip4n,
3209 next_hop_sw_if_index=self.pg3.sw_if_index,
3212 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3213 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3214 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3215 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3216 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3220 pkts = self.create_stream_in(self.pg4, self.pg3)
3221 self.pg4.add_stream(pkts)
3222 self.pg_enable_capture(self.pg_interfaces)
3224 capture = self.pg3.get_capture(len(pkts))
3225 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3228 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3229 self.pg3.add_stream(pkts)
3230 self.pg_enable_capture(self.pg_interfaces)
3232 capture = self.pg4.get_capture(len(pkts))
3233 self.verify_capture_in(capture, self.pg4)
3236 pkts = self.create_stream_in(self.pg6, self.pg3)
3237 self.pg6.add_stream(pkts)
3238 self.pg_enable_capture(self.pg_interfaces)
3240 capture = self.pg3.get_capture(len(pkts))
3241 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3244 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3245 self.pg3.add_stream(pkts)
3246 self.pg_enable_capture(self.pg_interfaces)
3248 capture = self.pg6.get_capture(len(pkts))
3249 self.verify_capture_in(capture, self.pg6)
3251 def test_output_feature_hairpinning(self):
3252 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3253 host = self.pg0.remote_hosts[0]
3254 server = self.pg0.remote_hosts[1]
3257 server_in_port = 5678
3258 server_out_port = 8765
3260 self.nat44_add_address(self.nat_addr)
3261 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3262 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3265 # add static mapping for server
3266 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3267 server_in_port, server_out_port,
3268 proto=IP_PROTOS.tcp)
3270 # send packet from host to server
3271 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3272 IP(src=host.ip4, dst=self.nat_addr) /
3273 TCP(sport=host_in_port, dport=server_out_port))
3274 self.pg0.add_stream(p)
3275 self.pg_enable_capture(self.pg_interfaces)
3277 capture = self.pg0.get_capture(1)
3282 self.assertEqual(ip.src, self.nat_addr)
3283 self.assertEqual(ip.dst, server.ip4)
3284 self.assertNotEqual(tcp.sport, host_in_port)
3285 self.assertEqual(tcp.dport, server_in_port)
3286 self.assert_packet_checksums_valid(p)
3287 host_out_port = tcp.sport
3289 self.logger.error(ppp("Unexpected or invalid packet:", p))
3292 # send reply from server to host
3293 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3294 IP(src=server.ip4, dst=self.nat_addr) /
3295 TCP(sport=server_in_port, dport=host_out_port))
3296 self.pg0.add_stream(p)
3297 self.pg_enable_capture(self.pg_interfaces)
3299 capture = self.pg0.get_capture(1)
3304 self.assertEqual(ip.src, self.nat_addr)
3305 self.assertEqual(ip.dst, host.ip4)
3306 self.assertEqual(tcp.sport, server_out_port)
3307 self.assertEqual(tcp.dport, host_in_port)
3308 self.assert_packet_checksums_valid(p)
3310 self.logger.error(ppp("Unexpected or invalid packet:", p))
3313 def test_output_feature_and_service(self):
3314 """ NAT44 interface output feature and services """
3315 external_addr = '1.2.3.4'
3319 self.vapi.nat44_forwarding_enable_disable(1)
3320 self.nat44_add_address(self.nat_addr)
3321 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3322 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3323 local_port, external_port,
3324 proto=IP_PROTOS.tcp, out2in_only=1)
3325 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3326 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3328 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3331 # from client to service
3332 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3333 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3334 TCP(sport=12345, dport=external_port))
3335 self.pg1.add_stream(p)
3336 self.pg_enable_capture(self.pg_interfaces)
3338 capture = self.pg0.get_capture(1)
3343 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3344 self.assertEqual(tcp.dport, local_port)
3345 self.assert_packet_checksums_valid(p)
3347 self.logger.error(ppp("Unexpected or invalid packet:", p))
3350 # from service back to client
3351 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3352 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3353 TCP(sport=local_port, dport=12345))
3354 self.pg0.add_stream(p)
3355 self.pg_enable_capture(self.pg_interfaces)
3357 capture = self.pg1.get_capture(1)
3362 self.assertEqual(ip.src, external_addr)
3363 self.assertEqual(tcp.sport, external_port)
3364 self.assert_packet_checksums_valid(p)
3366 self.logger.error(ppp("Unexpected or invalid packet:", p))
3369 # from local network host to external network
3370 pkts = self.create_stream_in(self.pg0, self.pg1)
3371 self.pg0.add_stream(pkts)
3372 self.pg_enable_capture(self.pg_interfaces)
3374 capture = self.pg1.get_capture(len(pkts))
3375 self.verify_capture_out(capture)
3376 pkts = self.create_stream_in(self.pg0, self.pg1)
3377 self.pg0.add_stream(pkts)
3378 self.pg_enable_capture(self.pg_interfaces)
3380 capture = self.pg1.get_capture(len(pkts))
3381 self.verify_capture_out(capture)
3383 # from external network back to local network host
3384 pkts = self.create_stream_out(self.pg1)
3385 self.pg1.add_stream(pkts)
3386 self.pg_enable_capture(self.pg_interfaces)
3388 capture = self.pg0.get_capture(len(pkts))
3389 self.verify_capture_in(capture, self.pg0)
3391 def test_output_feature_and_service2(self):
3392 """ NAT44 interface output feature and service host direct access """
3393 self.vapi.nat44_forwarding_enable_disable(1)
3394 self.nat44_add_address(self.nat_addr)
3395 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3398 # session initiaded from service host - translate
3399 pkts = self.create_stream_in(self.pg0, self.pg1)
3400 self.pg0.add_stream(pkts)
3401 self.pg_enable_capture(self.pg_interfaces)
3403 capture = self.pg1.get_capture(len(pkts))
3404 self.verify_capture_out(capture)
3406 pkts = self.create_stream_out(self.pg1)
3407 self.pg1.add_stream(pkts)
3408 self.pg_enable_capture(self.pg_interfaces)
3410 capture = self.pg0.get_capture(len(pkts))
3411 self.verify_capture_in(capture, self.pg0)
3413 # session initiaded from remote host - do not translate
3414 pkts = self.create_stream_out(self.pg1,
3415 self.pg0.remote_ip4,
3416 use_inside_ports=True)
3417 self.pg1.add_stream(pkts)
3418 self.pg_enable_capture(self.pg_interfaces)
3420 capture = self.pg0.get_capture(len(pkts))
3421 self.verify_capture_in(capture, self.pg0)
3423 pkts = self.create_stream_in(self.pg0, self.pg1)
3424 self.pg0.add_stream(pkts)
3425 self.pg_enable_capture(self.pg_interfaces)
3427 capture = self.pg1.get_capture(len(pkts))
3428 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3431 def test_output_feature_and_service3(self):
3432 """ NAT44 interface output feature and DST NAT """
3433 external_addr = '1.2.3.4'
3437 self.vapi.nat44_forwarding_enable_disable(1)
3438 self.nat44_add_address(self.nat_addr)
3439 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3440 local_port, external_port,
3441 proto=IP_PROTOS.tcp, out2in_only=1)
3442 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3443 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3445 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3448 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3449 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3450 TCP(sport=12345, dport=external_port))
3451 self.pg0.add_stream(p)
3452 self.pg_enable_capture(self.pg_interfaces)
3454 capture = self.pg1.get_capture(1)
3459 self.assertEqual(ip.src, self.pg0.remote_ip4)
3460 self.assertEqual(tcp.sport, 12345)
3461 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3462 self.assertEqual(tcp.dport, local_port)
3463 self.assert_packet_checksums_valid(p)
3465 self.logger.error(ppp("Unexpected or invalid packet:", p))
3468 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3469 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3470 TCP(sport=local_port, dport=12345))
3471 self.pg1.add_stream(p)
3472 self.pg_enable_capture(self.pg_interfaces)
3474 capture = self.pg0.get_capture(1)
3479 self.assertEqual(ip.src, external_addr)
3480 self.assertEqual(tcp.sport, external_port)
3481 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3482 self.assertEqual(tcp.dport, 12345)
3483 self.assert_packet_checksums_valid(p)
3485 self.logger.error(ppp("Unexpected or invalid packet:", p))
3488 def test_one_armed_nat44(self):
3489 """ One armed NAT44 """
3490 remote_host = self.pg9.remote_hosts[0]
3491 local_host = self.pg9.remote_hosts[1]
3494 self.nat44_add_address(self.nat_addr)
3495 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3496 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3500 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3501 IP(src=local_host.ip4, dst=remote_host.ip4) /
3502 TCP(sport=12345, dport=80))
3503 self.pg9.add_stream(p)
3504 self.pg_enable_capture(self.pg_interfaces)
3506 capture = self.pg9.get_capture(1)
3511 self.assertEqual(ip.src, self.nat_addr)
3512 self.assertEqual(ip.dst, remote_host.ip4)
3513 self.assertNotEqual(tcp.sport, 12345)
3514 external_port = tcp.sport
3515 self.assertEqual(tcp.dport, 80)
3516 self.assert_packet_checksums_valid(p)
3518 self.logger.error(ppp("Unexpected or invalid packet:", p))
3522 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3523 IP(src=remote_host.ip4, dst=self.nat_addr) /
3524 TCP(sport=80, dport=external_port))
3525 self.pg9.add_stream(p)
3526 self.pg_enable_capture(self.pg_interfaces)
3528 capture = self.pg9.get_capture(1)
3533 self.assertEqual(ip.src, remote_host.ip4)
3534 self.assertEqual(ip.dst, local_host.ip4)
3535 self.assertEqual(tcp.sport, 80)
3536 self.assertEqual(tcp.dport, 12345)
3537 self.assert_packet_checksums_valid(p)
3539 self.logger.error(ppp("Unexpected or invalid packet:", p))
3542 def test_one_armed_nat44_static(self):
3543 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3544 remote_host = self.pg9.remote_hosts[0]
3545 local_host = self.pg9.remote_hosts[1]
3550 self.vapi.nat44_forwarding_enable_disable(1)
3551 self.nat44_add_address(self.nat_addr, twice_nat=1)
3552 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3553 local_port, external_port,
3554 proto=IP_PROTOS.tcp, out2in_only=1,
3556 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3557 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3560 # from client to service
3561 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3562 IP(src=remote_host.ip4, dst=self.nat_addr) /
3563 TCP(sport=12345, dport=external_port))
3564 self.pg9.add_stream(p)
3565 self.pg_enable_capture(self.pg_interfaces)
3567 capture = self.pg9.get_capture(1)
3572 self.assertEqual(ip.dst, local_host.ip4)
3573 self.assertEqual(ip.src, self.nat_addr)
3574 self.assertEqual(tcp.dport, local_port)
3575 self.assertNotEqual(tcp.sport, 12345)
3576 eh_port_in = tcp.sport
3577 self.assert_packet_checksums_valid(p)
3579 self.logger.error(ppp("Unexpected or invalid packet:", p))
3582 # from service back to client
3583 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3584 IP(src=local_host.ip4, dst=self.nat_addr) /
3585 TCP(sport=local_port, dport=eh_port_in))
3586 self.pg9.add_stream(p)
3587 self.pg_enable_capture(self.pg_interfaces)
3589 capture = self.pg9.get_capture(1)
3594 self.assertEqual(ip.src, self.nat_addr)
3595 self.assertEqual(ip.dst, remote_host.ip4)
3596 self.assertEqual(tcp.sport, external_port)
3597 self.assertEqual(tcp.dport, 12345)
3598 self.assert_packet_checksums_valid(p)
3600 self.logger.error(ppp("Unexpected or invalid packet:", p))
3603 def test_del_session(self):
3604 """ Delete NAT44 session """
3605 self.nat44_add_address(self.nat_addr)
3606 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3607 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3610 pkts = self.create_stream_in(self.pg0, self.pg1)
3611 self.pg0.add_stream(pkts)
3612 self.pg_enable_capture(self.pg_interfaces)
3614 self.pg1.get_capture(len(pkts))
3616 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3617 nsessions = len(sessions)
3619 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3620 sessions[0].inside_port,
3621 sessions[0].protocol)
3622 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3623 sessions[1].outside_port,
3624 sessions[1].protocol,
3627 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3628 self.assertEqual(nsessions - len(sessions), 2)
3630 def test_set_get_reass(self):
3631 """ NAT44 set/get virtual fragmentation reassembly """
3632 reas_cfg1 = self.vapi.nat_get_reass()
3634 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3635 max_reass=reas_cfg1.ip4_max_reass * 2,
3636 max_frag=reas_cfg1.ip4_max_frag * 2)
3638 reas_cfg2 = self.vapi.nat_get_reass()
3640 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3641 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3642 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3644 self.vapi.nat_set_reass(drop_frag=1)
3645 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3647 def test_frag_in_order(self):
3648 """ NAT44 translate fragments arriving in order """
3649 self.nat44_add_address(self.nat_addr)
3650 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3651 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3654 data = "A" * 4 + "B" * 16 + "C" * 3
3655 self.tcp_port_in = random.randint(1025, 65535)
3657 reass = self.vapi.nat_reass_dump()
3658 reass_n_start = len(reass)
3661 pkts = self.create_stream_frag(self.pg0,
3662 self.pg1.remote_ip4,
3666 self.pg0.add_stream(pkts)
3667 self.pg_enable_capture(self.pg_interfaces)
3669 frags = self.pg1.get_capture(len(pkts))
3670 p = self.reass_frags_and_verify(frags,
3672 self.pg1.remote_ip4)
3673 self.assertEqual(p[TCP].dport, 20)
3674 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3675 self.tcp_port_out = p[TCP].sport
3676 self.assertEqual(data, p[Raw].load)
3679 pkts = self.create_stream_frag(self.pg1,
3684 self.pg1.add_stream(pkts)
3685 self.pg_enable_capture(self.pg_interfaces)
3687 frags = self.pg0.get_capture(len(pkts))
3688 p = self.reass_frags_and_verify(frags,
3689 self.pg1.remote_ip4,
3690 self.pg0.remote_ip4)
3691 self.assertEqual(p[TCP].sport, 20)
3692 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3693 self.assertEqual(data, p[Raw].load)
3695 reass = self.vapi.nat_reass_dump()
3696 reass_n_end = len(reass)
3698 self.assertEqual(reass_n_end - reass_n_start, 2)
3700 def test_reass_hairpinning(self):
3701 """ NAT44 fragments hairpinning """
3702 server = self.pg0.remote_hosts[1]
3703 host_in_port = random.randint(1025, 65535)
3704 server_in_port = random.randint(1025, 65535)
3705 server_out_port = random.randint(1025, 65535)
3706 data = "A" * 4 + "B" * 16 + "C" * 3
3708 self.nat44_add_address(self.nat_addr)
3709 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3710 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3712 # add static mapping for server
3713 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3714 server_in_port, server_out_port,
3715 proto=IP_PROTOS.tcp)
3717 # send packet from host to server
3718 pkts = self.create_stream_frag(self.pg0,
3723 self.pg0.add_stream(pkts)
3724 self.pg_enable_capture(self.pg_interfaces)
3726 frags = self.pg0.get_capture(len(pkts))
3727 p = self.reass_frags_and_verify(frags,
3730 self.assertNotEqual(p[TCP].sport, host_in_port)
3731 self.assertEqual(p[TCP].dport, server_in_port)
3732 self.assertEqual(data, p[Raw].load)
3734 def test_frag_out_of_order(self):
3735 """ NAT44 translate fragments arriving out of order """
3736 self.nat44_add_address(self.nat_addr)
3737 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3738 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3741 data = "A" * 4 + "B" * 16 + "C" * 3
3742 random.randint(1025, 65535)
3745 pkts = self.create_stream_frag(self.pg0,
3746 self.pg1.remote_ip4,
3751 self.pg0.add_stream(pkts)
3752 self.pg_enable_capture(self.pg_interfaces)
3754 frags = self.pg1.get_capture(len(pkts))
3755 p = self.reass_frags_and_verify(frags,
3757 self.pg1.remote_ip4)
3758 self.assertEqual(p[TCP].dport, 20)
3759 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3760 self.tcp_port_out = p[TCP].sport
3761 self.assertEqual(data, p[Raw].load)
3764 pkts = self.create_stream_frag(self.pg1,
3770 self.pg1.add_stream(pkts)
3771 self.pg_enable_capture(self.pg_interfaces)
3773 frags = self.pg0.get_capture(len(pkts))
3774 p = self.reass_frags_and_verify(frags,
3775 self.pg1.remote_ip4,
3776 self.pg0.remote_ip4)
3777 self.assertEqual(p[TCP].sport, 20)
3778 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3779 self.assertEqual(data, p[Raw].load)
3781 def test_port_restricted(self):
3782 """ Port restricted NAT44 (MAP-E CE) """
3783 self.nat44_add_address(self.nat_addr)
3784 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3785 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3787 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3788 "psid-offset 6 psid-len 6")
3790 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3791 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3792 TCP(sport=4567, dport=22))
3793 self.pg0.add_stream(p)
3794 self.pg_enable_capture(self.pg_interfaces)
3796 capture = self.pg1.get_capture(1)
3801 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3802 self.assertEqual(ip.src, self.nat_addr)
3803 self.assertEqual(tcp.dport, 22)
3804 self.assertNotEqual(tcp.sport, 4567)
3805 self.assertEqual((tcp.sport >> 6) & 63, 10)
3806 self.assert_packet_checksums_valid(p)
3808 self.logger.error(ppp("Unexpected or invalid packet:", p))
3811 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3813 twice_nat_addr = '10.0.1.3'
3821 port_in1 = port_in+1
3822 port_in2 = port_in+2
3827 server1 = self.pg0.remote_hosts[0]
3828 server2 = self.pg0.remote_hosts[1]
3840 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
3843 self.nat44_add_address(self.nat_addr)
3844 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3846 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3848 proto=IP_PROTOS.tcp,
3849 twice_nat=int(not self_twice_nat),
3850 self_twice_nat=int(self_twice_nat))
3852 locals = [{'addr': server1.ip4n,
3855 {'addr': server2.ip4n,
3858 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3859 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3863 not self_twice_nat),
3866 local_num=len(locals),
3868 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3869 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
3876 assert client_id is not None
3878 client = self.pg0.remote_hosts[0]
3879 elif client_id == 2:
3880 client = self.pg0.remote_hosts[1]
3882 client = pg1.remote_hosts[0]
3883 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3884 IP(src=client.ip4, dst=self.nat_addr) /
3885 TCP(sport=eh_port_out, dport=port_out))
3887 self.pg_enable_capture(self.pg_interfaces)
3889 capture = pg0.get_capture(1)
3895 if ip.dst == server1.ip4:
3901 self.assertEqual(ip.dst, server.ip4)
3903 self.assertIn(tcp.dport, [port_in1, port_in2])
3905 self.assertEqual(tcp.dport, port_in)
3907 self.assertEqual(ip.src, twice_nat_addr)
3908 self.assertNotEqual(tcp.sport, eh_port_out)
3910 self.assertEqual(ip.src, client.ip4)
3911 self.assertEqual(tcp.sport, eh_port_out)
3913 eh_port_in = tcp.sport
3914 saved_port_in = tcp.dport
3915 self.assert_packet_checksums_valid(p)
3917 self.logger.error(ppp("Unexpected or invalid packet:", p))
3920 p = (Ether(src=server.mac, dst=pg0.local_mac) /
3921 IP(src=server.ip4, dst=eh_addr_in) /
3922 TCP(sport=saved_port_in, dport=eh_port_in))
3924 self.pg_enable_capture(self.pg_interfaces)
3926 capture = pg1.get_capture(1)
3931 self.assertEqual(ip.dst, client.ip4)
3932 self.assertEqual(ip.src, self.nat_addr)
3933 self.assertEqual(tcp.dport, eh_port_out)
3934 self.assertEqual(tcp.sport, port_out)
3935 self.assert_packet_checksums_valid(p)
3937 self.logger.error(ppp("Unexpected or invalid packet:", p))
3941 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3942 self.assertEqual(len(sessions), 1)
3943 self.assertTrue(sessions[0].ext_host_valid)
3944 self.assertTrue(sessions[0].is_twicenat)
3945 self.vapi.nat44_del_session(
3946 sessions[0].inside_ip_address,
3947 sessions[0].inside_port,
3948 sessions[0].protocol,
3949 ext_host_address=sessions[0].ext_host_nat_address,
3950 ext_host_port=sessions[0].ext_host_nat_port)
3951 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3952 self.assertEqual(len(sessions), 0)
3954 def test_twice_nat(self):
3956 self.twice_nat_common()
3958 def test_self_twice_nat_positive(self):
3959 """ Self Twice NAT44 (positive test) """
3960 self.twice_nat_common(self_twice_nat=True, same_pg=True)
3962 def test_self_twice_nat_negative(self):
3963 """ Self Twice NAT44 (negative test) """
3964 self.twice_nat_common(self_twice_nat=True)
3966 def test_twice_nat_lb(self):
3967 """ Twice NAT44 local service load balancing """
3968 self.twice_nat_common(lb=True)
3970 def test_self_twice_nat_lb_positive(self):
3971 """ Self Twice NAT44 local service load balancing (positive test) """
3972 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
3975 def test_self_twice_nat_lb_negative(self):
3976 """ Self Twice NAT44 local service load balancing (negative test) """
3977 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
3980 def test_twice_nat_interface_addr(self):
3981 """ Acquire twice NAT44 addresses from interface """
3982 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3984 # no address in NAT pool
3985 adresses = self.vapi.nat44_address_dump()
3986 self.assertEqual(0, len(adresses))
3988 # configure interface address and check NAT address pool
3989 self.pg7.config_ip4()
3990 adresses = self.vapi.nat44_address_dump()
3991 self.assertEqual(1, len(adresses))
3992 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3993 self.assertEqual(adresses[0].twice_nat, 1)
3995 # remove interface address and check NAT address pool
3996 self.pg7.unconfig_ip4()
3997 adresses = self.vapi.nat44_address_dump()
3998 self.assertEqual(0, len(adresses))
4000 def test_ipfix_max_frags(self):
4001 """ IPFIX logging maximum fragments pending reassembly exceeded """
4002 self.nat44_add_address(self.nat_addr)
4003 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4004 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4006 self.vapi.nat_set_reass(max_frag=0)
4007 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
4008 src_address=self.pg3.local_ip4n,
4010 template_interval=10)
4011 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
4012 src_port=self.ipfix_src_port)
4014 data = "A" * 4 + "B" * 16 + "C" * 3
4015 self.tcp_port_in = random.randint(1025, 65535)
4016 pkts = self.create_stream_frag(self.pg0,
4017 self.pg1.remote_ip4,
4021 self.pg0.add_stream(pkts[-1])
4022 self.pg_enable_capture(self.pg_interfaces)
4024 self.pg1.assert_nothing_captured()
4026 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4027 capture = self.pg3.get_capture(9)
4028 ipfix = IPFIXDecoder()
4029 # first load template
4031 self.assertTrue(p.haslayer(IPFIX))
4032 self.assertEqual(p[IP].src, self.pg3.local_ip4)
4033 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
4034 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
4035 self.assertEqual(p[UDP].dport, 4739)
4036 self.assertEqual(p[IPFIX].observationDomainID,
4037 self.ipfix_domain_id)
4038 if p.haslayer(Template):
4039 ipfix.add_template(p.getlayer(Template))
4040 # verify events in data set
4042 if p.haslayer(Data):
4043 data = ipfix.decode_data_set(p.getlayer(Set))
4044 self.verify_ipfix_max_fragments_ip4(data, 0,
4045 self.pg0.remote_ip4n)
4047 def test_tcp_session_close_in(self):
4048 """ Close TCP session from inside network """
4049 self.tcp_port_out = 10505
4050 self.nat44_add_address(self.nat_addr)
4051 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4055 proto=IP_PROTOS.tcp,
4057 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4058 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4061 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4062 start_sessnum = len(sessions)
4064 self.initiate_tcp_session(self.pg0, self.pg1)
4066 # FIN packet in -> out
4067 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4068 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4069 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4070 flags="FA", seq=100, ack=300))
4071 self.pg0.add_stream(p)
4072 self.pg_enable_capture(self.pg_interfaces)
4074 self.pg1.get_capture(1)
4078 # ACK packet out -> in
4079 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4080 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4081 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4082 flags="A", seq=300, ack=101))
4085 # FIN packet out -> in
4086 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4087 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4088 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4089 flags="FA", seq=300, ack=101))
4092 self.pg1.add_stream(pkts)
4093 self.pg_enable_capture(self.pg_interfaces)
4095 self.pg0.get_capture(2)
4097 # ACK packet in -> out
4098 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4099 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4100 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4101 flags="A", seq=101, ack=301))
4102 self.pg0.add_stream(p)
4103 self.pg_enable_capture(self.pg_interfaces)
4105 self.pg1.get_capture(1)
4107 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4109 self.assertEqual(len(sessions) - start_sessnum, 0)
4111 def test_tcp_session_close_out(self):
4112 """ Close TCP session from outside network """
4113 self.tcp_port_out = 10505
4114 self.nat44_add_address(self.nat_addr)
4115 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4119 proto=IP_PROTOS.tcp,
4121 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4122 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4125 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4126 start_sessnum = len(sessions)
4128 self.initiate_tcp_session(self.pg0, self.pg1)
4130 # FIN packet out -> in
4131 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4132 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4133 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4134 flags="FA", seq=100, ack=300))
4135 self.pg1.add_stream(p)
4136 self.pg_enable_capture(self.pg_interfaces)
4138 self.pg0.get_capture(1)
4140 # FIN+ACK packet in -> out
4141 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4142 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4143 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4144 flags="FA", seq=300, ack=101))
4146 self.pg0.add_stream(p)
4147 self.pg_enable_capture(self.pg_interfaces)
4149 self.pg1.get_capture(1)
4151 # ACK packet out -> in
4152 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4153 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4154 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4155 flags="A", seq=101, ack=301))
4156 self.pg1.add_stream(p)
4157 self.pg_enable_capture(self.pg_interfaces)
4159 self.pg0.get_capture(1)
4161 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4163 self.assertEqual(len(sessions) - start_sessnum, 0)
4165 def test_tcp_session_close_simultaneous(self):
4166 """ Close TCP session from inside network """
4167 self.tcp_port_out = 10505
4168 self.nat44_add_address(self.nat_addr)
4169 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4173 proto=IP_PROTOS.tcp,
4175 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4176 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4179 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4180 start_sessnum = len(sessions)
4182 self.initiate_tcp_session(self.pg0, self.pg1)
4184 # FIN packet in -> out
4185 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4186 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4187 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4188 flags="FA", seq=100, ack=300))
4189 self.pg0.add_stream(p)
4190 self.pg_enable_capture(self.pg_interfaces)
4192 self.pg1.get_capture(1)
4194 # FIN packet out -> in
4195 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4196 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4197 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4198 flags="FA", seq=300, ack=100))
4199 self.pg1.add_stream(p)
4200 self.pg_enable_capture(self.pg_interfaces)
4202 self.pg0.get_capture(1)
4204 # ACK packet in -> out
4205 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4206 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4207 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4208 flags="A", seq=101, ack=301))
4209 self.pg0.add_stream(p)
4210 self.pg_enable_capture(self.pg_interfaces)
4212 self.pg1.get_capture(1)
4214 # ACK packet out -> in
4215 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4216 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4217 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4218 flags="A", seq=301, ack=101))
4219 self.pg1.add_stream(p)
4220 self.pg_enable_capture(self.pg_interfaces)
4222 self.pg0.get_capture(1)
4224 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4226 self.assertEqual(len(sessions) - start_sessnum, 0)
4229 super(TestNAT44, self).tearDown()
4230 if not self.vpp_dead:
4231 self.logger.info(self.vapi.cli("show nat44 addresses"))
4232 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4233 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4234 self.logger.info(self.vapi.cli("show nat44 interface address"))
4235 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4236 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4237 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4238 self.vapi.cli("nat addr-port-assignment-alg default")
4240 self.vapi.cli("clear logging")
4243 class TestNAT44Out2InDPO(MethodHolder):
4244 """ NAT44 Test Cases using out2in DPO """
4247 def setUpConstants(cls):
4248 super(TestNAT44Out2InDPO, cls).setUpConstants()
4249 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4252 def setUpClass(cls):
4253 super(TestNAT44Out2InDPO, cls).setUpClass()
4254 cls.vapi.cli("set log class nat level debug")
4257 cls.tcp_port_in = 6303
4258 cls.tcp_port_out = 6303
4259 cls.udp_port_in = 6304
4260 cls.udp_port_out = 6304
4261 cls.icmp_id_in = 6305
4262 cls.icmp_id_out = 6305
4263 cls.nat_addr = '10.0.0.3'
4264 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4265 cls.dst_ip4 = '192.168.70.1'
4267 cls.create_pg_interfaces(range(2))
4270 cls.pg0.config_ip4()
4271 cls.pg0.resolve_arp()
4274 cls.pg1.config_ip6()
4275 cls.pg1.resolve_ndp()
4277 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4278 dst_address_length=0,
4279 next_hop_address=cls.pg1.remote_ip6n,
4280 next_hop_sw_if_index=cls.pg1.sw_if_index)
4283 super(TestNAT44Out2InDPO, cls).tearDownClass()
4286 def configure_xlat(self):
4287 self.dst_ip6_pfx = '1:2:3::'
4288 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4290 self.dst_ip6_pfx_len = 96
4291 self.src_ip6_pfx = '4:5:6::'
4292 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4294 self.src_ip6_pfx_len = 96
4295 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4296 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4297 '\x00\x00\x00\x00', 0, is_translation=1,
4300 def test_464xlat_ce(self):
4301 """ Test 464XLAT CE with NAT44 """
4303 self.configure_xlat()
4305 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4306 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4308 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4309 self.dst_ip6_pfx_len)
4310 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4311 self.src_ip6_pfx_len)
4314 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4315 self.pg0.add_stream(pkts)
4316 self.pg_enable_capture(self.pg_interfaces)
4318 capture = self.pg1.get_capture(len(pkts))
4319 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4322 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4324 self.pg1.add_stream(pkts)
4325 self.pg_enable_capture(self.pg_interfaces)
4327 capture = self.pg0.get_capture(len(pkts))
4328 self.verify_capture_in(capture, self.pg0)
4330 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4332 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4333 self.nat_addr_n, is_add=0)
4335 def test_464xlat_ce_no_nat(self):
4336 """ Test 464XLAT CE without NAT44 """
4338 self.configure_xlat()
4340 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4341 self.dst_ip6_pfx_len)
4342 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4343 self.src_ip6_pfx_len)
4345 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4346 self.pg0.add_stream(pkts)
4347 self.pg_enable_capture(self.pg_interfaces)
4349 capture = self.pg1.get_capture(len(pkts))
4350 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4351 nat_ip=out_dst_ip6, same_port=True)
4353 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4354 self.pg1.add_stream(pkts)
4355 self.pg_enable_capture(self.pg_interfaces)
4357 capture = self.pg0.get_capture(len(pkts))
4358 self.verify_capture_in(capture, self.pg0)
4361 class TestDeterministicNAT(MethodHolder):
4362 """ Deterministic NAT Test Cases """
4365 def setUpConstants(cls):
4366 super(TestDeterministicNAT, cls).setUpConstants()
4367 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4370 def setUpClass(cls):
4371 super(TestDeterministicNAT, cls).setUpClass()
4372 cls.vapi.cli("set log class nat level debug")
4375 cls.tcp_port_in = 6303
4376 cls.tcp_external_port = 6303
4377 cls.udp_port_in = 6304
4378 cls.udp_external_port = 6304
4379 cls.icmp_id_in = 6305
4380 cls.nat_addr = '10.0.0.3'
4382 cls.create_pg_interfaces(range(3))
4383 cls.interfaces = list(cls.pg_interfaces)
4385 for i in cls.interfaces:
4390 cls.pg0.generate_remote_hosts(2)
4391 cls.pg0.configure_ipv4_neighbors()
4394 super(TestDeterministicNAT, cls).tearDownClass()
4397 def create_stream_in(self, in_if, out_if, ttl=64):
4399 Create packet stream for inside network
4401 :param in_if: Inside interface
4402 :param out_if: Outside interface
4403 :param ttl: TTL of generated packets
4407 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4408 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4409 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4413 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4414 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4415 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4419 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4420 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4421 ICMP(id=self.icmp_id_in, type='echo-request'))
4426 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4428 Create packet stream for outside network
4430 :param out_if: Outside interface
4431 :param dst_ip: Destination IP address (Default use global NAT address)
4432 :param ttl: TTL of generated packets
4435 dst_ip = self.nat_addr
4438 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4439 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4440 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4444 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4445 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4446 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4450 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4451 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4452 ICMP(id=self.icmp_external_id, type='echo-reply'))
4457 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4459 Verify captured packets on outside network
4461 :param capture: Captured packets
4462 :param nat_ip: Translated IP address (Default use global NAT address)
4463 :param same_port: Sorce port number is not translated (Default False)
4464 :param packet_num: Expected number of packets (Default 3)
4467 nat_ip = self.nat_addr
4468 self.assertEqual(packet_num, len(capture))
4469 for packet in capture:
4471 self.assertEqual(packet[IP].src, nat_ip)
4472 if packet.haslayer(TCP):
4473 self.tcp_port_out = packet[TCP].sport
4474 elif packet.haslayer(UDP):
4475 self.udp_port_out = packet[UDP].sport
4477 self.icmp_external_id = packet[ICMP].id
4479 self.logger.error(ppp("Unexpected or invalid packet "
4480 "(outside network):", packet))
4483 def verify_ipfix_max_entries_per_user(self, data):
4485 Verify IPFIX maximum entries per user exceeded event
4487 :param data: Decoded IPFIX data records
4489 self.assertEqual(1, len(data))
4492 self.assertEqual(ord(record[230]), 13)
4493 # natQuotaExceededEvent
4494 self.assertEqual('\x03\x00\x00\x00', record[466])
4496 self.assertEqual('\xe8\x03\x00\x00', record[473])
4498 self.assertEqual(self.pg0.remote_ip4n, record[8])
4500 def test_deterministic_mode(self):
4501 """ NAT plugin run deterministic mode """
4502 in_addr = '172.16.255.0'
4503 out_addr = '172.17.255.50'
4504 in_addr_t = '172.16.255.20'
4505 in_addr_n = socket.inet_aton(in_addr)
4506 out_addr_n = socket.inet_aton(out_addr)
4507 in_addr_t_n = socket.inet_aton(in_addr_t)
4511 nat_config = self.vapi.nat_show_config()
4512 self.assertEqual(1, nat_config.deterministic)
4514 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4516 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4517 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4518 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4519 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4521 deterministic_mappings = self.vapi.nat_det_map_dump()
4522 self.assertEqual(len(deterministic_mappings), 1)
4523 dsm = deterministic_mappings[0]
4524 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4525 self.assertEqual(in_plen, dsm.in_plen)
4526 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4527 self.assertEqual(out_plen, dsm.out_plen)
4529 self.clear_nat_det()
4530 deterministic_mappings = self.vapi.nat_det_map_dump()
4531 self.assertEqual(len(deterministic_mappings), 0)
4533 def test_set_timeouts(self):
4534 """ Set deterministic NAT timeouts """
4535 timeouts_before = self.vapi.nat_det_get_timeouts()
4537 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4538 timeouts_before.tcp_established + 10,
4539 timeouts_before.tcp_transitory + 10,
4540 timeouts_before.icmp + 10)
4542 timeouts_after = self.vapi.nat_det_get_timeouts()
4544 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4545 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4546 self.assertNotEqual(timeouts_before.tcp_established,
4547 timeouts_after.tcp_established)
4548 self.assertNotEqual(timeouts_before.tcp_transitory,
4549 timeouts_after.tcp_transitory)
4551 def test_det_in(self):
4552 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4554 nat_ip = "10.0.0.10"
4556 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4558 socket.inet_aton(nat_ip),
4560 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4561 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4565 pkts = self.create_stream_in(self.pg0, self.pg1)
4566 self.pg0.add_stream(pkts)
4567 self.pg_enable_capture(self.pg_interfaces)
4569 capture = self.pg1.get_capture(len(pkts))
4570 self.verify_capture_out(capture, nat_ip)
4573 pkts = self.create_stream_out(self.pg1, nat_ip)
4574 self.pg1.add_stream(pkts)
4575 self.pg_enable_capture(self.pg_interfaces)
4577 capture = self.pg0.get_capture(len(pkts))
4578 self.verify_capture_in(capture, self.pg0)
4581 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4582 self.assertEqual(len(sessions), 3)
4586 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4587 self.assertEqual(s.in_port, self.tcp_port_in)
4588 self.assertEqual(s.out_port, self.tcp_port_out)
4589 self.assertEqual(s.ext_port, self.tcp_external_port)
4593 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4594 self.assertEqual(s.in_port, self.udp_port_in)
4595 self.assertEqual(s.out_port, self.udp_port_out)
4596 self.assertEqual(s.ext_port, self.udp_external_port)
4600 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4601 self.assertEqual(s.in_port, self.icmp_id_in)
4602 self.assertEqual(s.out_port, self.icmp_external_id)
4604 def test_multiple_users(self):
4605 """ Deterministic NAT multiple users """
4607 nat_ip = "10.0.0.10"
4609 external_port = 6303
4611 host0 = self.pg0.remote_hosts[0]
4612 host1 = self.pg0.remote_hosts[1]
4614 self.vapi.nat_det_add_del_map(host0.ip4n,
4616 socket.inet_aton(nat_ip),
4618 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4619 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4623 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4624 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4625 TCP(sport=port_in, dport=external_port))
4626 self.pg0.add_stream(p)
4627 self.pg_enable_capture(self.pg_interfaces)
4629 capture = self.pg1.get_capture(1)
4634 self.assertEqual(ip.src, nat_ip)
4635 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4636 self.assertEqual(tcp.dport, external_port)
4637 port_out0 = tcp.sport
4639 self.logger.error(ppp("Unexpected or invalid packet:", p))
4643 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4644 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4645 TCP(sport=port_in, dport=external_port))
4646 self.pg0.add_stream(p)
4647 self.pg_enable_capture(self.pg_interfaces)
4649 capture = self.pg1.get_capture(1)
4654 self.assertEqual(ip.src, nat_ip)
4655 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4656 self.assertEqual(tcp.dport, external_port)
4657 port_out1 = tcp.sport
4659 self.logger.error(ppp("Unexpected or invalid packet:", p))
4662 dms = self.vapi.nat_det_map_dump()
4663 self.assertEqual(1, len(dms))
4664 self.assertEqual(2, dms[0].ses_num)
4667 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4668 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4669 TCP(sport=external_port, dport=port_out0))
4670 self.pg1.add_stream(p)
4671 self.pg_enable_capture(self.pg_interfaces)
4673 capture = self.pg0.get_capture(1)
4678 self.assertEqual(ip.src, self.pg1.remote_ip4)
4679 self.assertEqual(ip.dst, host0.ip4)
4680 self.assertEqual(tcp.dport, port_in)
4681 self.assertEqual(tcp.sport, external_port)
4683 self.logger.error(ppp("Unexpected or invalid packet:", p))
4687 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4688 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4689 TCP(sport=external_port, dport=port_out1))
4690 self.pg1.add_stream(p)
4691 self.pg_enable_capture(self.pg_interfaces)
4693 capture = self.pg0.get_capture(1)
4698 self.assertEqual(ip.src, self.pg1.remote_ip4)
4699 self.assertEqual(ip.dst, host1.ip4)
4700 self.assertEqual(tcp.dport, port_in)
4701 self.assertEqual(tcp.sport, external_port)
4703 self.logger.error(ppp("Unexpected or invalid packet", p))
4706 # session close api test
4707 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4709 self.pg1.remote_ip4n,
4711 dms = self.vapi.nat_det_map_dump()
4712 self.assertEqual(dms[0].ses_num, 1)
4714 self.vapi.nat_det_close_session_in(host0.ip4n,
4716 self.pg1.remote_ip4n,
4718 dms = self.vapi.nat_det_map_dump()
4719 self.assertEqual(dms[0].ses_num, 0)
4721 def test_tcp_session_close_detection_in(self):
4722 """ Deterministic NAT TCP session close from inside network """
4723 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4725 socket.inet_aton(self.nat_addr),
4727 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4728 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4731 self.initiate_tcp_session(self.pg0, self.pg1)
4733 # close the session from inside
4735 # FIN packet in -> out
4736 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4737 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4738 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4740 self.pg0.add_stream(p)
4741 self.pg_enable_capture(self.pg_interfaces)
4743 self.pg1.get_capture(1)
4747 # ACK packet out -> in
4748 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4749 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4750 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4754 # FIN packet out -> in
4755 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4756 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4757 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4761 self.pg1.add_stream(pkts)
4762 self.pg_enable_capture(self.pg_interfaces)
4764 self.pg0.get_capture(2)
4766 # ACK packet in -> out
4767 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4768 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4769 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4771 self.pg0.add_stream(p)
4772 self.pg_enable_capture(self.pg_interfaces)
4774 self.pg1.get_capture(1)
4776 # Check if deterministic NAT44 closed the session
4777 dms = self.vapi.nat_det_map_dump()
4778 self.assertEqual(0, dms[0].ses_num)
4780 self.logger.error("TCP session termination failed")
4783 def test_tcp_session_close_detection_out(self):
4784 """ Deterministic NAT TCP session close from outside network """
4785 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4787 socket.inet_aton(self.nat_addr),
4789 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4790 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4793 self.initiate_tcp_session(self.pg0, self.pg1)
4795 # close the session from outside
4797 # FIN packet out -> in
4798 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4799 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4800 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4802 self.pg1.add_stream(p)
4803 self.pg_enable_capture(self.pg_interfaces)
4805 self.pg0.get_capture(1)
4809 # ACK packet in -> out
4810 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4811 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4812 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4816 # ACK packet in -> out
4817 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4818 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4819 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4823 self.pg0.add_stream(pkts)
4824 self.pg_enable_capture(self.pg_interfaces)
4826 self.pg1.get_capture(2)
4828 # ACK packet out -> in
4829 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4830 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4831 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4833 self.pg1.add_stream(p)
4834 self.pg_enable_capture(self.pg_interfaces)
4836 self.pg0.get_capture(1)
4838 # Check if deterministic NAT44 closed the session
4839 dms = self.vapi.nat_det_map_dump()
4840 self.assertEqual(0, dms[0].ses_num)
4842 self.logger.error("TCP session termination failed")
4845 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4846 def test_session_timeout(self):
4847 """ Deterministic NAT session timeouts """
4848 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4850 socket.inet_aton(self.nat_addr),
4852 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4853 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4856 self.initiate_tcp_session(self.pg0, self.pg1)
4857 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4858 pkts = self.create_stream_in(self.pg0, self.pg1)
4859 self.pg0.add_stream(pkts)
4860 self.pg_enable_capture(self.pg_interfaces)
4862 capture = self.pg1.get_capture(len(pkts))
4865 dms = self.vapi.nat_det_map_dump()
4866 self.assertEqual(0, dms[0].ses_num)
4868 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4869 def test_session_limit_per_user(self):
4870 """ Deterministic NAT maximum sessions per user limit """
4871 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4873 socket.inet_aton(self.nat_addr),
4875 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4876 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4878 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4879 src_address=self.pg2.local_ip4n,
4881 template_interval=10)
4882 self.vapi.nat_ipfix()
4885 for port in range(1025, 2025):
4886 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4887 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4888 UDP(sport=port, dport=port))
4891 self.pg0.add_stream(pkts)
4892 self.pg_enable_capture(self.pg_interfaces)
4894 capture = self.pg1.get_capture(len(pkts))
4896 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4897 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4898 UDP(sport=3001, dport=3002))
4899 self.pg0.add_stream(p)
4900 self.pg_enable_capture(self.pg_interfaces)
4902 capture = self.pg1.assert_nothing_captured()
4904 # verify ICMP error packet
4905 capture = self.pg0.get_capture(1)
4907 self.assertTrue(p.haslayer(ICMP))
4909 self.assertEqual(icmp.type, 3)
4910 self.assertEqual(icmp.code, 1)
4911 self.assertTrue(icmp.haslayer(IPerror))
4912 inner_ip = icmp[IPerror]
4913 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4914 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4916 dms = self.vapi.nat_det_map_dump()
4918 self.assertEqual(1000, dms[0].ses_num)
4920 # verify IPFIX logging
4921 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4923 capture = self.pg2.get_capture(2)
4924 ipfix = IPFIXDecoder()
4925 # first load template
4927 self.assertTrue(p.haslayer(IPFIX))
4928 if p.haslayer(Template):
4929 ipfix.add_template(p.getlayer(Template))
4930 # verify events in data set
4932 if p.haslayer(Data):
4933 data = ipfix.decode_data_set(p.getlayer(Set))
4934 self.verify_ipfix_max_entries_per_user(data)
4936 def clear_nat_det(self):
4938 Clear deterministic NAT configuration.
4940 self.vapi.nat_ipfix(enable=0)
4941 self.vapi.nat_det_set_timeouts()
4942 deterministic_mappings = self.vapi.nat_det_map_dump()
4943 for dsm in deterministic_mappings:
4944 self.vapi.nat_det_add_del_map(dsm.in_addr,
4950 interfaces = self.vapi.nat44_interface_dump()
4951 for intf in interfaces:
4952 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4957 super(TestDeterministicNAT, self).tearDown()
4958 if not self.vpp_dead:
4959 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4961 self.vapi.cli("show nat44 deterministic mappings"))
4963 self.vapi.cli("show nat44 deterministic timeouts"))
4965 self.vapi.cli("show nat44 deterministic sessions"))
4966 self.clear_nat_det()
4969 class TestNAT64(MethodHolder):
4970 """ NAT64 Test Cases """
4973 def setUpConstants(cls):
4974 super(TestNAT64, cls).setUpConstants()
4975 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4976 "nat64 st hash buckets 256", "}"])
4979 def setUpClass(cls):
4980 super(TestNAT64, cls).setUpClass()
4983 cls.tcp_port_in = 6303
4984 cls.tcp_port_out = 6303
4985 cls.udp_port_in = 6304
4986 cls.udp_port_out = 6304
4987 cls.icmp_id_in = 6305
4988 cls.icmp_id_out = 6305
4989 cls.nat_addr = '10.0.0.3'
4990 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4992 cls.vrf1_nat_addr = '10.0.10.3'
4993 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4995 cls.ipfix_src_port = 4739
4996 cls.ipfix_domain_id = 1
4998 cls.create_pg_interfaces(range(5))
4999 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5000 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5001 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5003 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5005 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5007 cls.pg0.generate_remote_hosts(2)
5009 for i in cls.ip6_interfaces:
5012 i.configure_ipv6_neighbors()
5014 for i in cls.ip4_interfaces:
5020 cls.pg3.config_ip4()
5021 cls.pg3.resolve_arp()
5022 cls.pg3.config_ip6()
5023 cls.pg3.configure_ipv6_neighbors()
5026 super(TestNAT64, cls).tearDownClass()
5029 def test_pool(self):
5030 """ Add/delete address to NAT64 pool """
5031 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5033 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5035 addresses = self.vapi.nat64_pool_addr_dump()
5036 self.assertEqual(len(addresses), 1)
5037 self.assertEqual(addresses[0].address, nat_addr)
5039 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
5041 addresses = self.vapi.nat64_pool_addr_dump()
5042 self.assertEqual(len(addresses), 0)
5044 def test_interface(self):
5045 """ Enable/disable NAT64 feature on the interface """
5046 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5047 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5049 interfaces = self.vapi.nat64_interface_dump()
5050 self.assertEqual(len(interfaces), 2)
5053 for intf in interfaces:
5054 if intf.sw_if_index == self.pg0.sw_if_index:
5055 self.assertEqual(intf.is_inside, 1)
5057 elif intf.sw_if_index == self.pg1.sw_if_index:
5058 self.assertEqual(intf.is_inside, 0)
5060 self.assertTrue(pg0_found)
5061 self.assertTrue(pg1_found)
5063 features = self.vapi.cli("show interface features pg0")
5064 self.assertNotEqual(features.find('nat64-in2out'), -1)
5065 features = self.vapi.cli("show interface features pg1")
5066 self.assertNotEqual(features.find('nat64-out2in'), -1)
5068 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5069 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5071 interfaces = self.vapi.nat64_interface_dump()
5072 self.assertEqual(len(interfaces), 0)
5074 def test_static_bib(self):
5075 """ Add/delete static BIB entry """
5076 in_addr = socket.inet_pton(socket.AF_INET6,
5077 '2001:db8:85a3::8a2e:370:7334')
5078 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5081 proto = IP_PROTOS.tcp
5083 self.vapi.nat64_add_del_static_bib(in_addr,
5088 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5093 self.assertEqual(bibe.i_addr, in_addr)
5094 self.assertEqual(bibe.o_addr, out_addr)
5095 self.assertEqual(bibe.i_port, in_port)
5096 self.assertEqual(bibe.o_port, out_port)
5097 self.assertEqual(static_bib_num, 1)
5099 self.vapi.nat64_add_del_static_bib(in_addr,
5105 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5110 self.assertEqual(static_bib_num, 0)
5112 def test_set_timeouts(self):
5113 """ Set NAT64 timeouts """
5114 # verify default values
5115 timeouts = self.vapi.nat64_get_timeouts()
5116 self.assertEqual(timeouts.udp, 300)
5117 self.assertEqual(timeouts.icmp, 60)
5118 self.assertEqual(timeouts.tcp_trans, 240)
5119 self.assertEqual(timeouts.tcp_est, 7440)
5120 self.assertEqual(timeouts.tcp_incoming_syn, 6)
5122 # set and verify custom values
5123 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
5124 tcp_est=7450, tcp_incoming_syn=10)
5125 timeouts = self.vapi.nat64_get_timeouts()
5126 self.assertEqual(timeouts.udp, 200)
5127 self.assertEqual(timeouts.icmp, 30)
5128 self.assertEqual(timeouts.tcp_trans, 250)
5129 self.assertEqual(timeouts.tcp_est, 7450)
5130 self.assertEqual(timeouts.tcp_incoming_syn, 10)
5132 def test_dynamic(self):
5133 """ NAT64 dynamic translation test """
5134 self.tcp_port_in = 6303
5135 self.udp_port_in = 6304
5136 self.icmp_id_in = 6305
5138 ses_num_start = self.nat64_get_ses_num()
5140 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5142 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5143 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5146 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5147 self.pg0.add_stream(pkts)
5148 self.pg_enable_capture(self.pg_interfaces)
5150 capture = self.pg1.get_capture(len(pkts))
5151 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5152 dst_ip=self.pg1.remote_ip4)
5155 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5156 self.pg1.add_stream(pkts)
5157 self.pg_enable_capture(self.pg_interfaces)
5159 capture = self.pg0.get_capture(len(pkts))
5160 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5161 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5164 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5165 self.pg0.add_stream(pkts)
5166 self.pg_enable_capture(self.pg_interfaces)
5168 capture = self.pg1.get_capture(len(pkts))
5169 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5170 dst_ip=self.pg1.remote_ip4)
5173 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5174 self.pg1.add_stream(pkts)
5175 self.pg_enable_capture(self.pg_interfaces)
5177 capture = self.pg0.get_capture(len(pkts))
5178 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5180 ses_num_end = self.nat64_get_ses_num()
5182 self.assertEqual(ses_num_end - ses_num_start, 3)
5184 # tenant with specific VRF
5185 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5186 self.vrf1_nat_addr_n,
5187 vrf_id=self.vrf1_id)
5188 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5190 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5191 self.pg2.add_stream(pkts)
5192 self.pg_enable_capture(self.pg_interfaces)
5194 capture = self.pg1.get_capture(len(pkts))
5195 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5196 dst_ip=self.pg1.remote_ip4)
5198 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5199 self.pg1.add_stream(pkts)
5200 self.pg_enable_capture(self.pg_interfaces)
5202 capture = self.pg2.get_capture(len(pkts))
5203 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5205 def test_static(self):
5206 """ NAT64 static translation test """
5207 self.tcp_port_in = 60303
5208 self.udp_port_in = 60304
5209 self.icmp_id_in = 60305
5210 self.tcp_port_out = 60303
5211 self.udp_port_out = 60304
5212 self.icmp_id_out = 60305
5214 ses_num_start = self.nat64_get_ses_num()
5216 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5218 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5219 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5221 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5226 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5231 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5238 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5239 self.pg0.add_stream(pkts)
5240 self.pg_enable_capture(self.pg_interfaces)
5242 capture = self.pg1.get_capture(len(pkts))
5243 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5244 dst_ip=self.pg1.remote_ip4, same_port=True)
5247 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5248 self.pg1.add_stream(pkts)
5249 self.pg_enable_capture(self.pg_interfaces)
5251 capture = self.pg0.get_capture(len(pkts))
5252 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5253 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5255 ses_num_end = self.nat64_get_ses_num()
5257 self.assertEqual(ses_num_end - ses_num_start, 3)
5259 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5260 def test_session_timeout(self):
5261 """ NAT64 session timeout """
5262 self.icmp_id_in = 1234
5263 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5265 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5266 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5267 self.vapi.nat64_set_timeouts(icmp=5)
5269 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5270 self.pg0.add_stream(pkts)
5271 self.pg_enable_capture(self.pg_interfaces)
5273 capture = self.pg1.get_capture(len(pkts))
5275 ses_num_before_timeout = self.nat64_get_ses_num()
5279 # ICMP session after timeout
5280 ses_num_after_timeout = self.nat64_get_ses_num()
5281 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5283 def test_icmp_error(self):
5284 """ NAT64 ICMP Error message translation """
5285 self.tcp_port_in = 6303
5286 self.udp_port_in = 6304
5287 self.icmp_id_in = 6305
5289 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5291 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5292 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5294 # send some packets to create sessions
5295 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5296 self.pg0.add_stream(pkts)
5297 self.pg_enable_capture(self.pg_interfaces)
5299 capture_ip4 = self.pg1.get_capture(len(pkts))
5300 self.verify_capture_out(capture_ip4,
5301 nat_ip=self.nat_addr,
5302 dst_ip=self.pg1.remote_ip4)
5304 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5305 self.pg1.add_stream(pkts)
5306 self.pg_enable_capture(self.pg_interfaces)
5308 capture_ip6 = self.pg0.get_capture(len(pkts))
5309 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5310 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5311 self.pg0.remote_ip6)
5314 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5315 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5316 ICMPv6DestUnreach(code=1) /
5317 packet[IPv6] for packet in capture_ip6]
5318 self.pg0.add_stream(pkts)
5319 self.pg_enable_capture(self.pg_interfaces)
5321 capture = self.pg1.get_capture(len(pkts))
5322 for packet in capture:
5324 self.assertEqual(packet[IP].src, self.nat_addr)
5325 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5326 self.assertEqual(packet[ICMP].type, 3)
5327 self.assertEqual(packet[ICMP].code, 13)
5328 inner = packet[IPerror]
5329 self.assertEqual(inner.src, self.pg1.remote_ip4)
5330 self.assertEqual(inner.dst, self.nat_addr)
5331 self.assert_packet_checksums_valid(packet)
5332 if inner.haslayer(TCPerror):
5333 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5334 elif inner.haslayer(UDPerror):
5335 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5337 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5339 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5343 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5344 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5345 ICMP(type=3, code=13) /
5346 packet[IP] for packet in capture_ip4]
5347 self.pg1.add_stream(pkts)
5348 self.pg_enable_capture(self.pg_interfaces)
5350 capture = self.pg0.get_capture(len(pkts))
5351 for packet in capture:
5353 self.assertEqual(packet[IPv6].src, ip.src)
5354 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5355 icmp = packet[ICMPv6DestUnreach]
5356 self.assertEqual(icmp.code, 1)
5357 inner = icmp[IPerror6]
5358 self.assertEqual(inner.src, self.pg0.remote_ip6)
5359 self.assertEqual(inner.dst, ip.src)
5360 self.assert_icmpv6_checksum_valid(packet)
5361 if inner.haslayer(TCPerror):
5362 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5363 elif inner.haslayer(UDPerror):
5364 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5366 self.assertEqual(inner[ICMPv6EchoRequest].id,
5369 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5372 def test_hairpinning(self):
5373 """ NAT64 hairpinning """
5375 client = self.pg0.remote_hosts[0]
5376 server = self.pg0.remote_hosts[1]
5377 server_tcp_in_port = 22
5378 server_tcp_out_port = 4022
5379 server_udp_in_port = 23
5380 server_udp_out_port = 4023
5381 client_tcp_in_port = 1234
5382 client_udp_in_port = 1235
5383 client_tcp_out_port = 0
5384 client_udp_out_port = 0
5385 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5386 nat_addr_ip6 = ip.src
5388 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5390 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5391 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5393 self.vapi.nat64_add_del_static_bib(server.ip6n,
5396 server_tcp_out_port,
5398 self.vapi.nat64_add_del_static_bib(server.ip6n,
5401 server_udp_out_port,
5406 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5407 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5408 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5410 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5411 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5412 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5414 self.pg0.add_stream(pkts)
5415 self.pg_enable_capture(self.pg_interfaces)
5417 capture = self.pg0.get_capture(len(pkts))
5418 for packet in capture:
5420 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5421 self.assertEqual(packet[IPv6].dst, server.ip6)
5422 self.assert_packet_checksums_valid(packet)
5423 if packet.haslayer(TCP):
5424 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5425 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5426 client_tcp_out_port = packet[TCP].sport
5428 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5429 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5430 client_udp_out_port = packet[UDP].sport
5432 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5437 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5438 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5439 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5441 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5442 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5443 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5445 self.pg0.add_stream(pkts)
5446 self.pg_enable_capture(self.pg_interfaces)
5448 capture = self.pg0.get_capture(len(pkts))
5449 for packet in capture:
5451 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5452 self.assertEqual(packet[IPv6].dst, client.ip6)
5453 self.assert_packet_checksums_valid(packet)
5454 if packet.haslayer(TCP):
5455 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5456 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5458 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5459 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5461 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5466 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5467 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5468 ICMPv6DestUnreach(code=1) /
5469 packet[IPv6] for packet in capture]
5470 self.pg0.add_stream(pkts)
5471 self.pg_enable_capture(self.pg_interfaces)
5473 capture = self.pg0.get_capture(len(pkts))
5474 for packet in capture:
5476 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5477 self.assertEqual(packet[IPv6].dst, server.ip6)
5478 icmp = packet[ICMPv6DestUnreach]
5479 self.assertEqual(icmp.code, 1)
5480 inner = icmp[IPerror6]
5481 self.assertEqual(inner.src, server.ip6)
5482 self.assertEqual(inner.dst, nat_addr_ip6)
5483 self.assert_packet_checksums_valid(packet)
5484 if inner.haslayer(TCPerror):
5485 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5486 self.assertEqual(inner[TCPerror].dport,
5487 client_tcp_out_port)
5489 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5490 self.assertEqual(inner[UDPerror].dport,
5491 client_udp_out_port)
5493 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5496 def test_prefix(self):
5497 """ NAT64 Network-Specific Prefix """
5499 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5501 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5502 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5503 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5504 self.vrf1_nat_addr_n,
5505 vrf_id=self.vrf1_id)
5506 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5509 global_pref64 = "2001:db8::"
5510 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5511 global_pref64_len = 32
5512 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5514 prefix = self.vapi.nat64_prefix_dump()
5515 self.assertEqual(len(prefix), 1)
5516 self.assertEqual(prefix[0].prefix, global_pref64_n)
5517 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5518 self.assertEqual(prefix[0].vrf_id, 0)
5520 # Add tenant specific prefix
5521 vrf1_pref64 = "2001:db8:122:300::"
5522 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5523 vrf1_pref64_len = 56
5524 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5526 vrf_id=self.vrf1_id)
5527 prefix = self.vapi.nat64_prefix_dump()
5528 self.assertEqual(len(prefix), 2)
5531 pkts = self.create_stream_in_ip6(self.pg0,
5534 plen=global_pref64_len)
5535 self.pg0.add_stream(pkts)
5536 self.pg_enable_capture(self.pg_interfaces)
5538 capture = self.pg1.get_capture(len(pkts))
5539 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5540 dst_ip=self.pg1.remote_ip4)
5542 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5543 self.pg1.add_stream(pkts)
5544 self.pg_enable_capture(self.pg_interfaces)
5546 capture = self.pg0.get_capture(len(pkts))
5547 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5550 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5552 # Tenant specific prefix
5553 pkts = self.create_stream_in_ip6(self.pg2,
5556 plen=vrf1_pref64_len)
5557 self.pg2.add_stream(pkts)
5558 self.pg_enable_capture(self.pg_interfaces)
5560 capture = self.pg1.get_capture(len(pkts))
5561 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5562 dst_ip=self.pg1.remote_ip4)
5564 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5565 self.pg1.add_stream(pkts)
5566 self.pg_enable_capture(self.pg_interfaces)
5568 capture = self.pg2.get_capture(len(pkts))
5569 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5572 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5574 def test_unknown_proto(self):
5575 """ NAT64 translate packet with unknown protocol """
5577 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5579 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5580 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5581 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5584 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5585 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5586 TCP(sport=self.tcp_port_in, dport=20))
5587 self.pg0.add_stream(p)
5588 self.pg_enable_capture(self.pg_interfaces)
5590 p = self.pg1.get_capture(1)
5592 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5593 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5595 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5596 TCP(sport=1234, dport=1234))
5597 self.pg0.add_stream(p)
5598 self.pg_enable_capture(self.pg_interfaces)
5600 p = self.pg1.get_capture(1)
5603 self.assertEqual(packet[IP].src, self.nat_addr)
5604 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5605 self.assertTrue(packet.haslayer(GRE))
5606 self.assert_packet_checksums_valid(packet)
5608 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5612 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5613 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5615 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5616 TCP(sport=1234, dport=1234))
5617 self.pg1.add_stream(p)
5618 self.pg_enable_capture(self.pg_interfaces)
5620 p = self.pg0.get_capture(1)
5623 self.assertEqual(packet[IPv6].src, remote_ip6)
5624 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5625 self.assertEqual(packet[IPv6].nh, 47)
5627 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5630 def test_hairpinning_unknown_proto(self):
5631 """ NAT64 translate packet with unknown protocol - hairpinning """
5633 client = self.pg0.remote_hosts[0]
5634 server = self.pg0.remote_hosts[1]
5635 server_tcp_in_port = 22
5636 server_tcp_out_port = 4022
5637 client_tcp_in_port = 1234
5638 client_tcp_out_port = 1235
5639 server_nat_ip = "10.0.0.100"
5640 client_nat_ip = "10.0.0.110"
5641 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5642 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5643 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5644 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5646 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5648 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5649 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5651 self.vapi.nat64_add_del_static_bib(server.ip6n,
5654 server_tcp_out_port,
5657 self.vapi.nat64_add_del_static_bib(server.ip6n,
5663 self.vapi.nat64_add_del_static_bib(client.ip6n,
5666 client_tcp_out_port,
5670 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5671 IPv6(src=client.ip6, dst=server_nat_ip6) /
5672 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5673 self.pg0.add_stream(p)
5674 self.pg_enable_capture(self.pg_interfaces)
5676 p = self.pg0.get_capture(1)
5678 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5679 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5681 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5682 TCP(sport=1234, dport=1234))
5683 self.pg0.add_stream(p)
5684 self.pg_enable_capture(self.pg_interfaces)
5686 p = self.pg0.get_capture(1)
5689 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5690 self.assertEqual(packet[IPv6].dst, server.ip6)
5691 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5693 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5697 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5698 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5700 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5701 TCP(sport=1234, dport=1234))
5702 self.pg0.add_stream(p)
5703 self.pg_enable_capture(self.pg_interfaces)
5705 p = self.pg0.get_capture(1)
5708 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5709 self.assertEqual(packet[IPv6].dst, client.ip6)
5710 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5712 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5715 def test_one_armed_nat64(self):
5716 """ One armed NAT64 """
5718 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5722 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5724 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5725 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5728 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5729 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5730 TCP(sport=12345, dport=80))
5731 self.pg3.add_stream(p)
5732 self.pg_enable_capture(self.pg_interfaces)
5734 capture = self.pg3.get_capture(1)
5739 self.assertEqual(ip.src, self.nat_addr)
5740 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5741 self.assertNotEqual(tcp.sport, 12345)
5742 external_port = tcp.sport
5743 self.assertEqual(tcp.dport, 80)
5744 self.assert_packet_checksums_valid(p)
5746 self.logger.error(ppp("Unexpected or invalid packet:", p))
5750 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5751 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5752 TCP(sport=80, dport=external_port))
5753 self.pg3.add_stream(p)
5754 self.pg_enable_capture(self.pg_interfaces)
5756 capture = self.pg3.get_capture(1)
5761 self.assertEqual(ip.src, remote_host_ip6)
5762 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5763 self.assertEqual(tcp.sport, 80)
5764 self.assertEqual(tcp.dport, 12345)
5765 self.assert_packet_checksums_valid(p)
5767 self.logger.error(ppp("Unexpected or invalid packet:", p))
5770 def test_frag_in_order(self):
5771 """ NAT64 translate fragments arriving in order """
5772 self.tcp_port_in = random.randint(1025, 65535)
5774 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5776 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5777 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5779 reass = self.vapi.nat_reass_dump()
5780 reass_n_start = len(reass)
5784 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5785 self.tcp_port_in, 20, data)
5786 self.pg0.add_stream(pkts)
5787 self.pg_enable_capture(self.pg_interfaces)
5789 frags = self.pg1.get_capture(len(pkts))
5790 p = self.reass_frags_and_verify(frags,
5792 self.pg1.remote_ip4)
5793 self.assertEqual(p[TCP].dport, 20)
5794 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5795 self.tcp_port_out = p[TCP].sport
5796 self.assertEqual(data, p[Raw].load)
5799 data = "A" * 4 + "b" * 16 + "C" * 3
5800 pkts = self.create_stream_frag(self.pg1,
5805 self.pg1.add_stream(pkts)
5806 self.pg_enable_capture(self.pg_interfaces)
5808 frags = self.pg0.get_capture(len(pkts))
5809 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5810 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5811 self.assertEqual(p[TCP].sport, 20)
5812 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5813 self.assertEqual(data, p[Raw].load)
5815 reass = self.vapi.nat_reass_dump()
5816 reass_n_end = len(reass)
5818 self.assertEqual(reass_n_end - reass_n_start, 2)
5820 def test_reass_hairpinning(self):
5821 """ NAT64 fragments hairpinning """
5823 server = self.pg0.remote_hosts[1]
5824 server_in_port = random.randint(1025, 65535)
5825 server_out_port = random.randint(1025, 65535)
5826 client_in_port = random.randint(1025, 65535)
5827 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5828 nat_addr_ip6 = ip.src
5830 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5832 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5833 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5835 # add static BIB entry for server
5836 self.vapi.nat64_add_del_static_bib(server.ip6n,
5842 # send packet from host to server
5843 pkts = self.create_stream_frag_ip6(self.pg0,
5848 self.pg0.add_stream(pkts)
5849 self.pg_enable_capture(self.pg_interfaces)
5851 frags = self.pg0.get_capture(len(pkts))
5852 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5853 self.assertNotEqual(p[TCP].sport, client_in_port)
5854 self.assertEqual(p[TCP].dport, server_in_port)
5855 self.assertEqual(data, p[Raw].load)
5857 def test_frag_out_of_order(self):
5858 """ NAT64 translate fragments arriving out of order """
5859 self.tcp_port_in = random.randint(1025, 65535)
5861 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5863 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5864 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5868 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5869 self.tcp_port_in, 20, data)
5871 self.pg0.add_stream(pkts)
5872 self.pg_enable_capture(self.pg_interfaces)
5874 frags = self.pg1.get_capture(len(pkts))
5875 p = self.reass_frags_and_verify(frags,
5877 self.pg1.remote_ip4)
5878 self.assertEqual(p[TCP].dport, 20)
5879 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5880 self.tcp_port_out = p[TCP].sport
5881 self.assertEqual(data, p[Raw].load)
5884 data = "A" * 4 + "B" * 16 + "C" * 3
5885 pkts = self.create_stream_frag(self.pg1,
5891 self.pg1.add_stream(pkts)
5892 self.pg_enable_capture(self.pg_interfaces)
5894 frags = self.pg0.get_capture(len(pkts))
5895 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5896 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5897 self.assertEqual(p[TCP].sport, 20)
5898 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5899 self.assertEqual(data, p[Raw].load)
5901 def test_interface_addr(self):
5902 """ Acquire NAT64 pool addresses from interface """
5903 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5905 # no address in NAT64 pool
5906 adresses = self.vapi.nat44_address_dump()
5907 self.assertEqual(0, len(adresses))
5909 # configure interface address and check NAT64 address pool
5910 self.pg4.config_ip4()
5911 addresses = self.vapi.nat64_pool_addr_dump()
5912 self.assertEqual(len(addresses), 1)
5913 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5915 # remove interface address and check NAT64 address pool
5916 self.pg4.unconfig_ip4()
5917 addresses = self.vapi.nat64_pool_addr_dump()
5918 self.assertEqual(0, len(adresses))
5920 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5921 def test_ipfix_max_bibs_sessions(self):
5922 """ IPFIX logging maximum session and BIB entries exceeded """
5925 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5929 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5931 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5932 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5936 for i in range(0, max_bibs):
5937 src = "fd01:aa::%x" % (i)
5938 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5939 IPv6(src=src, dst=remote_host_ip6) /
5940 TCP(sport=12345, dport=80))
5942 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5943 IPv6(src=src, dst=remote_host_ip6) /
5944 TCP(sport=12345, dport=22))
5946 self.pg0.add_stream(pkts)
5947 self.pg_enable_capture(self.pg_interfaces)
5949 self.pg1.get_capture(max_sessions)
5951 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5952 src_address=self.pg3.local_ip4n,
5954 template_interval=10)
5955 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5956 src_port=self.ipfix_src_port)
5958 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5959 IPv6(src=src, dst=remote_host_ip6) /
5960 TCP(sport=12345, dport=25))
5961 self.pg0.add_stream(p)
5962 self.pg_enable_capture(self.pg_interfaces)
5964 self.pg1.assert_nothing_captured()
5966 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5967 capture = self.pg3.get_capture(9)
5968 ipfix = IPFIXDecoder()
5969 # first load template
5971 self.assertTrue(p.haslayer(IPFIX))
5972 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5973 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5974 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5975 self.assertEqual(p[UDP].dport, 4739)
5976 self.assertEqual(p[IPFIX].observationDomainID,
5977 self.ipfix_domain_id)
5978 if p.haslayer(Template):
5979 ipfix.add_template(p.getlayer(Template))
5980 # verify events in data set
5982 if p.haslayer(Data):
5983 data = ipfix.decode_data_set(p.getlayer(Set))
5984 self.verify_ipfix_max_sessions(data, max_sessions)
5986 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5987 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5988 TCP(sport=12345, dport=80))
5989 self.pg0.add_stream(p)
5990 self.pg_enable_capture(self.pg_interfaces)
5992 self.pg1.assert_nothing_captured()
5994 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5995 capture = self.pg3.get_capture(1)
5996 # verify events in data set
5998 self.assertTrue(p.haslayer(IPFIX))
5999 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6000 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6001 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6002 self.assertEqual(p[UDP].dport, 4739)
6003 self.assertEqual(p[IPFIX].observationDomainID,
6004 self.ipfix_domain_id)
6005 if p.haslayer(Data):
6006 data = ipfix.decode_data_set(p.getlayer(Set))
6007 self.verify_ipfix_max_bibs(data, max_bibs)
6009 def test_ipfix_max_frags(self):
6010 """ IPFIX logging maximum fragments pending reassembly exceeded """
6011 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6013 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6014 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6015 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6016 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6017 src_address=self.pg3.local_ip4n,
6019 template_interval=10)
6020 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6021 src_port=self.ipfix_src_port)
6024 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6025 self.tcp_port_in, 20, data)
6026 self.pg0.add_stream(pkts[-1])
6027 self.pg_enable_capture(self.pg_interfaces)
6029 self.pg1.assert_nothing_captured()
6031 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6032 capture = self.pg3.get_capture(9)
6033 ipfix = IPFIXDecoder()
6034 # first load template
6036 self.assertTrue(p.haslayer(IPFIX))
6037 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6038 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6039 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6040 self.assertEqual(p[UDP].dport, 4739)
6041 self.assertEqual(p[IPFIX].observationDomainID,
6042 self.ipfix_domain_id)
6043 if p.haslayer(Template):
6044 ipfix.add_template(p.getlayer(Template))
6045 # verify events in data set
6047 if p.haslayer(Data):
6048 data = ipfix.decode_data_set(p.getlayer(Set))
6049 self.verify_ipfix_max_fragments_ip6(data, 0,
6050 self.pg0.remote_ip6n)
6052 def test_ipfix_bib_ses(self):
6053 """ IPFIX logging NAT64 BIB/session create and delete events """
6054 self.tcp_port_in = random.randint(1025, 65535)
6055 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6059 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6061 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6062 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6063 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6064 src_address=self.pg3.local_ip4n,
6066 template_interval=10)
6067 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6068 src_port=self.ipfix_src_port)
6071 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6072 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6073 TCP(sport=self.tcp_port_in, dport=25))
6074 self.pg0.add_stream(p)
6075 self.pg_enable_capture(self.pg_interfaces)
6077 p = self.pg1.get_capture(1)
6078 self.tcp_port_out = p[0][TCP].sport
6079 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6080 capture = self.pg3.get_capture(10)
6081 ipfix = IPFIXDecoder()
6082 # first load template
6084 self.assertTrue(p.haslayer(IPFIX))
6085 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6086 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6087 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6088 self.assertEqual(p[UDP].dport, 4739)
6089 self.assertEqual(p[IPFIX].observationDomainID,
6090 self.ipfix_domain_id)
6091 if p.haslayer(Template):
6092 ipfix.add_template(p.getlayer(Template))
6093 # verify events in data set
6095 if p.haslayer(Data):
6096 data = ipfix.decode_data_set(p.getlayer(Set))
6097 if ord(data[0][230]) == 10:
6098 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6099 elif ord(data[0][230]) == 6:
6100 self.verify_ipfix_nat64_ses(data,
6102 self.pg0.remote_ip6n,
6103 self.pg1.remote_ip4,
6106 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6109 self.pg_enable_capture(self.pg_interfaces)
6110 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6113 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6114 capture = self.pg3.get_capture(2)
6115 # verify events in data set
6117 self.assertTrue(p.haslayer(IPFIX))
6118 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6119 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6120 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6121 self.assertEqual(p[UDP].dport, 4739)
6122 self.assertEqual(p[IPFIX].observationDomainID,
6123 self.ipfix_domain_id)
6124 if p.haslayer(Data):
6125 data = ipfix.decode_data_set(p.getlayer(Set))
6126 if ord(data[0][230]) == 11:
6127 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6128 elif ord(data[0][230]) == 7:
6129 self.verify_ipfix_nat64_ses(data,
6131 self.pg0.remote_ip6n,
6132 self.pg1.remote_ip4,
6135 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6137 def nat64_get_ses_num(self):
6139 Return number of active NAT64 sessions.
6141 st = self.vapi.nat64_st_dump()
6144 def clear_nat64(self):
6146 Clear NAT64 configuration.
6148 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6149 domain_id=self.ipfix_domain_id)
6150 self.ipfix_src_port = 4739
6151 self.ipfix_domain_id = 1
6153 self.vapi.nat64_set_timeouts()
6155 interfaces = self.vapi.nat64_interface_dump()
6156 for intf in interfaces:
6157 if intf.is_inside > 1:
6158 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6161 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6165 bib = self.vapi.nat64_bib_dump(255)
6168 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6176 adresses = self.vapi.nat64_pool_addr_dump()
6177 for addr in adresses:
6178 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6183 prefixes = self.vapi.nat64_prefix_dump()
6184 for prefix in prefixes:
6185 self.vapi.nat64_add_del_prefix(prefix.prefix,
6187 vrf_id=prefix.vrf_id,
6191 super(TestNAT64, self).tearDown()
6192 if not self.vpp_dead:
6193 self.logger.info(self.vapi.cli("show nat64 pool"))
6194 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6195 self.logger.info(self.vapi.cli("show nat64 prefix"))
6196 self.logger.info(self.vapi.cli("show nat64 bib all"))
6197 self.logger.info(self.vapi.cli("show nat64 session table all"))
6198 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6202 class TestDSlite(MethodHolder):
6203 """ DS-Lite Test Cases """
6206 def setUpClass(cls):
6207 super(TestDSlite, cls).setUpClass()
6210 cls.nat_addr = '10.0.0.3'
6211 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6213 cls.create_pg_interfaces(range(2))
6215 cls.pg0.config_ip4()
6216 cls.pg0.resolve_arp()
6218 cls.pg1.config_ip6()
6219 cls.pg1.generate_remote_hosts(2)
6220 cls.pg1.configure_ipv6_neighbors()
6223 super(TestDSlite, cls).tearDownClass()
6226 def test_dslite(self):
6227 """ Test DS-Lite """
6228 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6230 aftr_ip4 = '192.0.0.1'
6231 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6232 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6233 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6234 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6237 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6238 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6239 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6240 UDP(sport=20000, dport=10000))
6241 self.pg1.add_stream(p)
6242 self.pg_enable_capture(self.pg_interfaces)
6244 capture = self.pg0.get_capture(1)
6245 capture = capture[0]
6246 self.assertFalse(capture.haslayer(IPv6))
6247 self.assertEqual(capture[IP].src, self.nat_addr)
6248 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6249 self.assertNotEqual(capture[UDP].sport, 20000)
6250 self.assertEqual(capture[UDP].dport, 10000)
6251 self.assert_packet_checksums_valid(capture)
6252 out_port = capture[UDP].sport
6254 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6255 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6256 UDP(sport=10000, dport=out_port))
6257 self.pg0.add_stream(p)
6258 self.pg_enable_capture(self.pg_interfaces)
6260 capture = self.pg1.get_capture(1)
6261 capture = capture[0]
6262 self.assertEqual(capture[IPv6].src, aftr_ip6)
6263 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6264 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6265 self.assertEqual(capture[IP].dst, '192.168.1.1')
6266 self.assertEqual(capture[UDP].sport, 10000)
6267 self.assertEqual(capture[UDP].dport, 20000)
6268 self.assert_packet_checksums_valid(capture)
6271 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6272 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6273 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6274 TCP(sport=20001, dport=10001))
6275 self.pg1.add_stream(p)
6276 self.pg_enable_capture(self.pg_interfaces)
6278 capture = self.pg0.get_capture(1)
6279 capture = capture[0]
6280 self.assertFalse(capture.haslayer(IPv6))
6281 self.assertEqual(capture[IP].src, self.nat_addr)
6282 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6283 self.assertNotEqual(capture[TCP].sport, 20001)
6284 self.assertEqual(capture[TCP].dport, 10001)
6285 self.assert_packet_checksums_valid(capture)
6286 out_port = capture[TCP].sport
6288 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6289 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6290 TCP(sport=10001, dport=out_port))
6291 self.pg0.add_stream(p)
6292 self.pg_enable_capture(self.pg_interfaces)
6294 capture = self.pg1.get_capture(1)
6295 capture = capture[0]
6296 self.assertEqual(capture[IPv6].src, aftr_ip6)
6297 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6298 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6299 self.assertEqual(capture[IP].dst, '192.168.1.1')
6300 self.assertEqual(capture[TCP].sport, 10001)
6301 self.assertEqual(capture[TCP].dport, 20001)
6302 self.assert_packet_checksums_valid(capture)
6305 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6306 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6307 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6308 ICMP(id=4000, type='echo-request'))
6309 self.pg1.add_stream(p)
6310 self.pg_enable_capture(self.pg_interfaces)
6312 capture = self.pg0.get_capture(1)
6313 capture = capture[0]
6314 self.assertFalse(capture.haslayer(IPv6))
6315 self.assertEqual(capture[IP].src, self.nat_addr)
6316 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6317 self.assertNotEqual(capture[ICMP].id, 4000)
6318 self.assert_packet_checksums_valid(capture)
6319 out_id = capture[ICMP].id
6321 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6322 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6323 ICMP(id=out_id, type='echo-reply'))
6324 self.pg0.add_stream(p)
6325 self.pg_enable_capture(self.pg_interfaces)
6327 capture = self.pg1.get_capture(1)
6328 capture = capture[0]
6329 self.assertEqual(capture[IPv6].src, aftr_ip6)
6330 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6331 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6332 self.assertEqual(capture[IP].dst, '192.168.1.1')
6333 self.assertEqual(capture[ICMP].id, 4000)
6334 self.assert_packet_checksums_valid(capture)
6336 # ping DS-Lite AFTR tunnel endpoint address
6337 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6338 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6339 ICMPv6EchoRequest())
6340 self.pg1.add_stream(p)
6341 self.pg_enable_capture(self.pg_interfaces)
6343 capture = self.pg1.get_capture(1)
6344 self.assertEqual(1, len(capture))
6345 capture = capture[0]
6346 self.assertEqual(capture[IPv6].src, aftr_ip6)
6347 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6348 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6351 super(TestDSlite, self).tearDown()
6352 if not self.vpp_dead:
6353 self.logger.info(self.vapi.cli("show dslite pool"))
6355 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6356 self.logger.info(self.vapi.cli("show dslite sessions"))
6359 class TestDSliteCE(MethodHolder):
6360 """ DS-Lite CE Test Cases """
6363 def setUpConstants(cls):
6364 super(TestDSliteCE, cls).setUpConstants()
6365 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6368 def setUpClass(cls):
6369 super(TestDSliteCE, cls).setUpClass()
6372 cls.create_pg_interfaces(range(2))
6374 cls.pg0.config_ip4()
6375 cls.pg0.resolve_arp()
6377 cls.pg1.config_ip6()
6378 cls.pg1.generate_remote_hosts(1)
6379 cls.pg1.configure_ipv6_neighbors()
6382 super(TestDSliteCE, cls).tearDownClass()
6385 def test_dslite_ce(self):
6386 """ Test DS-Lite CE """
6388 b4_ip4 = '192.0.0.2'
6389 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6390 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6391 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6392 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6394 aftr_ip4 = '192.0.0.1'
6395 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6396 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6397 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6398 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6400 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6401 dst_address_length=128,
6402 next_hop_address=self.pg1.remote_ip6n,
6403 next_hop_sw_if_index=self.pg1.sw_if_index,
6407 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6408 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6409 UDP(sport=10000, dport=20000))
6410 self.pg0.add_stream(p)
6411 self.pg_enable_capture(self.pg_interfaces)
6413 capture = self.pg1.get_capture(1)
6414 capture = capture[0]
6415 self.assertEqual(capture[IPv6].src, b4_ip6)
6416 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6417 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6418 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6419 self.assertEqual(capture[UDP].sport, 10000)
6420 self.assertEqual(capture[UDP].dport, 20000)
6421 self.assert_packet_checksums_valid(capture)
6424 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6425 IPv6(dst=b4_ip6, src=aftr_ip6) /
6426 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6427 UDP(sport=20000, dport=10000))
6428 self.pg1.add_stream(p)
6429 self.pg_enable_capture(self.pg_interfaces)
6431 capture = self.pg0.get_capture(1)
6432 capture = capture[0]
6433 self.assertFalse(capture.haslayer(IPv6))
6434 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6435 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6436 self.assertEqual(capture[UDP].sport, 20000)
6437 self.assertEqual(capture[UDP].dport, 10000)
6438 self.assert_packet_checksums_valid(capture)
6440 # ping DS-Lite B4 tunnel endpoint address
6441 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6442 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6443 ICMPv6EchoRequest())
6444 self.pg1.add_stream(p)
6445 self.pg_enable_capture(self.pg_interfaces)
6447 capture = self.pg1.get_capture(1)
6448 self.assertEqual(1, len(capture))
6449 capture = capture[0]
6450 self.assertEqual(capture[IPv6].src, b4_ip6)
6451 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6452 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6455 super(TestDSliteCE, self).tearDown()
6456 if not self.vpp_dead:
6458 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6460 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
6463 class TestNAT66(MethodHolder):
6464 """ NAT66 Test Cases """
6467 def setUpClass(cls):
6468 super(TestNAT66, cls).setUpClass()
6471 cls.nat_addr = 'fd01:ff::2'
6472 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6474 cls.create_pg_interfaces(range(2))
6475 cls.interfaces = list(cls.pg_interfaces)
6477 for i in cls.interfaces:
6480 i.configure_ipv6_neighbors()
6483 super(TestNAT66, cls).tearDownClass()
6486 def test_static(self):
6487 """ 1:1 NAT66 test """
6488 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6489 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6490 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6495 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6496 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6499 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6500 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6503 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6504 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6505 ICMPv6EchoRequest())
6507 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6508 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6509 GRE() / IP() / TCP())
6511 self.pg0.add_stream(pkts)
6512 self.pg_enable_capture(self.pg_interfaces)
6514 capture = self.pg1.get_capture(len(pkts))
6515 for packet in capture:
6517 self.assertEqual(packet[IPv6].src, self.nat_addr)
6518 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6519 self.assert_packet_checksums_valid(packet)
6521 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6526 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6527 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6530 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6531 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6534 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6535 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6538 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6539 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6540 GRE() / IP() / TCP())
6542 self.pg1.add_stream(pkts)
6543 self.pg_enable_capture(self.pg_interfaces)
6545 capture = self.pg0.get_capture(len(pkts))
6546 for packet in capture:
6548 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6549 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6550 self.assert_packet_checksums_valid(packet)
6552 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6555 sm = self.vapi.nat66_static_mapping_dump()
6556 self.assertEqual(len(sm), 1)
6557 self.assertEqual(sm[0].total_pkts, 8)
6559 def test_check_no_translate(self):
6560 """ NAT66 translate only when egress interface is outside interface """
6561 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6562 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
6563 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6567 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6568 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6570 self.pg0.add_stream([p])
6571 self.pg_enable_capture(self.pg_interfaces)
6573 capture = self.pg1.get_capture(1)
6576 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6577 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6579 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6582 def clear_nat66(self):
6584 Clear NAT66 configuration.
6586 interfaces = self.vapi.nat66_interface_dump()
6587 for intf in interfaces:
6588 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6592 static_mappings = self.vapi.nat66_static_mapping_dump()
6593 for sm in static_mappings:
6594 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6595 sm.external_ip_address,
6600 super(TestNAT66, self).tearDown()
6601 if not self.vpp_dead:
6602 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6603 self.logger.info(self.vapi.cli("show nat66 static mappings"))
6607 if __name__ == '__main__':
6608 unittest.main(testRunner=VppTestRunner)