9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
13 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
14 from scapy.layers.l2 import Ether, ARP, GRE
15 from scapy.data import IP_PROTOS
16 from scapy.packet import bind_layers, Raw
17 from scapy.all import fragment6
19 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
20 from time import sleep
21 from util import ip4_range
22 from util import mactobinary
25 class MethodHolder(VppTestCase):
26 """ NAT create capture and verify method holder """
28 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
30 Create packet stream for inside network
32 :param in_if: Inside interface
33 :param out_if: Outside interface
34 :param dst_ip: Destination address
35 :param ttl: TTL of generated packets
38 dst_ip = out_if.remote_ip4
42 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
43 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
44 TCP(sport=self.tcp_port_in, dport=20))
48 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
49 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
50 UDP(sport=self.udp_port_in, dport=20))
54 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
55 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
56 ICMP(id=self.icmp_id_in, type='echo-request'))
61 def compose_ip6(self, ip4, pref, plen):
63 Compose IPv4-embedded IPv6 addresses
65 :param ip4: IPv4 address
66 :param pref: IPv6 prefix
67 :param plen: IPv6 prefix length
68 :returns: IPv4-embedded IPv6 addresses
70 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
71 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
100 pref_n[14] = ip4_n[2]
101 pref_n[15] = ip4_n[3]
102 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
104 def extract_ip4(self, ip6, plen):
106 Extract IPv4 address embedded in IPv6 addresses
108 :param ip6: IPv6 address
109 :param plen: IPv6 prefix length
110 :returns: extracted IPv4 address
112 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
144 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
146 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
148 Create IPv6 packet stream for inside network
150 :param in_if: Inside interface
151 :param out_if: Outside interface
152 :param ttl: Hop Limit of generated packets
153 :param pref: NAT64 prefix
154 :param plen: NAT64 prefix length
158 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
160 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
165 TCP(sport=self.tcp_port_in, dport=20))
169 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
170 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
171 UDP(sport=self.udp_port_in, dport=20))
175 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
176 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
177 ICMPv6EchoRequest(id=self.icmp_id_in))
182 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
183 use_inside_ports=False):
185 Create packet stream for outside network
187 :param out_if: Outside interface
188 :param dst_ip: Destination IP address (Default use global NAT address)
189 :param ttl: TTL of generated packets
190 :param use_inside_ports: Use inside NAT ports as destination ports
191 instead of outside ports
194 dst_ip = self.nat_addr
195 if not use_inside_ports:
196 tcp_port = self.tcp_port_out
197 udp_port = self.udp_port_out
198 icmp_id = self.icmp_id_out
200 tcp_port = self.tcp_port_in
201 udp_port = self.udp_port_in
202 icmp_id = self.icmp_id_in
205 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
206 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
207 TCP(dport=tcp_port, sport=20))
211 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
212 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
213 UDP(dport=udp_port, sport=20))
217 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
218 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
219 ICMP(id=icmp_id, type='echo-reply'))
224 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
226 Create packet stream for outside network
228 :param out_if: Outside interface
229 :param dst_ip: Destination IP address (Default use global NAT address)
230 :param hl: HL of generated packets
234 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
235 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
236 TCP(dport=self.tcp_port_out, sport=20))
240 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
241 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
242 UDP(dport=self.udp_port_out, sport=20))
246 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
247 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
248 ICMPv6EchoReply(id=self.icmp_id_out))
253 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
254 packet_num=3, dst_ip=None, is_ip6=False):
256 Verify captured packets on outside network
258 :param capture: Captured packets
259 :param nat_ip: Translated IP address (Default use global NAT address)
260 :param same_port: Sorce port number is not translated (Default False)
261 :param packet_num: Expected number of packets (Default 3)
262 :param dst_ip: Destination IP address (Default do not verify)
263 :param is_ip6: If L3 protocol is IPv6 (Default False)
267 ICMP46 = ICMPv6EchoRequest
272 nat_ip = self.nat_addr
273 self.assertEqual(packet_num, len(capture))
274 for packet in capture:
277 self.assert_packet_checksums_valid(packet)
278 self.assertEqual(packet[IP46].src, nat_ip)
279 if dst_ip is not None:
280 self.assertEqual(packet[IP46].dst, dst_ip)
281 if packet.haslayer(TCP):
283 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
286 packet[TCP].sport, self.tcp_port_in)
287 self.tcp_port_out = packet[TCP].sport
288 self.assert_packet_checksums_valid(packet)
289 elif packet.haslayer(UDP):
291 self.assertEqual(packet[UDP].sport, self.udp_port_in)
294 packet[UDP].sport, self.udp_port_in)
295 self.udp_port_out = packet[UDP].sport
298 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
300 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
301 self.icmp_id_out = packet[ICMP46].id
302 self.assert_packet_checksums_valid(packet)
304 self.logger.error(ppp("Unexpected or invalid packet "
305 "(outside network):", packet))
308 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
309 packet_num=3, dst_ip=None):
311 Verify captured packets on outside network
313 :param capture: Captured packets
314 :param nat_ip: Translated IP address
315 :param same_port: Sorce port number is not translated (Default False)
316 :param packet_num: Expected number of packets (Default 3)
317 :param dst_ip: Destination IP address (Default do not verify)
319 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
322 def verify_capture_in(self, capture, in_if, packet_num=3):
324 Verify captured packets on inside network
326 :param capture: Captured packets
327 :param in_if: Inside interface
328 :param packet_num: Expected number of packets (Default 3)
330 self.assertEqual(packet_num, len(capture))
331 for packet in capture:
333 self.assert_packet_checksums_valid(packet)
334 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
335 if packet.haslayer(TCP):
336 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
337 elif packet.haslayer(UDP):
338 self.assertEqual(packet[UDP].dport, self.udp_port_in)
340 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
342 self.logger.error(ppp("Unexpected or invalid packet "
343 "(inside network):", packet))
346 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
348 Verify captured IPv6 packets on inside network
350 :param capture: Captured packets
351 :param src_ip: Source IP
352 :param dst_ip: Destination IP address
353 :param packet_num: Expected number of packets (Default 3)
355 self.assertEqual(packet_num, len(capture))
356 for packet in capture:
358 self.assertEqual(packet[IPv6].src, src_ip)
359 self.assertEqual(packet[IPv6].dst, dst_ip)
360 self.assert_packet_checksums_valid(packet)
361 if packet.haslayer(TCP):
362 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
363 elif packet.haslayer(UDP):
364 self.assertEqual(packet[UDP].dport, self.udp_port_in)
366 self.assertEqual(packet[ICMPv6EchoReply].id,
369 self.logger.error(ppp("Unexpected or invalid packet "
370 "(inside network):", packet))
373 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
375 Verify captured packet that don't have to be translated
377 :param capture: Captured packets
378 :param ingress_if: Ingress interface
379 :param egress_if: Egress interface
381 for packet in capture:
383 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385 if packet.haslayer(TCP):
386 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387 elif packet.haslayer(UDP):
388 self.assertEqual(packet[UDP].sport, self.udp_port_in)
390 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
392 self.logger.error(ppp("Unexpected or invalid packet "
393 "(inside network):", packet))
396 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397 packet_num=3, icmp_type=11):
399 Verify captured packets with ICMP errors on outside network
401 :param capture: Captured packets
402 :param src_ip: Translated IP address or IP address of VPP
403 (Default use global NAT address)
404 :param packet_num: Expected number of packets (Default 3)
405 :param icmp_type: Type of error ICMP packet
406 we are expecting (Default 11)
409 src_ip = self.nat_addr
410 self.assertEqual(packet_num, len(capture))
411 for packet in capture:
413 self.assertEqual(packet[IP].src, src_ip)
414 self.assertTrue(packet.haslayer(ICMP))
416 self.assertEqual(icmp.type, icmp_type)
417 self.assertTrue(icmp.haslayer(IPerror))
418 inner_ip = icmp[IPerror]
419 if inner_ip.haslayer(TCPerror):
420 self.assertEqual(inner_ip[TCPerror].dport,
422 elif inner_ip.haslayer(UDPerror):
423 self.assertEqual(inner_ip[UDPerror].dport,
426 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
428 self.logger.error(ppp("Unexpected or invalid packet "
429 "(outside network):", packet))
432 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
435 Verify captured packets with ICMP errors on inside network
437 :param capture: Captured packets
438 :param in_if: Inside interface
439 :param packet_num: Expected number of packets (Default 3)
440 :param icmp_type: Type of error ICMP packet
441 we are expecting (Default 11)
443 self.assertEqual(packet_num, len(capture))
444 for packet in capture:
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 self.assertTrue(packet.haslayer(ICMP))
449 self.assertEqual(icmp.type, icmp_type)
450 self.assertTrue(icmp.haslayer(IPerror))
451 inner_ip = icmp[IPerror]
452 if inner_ip.haslayer(TCPerror):
453 self.assertEqual(inner_ip[TCPerror].sport,
455 elif inner_ip.haslayer(UDPerror):
456 self.assertEqual(inner_ip[UDPerror].sport,
459 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
461 self.logger.error(ppp("Unexpected or invalid packet "
462 "(inside network):", packet))
465 def create_stream_frag(self, src_if, dst, sport, dport, data):
467 Create fragmented packet stream
469 :param src_if: Source interface
470 :param dst: Destination IPv4 address
471 :param sport: Source TCP port
472 :param dport: Destination TCP port
473 :param data: Payload data
476 id = random.randint(0, 65535)
477 p = (IP(src=src_if.remote_ip4, dst=dst) /
478 TCP(sport=sport, dport=dport) /
480 p = p.__class__(str(p))
481 chksum = p['TCP'].chksum
483 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
484 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
485 TCP(sport=sport, dport=dport, chksum=chksum) /
488 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
490 proto=IP_PROTOS.tcp) /
493 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
500 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
501 pref=None, plen=0, frag_size=128):
503 Create fragmented packet stream
505 :param src_if: Source interface
506 :param dst: Destination IPv4 address
507 :param sport: Source TCP port
508 :param dport: Destination TCP port
509 :param data: Payload data
510 :param pref: NAT64 prefix
511 :param plen: NAT64 prefix length
512 :param fragsize: size of fragments
516 dst_ip6 = ''.join(['64:ff9b::', dst])
518 dst_ip6 = self.compose_ip6(dst, pref, plen)
520 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
521 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
522 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
523 TCP(sport=sport, dport=dport) /
526 return fragment6(p, frag_size)
528 def reass_frags_and_verify(self, frags, src, dst):
530 Reassemble and verify fragmented packet
532 :param frags: Captured fragments
533 :param src: Source IPv4 address to verify
534 :param dst: Destination IPv4 address to verify
536 :returns: Reassembled IPv4 packet
538 buffer = StringIO.StringIO()
540 self.assertEqual(p[IP].src, src)
541 self.assertEqual(p[IP].dst, dst)
542 self.assert_ip_checksum_valid(p)
543 buffer.seek(p[IP].frag * 8)
544 buffer.write(p[IP].payload)
545 ip = frags[0].getlayer(IP)
546 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
547 proto=frags[0][IP].proto)
548 if ip.proto == IP_PROTOS.tcp:
549 p = (ip / TCP(buffer.getvalue()))
550 self.assert_tcp_checksum_valid(p)
551 elif ip.proto == IP_PROTOS.udp:
552 p = (ip / UDP(buffer.getvalue()))
555 def reass_frags_and_verify_ip6(self, frags, src, dst):
557 Reassemble and verify fragmented packet
559 :param frags: Captured fragments
560 :param src: Source IPv6 address to verify
561 :param dst: Destination IPv6 address to verify
563 :returns: Reassembled IPv6 packet
565 buffer = StringIO.StringIO()
567 self.assertEqual(p[IPv6].src, src)
568 self.assertEqual(p[IPv6].dst, dst)
569 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
570 buffer.write(p[IPv6ExtHdrFragment].payload)
571 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
572 nh=frags[0][IPv6ExtHdrFragment].nh)
573 if ip.nh == IP_PROTOS.tcp:
574 p = (ip / TCP(buffer.getvalue()))
575 elif ip.nh == IP_PROTOS.udp:
576 p = (ip / UDP(buffer.getvalue()))
577 self.assert_packet_checksums_valid(p)
580 def initiate_tcp_session(self, in_if, out_if):
582 Initiates TCP session
584 :param in_if: Inside interface
585 :param out_if: Outside interface
589 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
590 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
591 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
594 self.pg_enable_capture(self.pg_interfaces)
596 capture = out_if.get_capture(1)
598 self.tcp_port_out = p[TCP].sport
600 # SYN + ACK packet out->in
601 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
602 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
603 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
606 self.pg_enable_capture(self.pg_interfaces)
611 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
612 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
613 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
616 self.pg_enable_capture(self.pg_interfaces)
618 out_if.get_capture(1)
621 self.logger.error("TCP 3 way handshake failed")
624 def verify_ipfix_nat44_ses(self, data):
626 Verify IPFIX NAT44 session create/delete event
628 :param data: Decoded IPFIX data records
630 nat44_ses_create_num = 0
631 nat44_ses_delete_num = 0
632 self.assertEqual(6, len(data))
635 self.assertIn(ord(record[230]), [4, 5])
636 if ord(record[230]) == 4:
637 nat44_ses_create_num += 1
639 nat44_ses_delete_num += 1
641 self.assertEqual(self.pg0.remote_ip4n, record[8])
642 # postNATSourceIPv4Address
643 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
646 self.assertEqual(struct.pack("!I", 0), record[234])
647 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
648 if IP_PROTOS.icmp == ord(record[4]):
649 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
650 self.assertEqual(struct.pack("!H", self.icmp_id_out),
652 elif IP_PROTOS.tcp == ord(record[4]):
653 self.assertEqual(struct.pack("!H", self.tcp_port_in),
655 self.assertEqual(struct.pack("!H", self.tcp_port_out),
657 elif IP_PROTOS.udp == ord(record[4]):
658 self.assertEqual(struct.pack("!H", self.udp_port_in),
660 self.assertEqual(struct.pack("!H", self.udp_port_out),
663 self.fail("Invalid protocol")
664 self.assertEqual(3, nat44_ses_create_num)
665 self.assertEqual(3, nat44_ses_delete_num)
667 def verify_ipfix_addr_exhausted(self, data):
669 Verify IPFIX NAT addresses event
671 :param data: Decoded IPFIX data records
673 self.assertEqual(1, len(data))
676 self.assertEqual(ord(record[230]), 3)
678 self.assertEqual(struct.pack("!I", 0), record[283])
680 def verify_ipfix_max_sessions(self, data, limit):
682 Verify IPFIX maximum session entries exceeded event
684 :param data: Decoded IPFIX data records
685 :param limit: Number of maximum session entries that can be created.
687 self.assertEqual(1, len(data))
690 self.assertEqual(ord(record[230]), 13)
691 # natQuotaExceededEvent
692 self.assertEqual(struct.pack("I", 1), record[466])
694 self.assertEqual(struct.pack("I", limit), record[471])
696 def verify_ipfix_max_bibs(self, data, limit):
698 Verify IPFIX maximum BIB entries exceeded event
700 :param data: Decoded IPFIX data records
701 :param limit: Number of maximum BIB entries that can be created.
703 self.assertEqual(1, len(data))
706 self.assertEqual(ord(record[230]), 13)
707 # natQuotaExceededEvent
708 self.assertEqual(struct.pack("I", 2), record[466])
710 self.assertEqual(struct.pack("I", limit), record[472])
712 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
714 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
716 :param data: Decoded IPFIX data records
717 :param limit: Number of maximum fragments pending reassembly
718 :param src_addr: IPv6 source address
720 self.assertEqual(1, len(data))
723 self.assertEqual(ord(record[230]), 13)
724 # natQuotaExceededEvent
725 self.assertEqual(struct.pack("I", 5), record[466])
726 # maxFragmentsPendingReassembly
727 self.assertEqual(struct.pack("I", limit), record[475])
729 self.assertEqual(src_addr, record[27])
731 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
733 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
735 :param data: Decoded IPFIX data records
736 :param limit: Number of maximum fragments pending reassembly
737 :param src_addr: IPv4 source address
739 self.assertEqual(1, len(data))
742 self.assertEqual(ord(record[230]), 13)
743 # natQuotaExceededEvent
744 self.assertEqual(struct.pack("I", 5), record[466])
745 # maxFragmentsPendingReassembly
746 self.assertEqual(struct.pack("I", limit), record[475])
748 self.assertEqual(src_addr, record[8])
750 def verify_ipfix_bib(self, data, is_create, src_addr):
752 Verify IPFIX NAT64 BIB create and delete events
754 :param data: Decoded IPFIX data records
755 :param is_create: Create event if nonzero value otherwise delete event
756 :param src_addr: IPv6 source address
758 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 10)
764 self.assertEqual(ord(record[230]), 11)
766 self.assertEqual(src_addr, record[27])
767 # postNATSourceIPv4Address
768 self.assertEqual(self.nat_addr_n, record[225])
770 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
772 self.assertEqual(struct.pack("!I", 0), record[234])
773 # sourceTransportPort
774 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
775 # postNAPTSourceTransportPort
776 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
778 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
781 Verify IPFIX NAT64 session create and delete events
783 :param data: Decoded IPFIX data records
784 :param is_create: Create event if nonzero value otherwise delete event
785 :param src_addr: IPv6 source address
786 :param dst_addr: IPv4 destination address
787 :param dst_port: destination TCP port
789 self.assertEqual(1, len(data))
793 self.assertEqual(ord(record[230]), 6)
795 self.assertEqual(ord(record[230]), 7)
797 self.assertEqual(src_addr, record[27])
798 # destinationIPv6Address
799 self.assertEqual(socket.inet_pton(socket.AF_INET6,
800 self.compose_ip6(dst_addr,
804 # postNATSourceIPv4Address
805 self.assertEqual(self.nat_addr_n, record[225])
806 # postNATDestinationIPv4Address
807 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
810 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
812 self.assertEqual(struct.pack("!I", 0), record[234])
813 # sourceTransportPort
814 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
815 # postNAPTSourceTransportPort
816 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
817 # destinationTransportPort
818 self.assertEqual(struct.pack("!H", dst_port), record[11])
819 # postNAPTDestinationTransportPort
820 self.assertEqual(struct.pack("!H", dst_port), record[228])
823 class TestNAT44(MethodHolder):
824 """ NAT44 Test Cases """
828 super(TestNAT44, cls).setUpClass()
831 cls.tcp_port_in = 6303
832 cls.tcp_port_out = 6303
833 cls.udp_port_in = 6304
834 cls.udp_port_out = 6304
835 cls.icmp_id_in = 6305
836 cls.icmp_id_out = 6305
837 cls.nat_addr = '10.0.0.3'
838 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
839 cls.ipfix_src_port = 4739
840 cls.ipfix_domain_id = 1
841 cls.tcp_external_port = 80
843 cls.create_pg_interfaces(range(10))
844 cls.interfaces = list(cls.pg_interfaces[0:4])
846 for i in cls.interfaces:
851 cls.pg0.generate_remote_hosts(3)
852 cls.pg0.configure_ipv4_neighbors()
854 cls.pg1.generate_remote_hosts(1)
855 cls.pg1.configure_ipv4_neighbors()
857 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
858 cls.vapi.ip_table_add_del(10, is_add=1)
859 cls.vapi.ip_table_add_del(20, is_add=1)
861 cls.pg4._local_ip4 = "172.16.255.1"
862 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
863 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
864 cls.pg4.set_table_ip4(10)
865 cls.pg5._local_ip4 = "172.17.255.3"
866 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
867 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
868 cls.pg5.set_table_ip4(10)
869 cls.pg6._local_ip4 = "172.16.255.1"
870 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
871 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
872 cls.pg6.set_table_ip4(20)
873 for i in cls.overlapping_interfaces:
881 cls.pg9.generate_remote_hosts(2)
883 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
884 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
888 cls.pg9.resolve_arp()
889 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
890 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
891 cls.pg9.resolve_arp()
894 super(TestNAT44, cls).tearDownClass()
897 def clear_nat44(self):
899 Clear NAT44 configuration.
901 # I found no elegant way to do this
902 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
903 dst_address_length=32,
904 next_hop_address=self.pg7.remote_ip4n,
905 next_hop_sw_if_index=self.pg7.sw_if_index,
907 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
908 dst_address_length=32,
909 next_hop_address=self.pg8.remote_ip4n,
910 next_hop_sw_if_index=self.pg8.sw_if_index,
913 for intf in [self.pg7, self.pg8]:
914 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
916 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
921 if self.pg7.has_ip4_config:
922 self.pg7.unconfig_ip4()
924 self.vapi.nat44_forwarding_enable_disable(0)
926 interfaces = self.vapi.nat44_interface_addr_dump()
927 for intf in interfaces:
928 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
929 twice_nat=intf.twice_nat,
932 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
933 domain_id=self.ipfix_domain_id)
934 self.ipfix_src_port = 4739
935 self.ipfix_domain_id = 1
937 interfaces = self.vapi.nat44_interface_dump()
938 for intf in interfaces:
939 if intf.is_inside > 1:
940 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
943 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
947 interfaces = self.vapi.nat44_interface_output_feature_dump()
948 for intf in interfaces:
949 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
953 static_mappings = self.vapi.nat44_static_mapping_dump()
954 for sm in static_mappings:
955 self.vapi.nat44_add_del_static_mapping(
957 sm.external_ip_address,
958 local_port=sm.local_port,
959 external_port=sm.external_port,
960 addr_only=sm.addr_only,
962 protocol=sm.protocol,
963 twice_nat=sm.twice_nat,
964 self_twice_nat=sm.self_twice_nat,
965 out2in_only=sm.out2in_only,
967 external_sw_if_index=sm.external_sw_if_index,
970 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
971 for lb_sm in lb_static_mappings:
972 self.vapi.nat44_add_del_lb_static_mapping(
977 twice_nat=lb_sm.twice_nat,
978 self_twice_nat=lb_sm.self_twice_nat,
979 out2in_only=lb_sm.out2in_only,
985 identity_mappings = self.vapi.nat44_identity_mapping_dump()
986 for id_m in identity_mappings:
987 self.vapi.nat44_add_del_identity_mapping(
988 addr_only=id_m.addr_only,
991 sw_if_index=id_m.sw_if_index,
993 protocol=id_m.protocol,
996 adresses = self.vapi.nat44_address_dump()
997 for addr in adresses:
998 self.vapi.nat44_add_del_address_range(addr.ip_address,
1000 twice_nat=addr.twice_nat,
1003 self.vapi.nat_set_reass()
1004 self.vapi.nat_set_reass(is_ip6=1)
1006 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1007 local_port=0, external_port=0, vrf_id=0,
1008 is_add=1, external_sw_if_index=0xFFFFFFFF,
1009 proto=0, twice_nat=0, self_twice_nat=0,
1010 out2in_only=0, tag=""):
1012 Add/delete NAT44 static mapping
1014 :param local_ip: Local IP address
1015 :param external_ip: External IP address
1016 :param local_port: Local port number (Optional)
1017 :param external_port: External port number (Optional)
1018 :param vrf_id: VRF ID (Default 0)
1019 :param is_add: 1 if add, 0 if delete (Default add)
1020 :param external_sw_if_index: External interface instead of IP address
1021 :param proto: IP protocol (Mandatory if port specified)
1022 :param twice_nat: 1 if translate external host address and port
1023 :param self_twice_nat: 1 if translate external host address and port
1024 whenever external host address equals
1025 local address of internal host
1026 :param out2in_only: if 1 rule is matching only out2in direction
1027 :param tag: Opaque string tag
1030 if local_port and external_port:
1032 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1033 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1034 self.vapi.nat44_add_del_static_mapping(
1037 external_sw_if_index,
1049 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1051 Add/delete NAT44 address
1053 :param ip: IP address
1054 :param is_add: 1 if add, 0 if delete (Default add)
1055 :param twice_nat: twice NAT address for extenal hosts
1057 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1058 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1060 twice_nat=twice_nat)
1062 def test_dynamic(self):
1063 """ NAT44 dynamic translation test """
1065 self.nat44_add_address(self.nat_addr)
1066 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1067 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1071 pkts = self.create_stream_in(self.pg0, self.pg1)
1072 self.pg0.add_stream(pkts)
1073 self.pg_enable_capture(self.pg_interfaces)
1075 capture = self.pg1.get_capture(len(pkts))
1076 self.verify_capture_out(capture)
1079 pkts = self.create_stream_out(self.pg1)
1080 self.pg1.add_stream(pkts)
1081 self.pg_enable_capture(self.pg_interfaces)
1083 capture = self.pg0.get_capture(len(pkts))
1084 self.verify_capture_in(capture, self.pg0)
1086 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1087 """ NAT44 handling of client packets with TTL=1 """
1089 self.nat44_add_address(self.nat_addr)
1090 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1091 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1094 # Client side - generate traffic
1095 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1096 self.pg0.add_stream(pkts)
1097 self.pg_enable_capture(self.pg_interfaces)
1100 # Client side - verify ICMP type 11 packets
1101 capture = self.pg0.get_capture(len(pkts))
1102 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1104 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1105 """ NAT44 handling of server packets with TTL=1 """
1107 self.nat44_add_address(self.nat_addr)
1108 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1109 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1112 # Client side - create sessions
1113 pkts = self.create_stream_in(self.pg0, self.pg1)
1114 self.pg0.add_stream(pkts)
1115 self.pg_enable_capture(self.pg_interfaces)
1118 # Server side - generate traffic
1119 capture = self.pg1.get_capture(len(pkts))
1120 self.verify_capture_out(capture)
1121 pkts = self.create_stream_out(self.pg1, ttl=1)
1122 self.pg1.add_stream(pkts)
1123 self.pg_enable_capture(self.pg_interfaces)
1126 # Server side - verify ICMP type 11 packets
1127 capture = self.pg1.get_capture(len(pkts))
1128 self.verify_capture_out_with_icmp_errors(capture,
1129 src_ip=self.pg1.local_ip4)
1131 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1132 """ NAT44 handling of error responses to client packets with TTL=2 """
1134 self.nat44_add_address(self.nat_addr)
1135 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1136 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1139 # Client side - generate traffic
1140 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1141 self.pg0.add_stream(pkts)
1142 self.pg_enable_capture(self.pg_interfaces)
1145 # Server side - simulate ICMP type 11 response
1146 capture = self.pg1.get_capture(len(pkts))
1147 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1148 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1149 ICMP(type=11) / packet[IP] for packet in capture]
1150 self.pg1.add_stream(pkts)
1151 self.pg_enable_capture(self.pg_interfaces)
1154 # Client side - verify ICMP type 11 packets
1155 capture = self.pg0.get_capture(len(pkts))
1156 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1158 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1159 """ NAT44 handling of error responses to server packets with TTL=2 """
1161 self.nat44_add_address(self.nat_addr)
1162 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1163 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1166 # Client side - create sessions
1167 pkts = self.create_stream_in(self.pg0, self.pg1)
1168 self.pg0.add_stream(pkts)
1169 self.pg_enable_capture(self.pg_interfaces)
1172 # Server side - generate traffic
1173 capture = self.pg1.get_capture(len(pkts))
1174 self.verify_capture_out(capture)
1175 pkts = self.create_stream_out(self.pg1, ttl=2)
1176 self.pg1.add_stream(pkts)
1177 self.pg_enable_capture(self.pg_interfaces)
1180 # Client side - simulate ICMP type 11 response
1181 capture = self.pg0.get_capture(len(pkts))
1182 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1183 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1184 ICMP(type=11) / packet[IP] for packet in capture]
1185 self.pg0.add_stream(pkts)
1186 self.pg_enable_capture(self.pg_interfaces)
1189 # Server side - verify ICMP type 11 packets
1190 capture = self.pg1.get_capture(len(pkts))
1191 self.verify_capture_out_with_icmp_errors(capture)
1193 def test_ping_out_interface_from_outside(self):
1194 """ Ping NAT44 out interface from outside network """
1196 self.nat44_add_address(self.nat_addr)
1197 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1198 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1201 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1202 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1203 ICMP(id=self.icmp_id_out, type='echo-request'))
1205 self.pg1.add_stream(pkts)
1206 self.pg_enable_capture(self.pg_interfaces)
1208 capture = self.pg1.get_capture(len(pkts))
1209 self.assertEqual(1, len(capture))
1212 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1213 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1214 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1215 self.assertEqual(packet[ICMP].type, 0) # echo reply
1217 self.logger.error(ppp("Unexpected or invalid packet "
1218 "(outside network):", packet))
1221 def test_ping_internal_host_from_outside(self):
1222 """ Ping internal host from outside network """
1224 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1225 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1226 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1230 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1231 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1232 ICMP(id=self.icmp_id_out, type='echo-request'))
1233 self.pg1.add_stream(pkt)
1234 self.pg_enable_capture(self.pg_interfaces)
1236 capture = self.pg0.get_capture(1)
1237 self.verify_capture_in(capture, self.pg0, packet_num=1)
1238 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1241 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1242 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1243 ICMP(id=self.icmp_id_in, type='echo-reply'))
1244 self.pg0.add_stream(pkt)
1245 self.pg_enable_capture(self.pg_interfaces)
1247 capture = self.pg1.get_capture(1)
1248 self.verify_capture_out(capture, same_port=True, packet_num=1)
1249 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1251 def test_forwarding(self):
1252 """ NAT44 forwarding test """
1254 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1255 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1257 self.vapi.nat44_forwarding_enable_disable(1)
1259 real_ip = self.pg0.remote_ip4n
1260 alias_ip = self.nat_addr_n
1261 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1262 external_ip=alias_ip)
1265 # in2out - static mapping match
1267 pkts = self.create_stream_out(self.pg1)
1268 self.pg1.add_stream(pkts)
1269 self.pg_enable_capture(self.pg_interfaces)
1271 capture = self.pg0.get_capture(len(pkts))
1272 self.verify_capture_in(capture, self.pg0)
1274 pkts = self.create_stream_in(self.pg0, self.pg1)
1275 self.pg0.add_stream(pkts)
1276 self.pg_enable_capture(self.pg_interfaces)
1278 capture = self.pg1.get_capture(len(pkts))
1279 self.verify_capture_out(capture, same_port=True)
1281 # in2out - no static mapping match
1283 host0 = self.pg0.remote_hosts[0]
1284 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1286 pkts = self.create_stream_out(self.pg1,
1287 dst_ip=self.pg0.remote_ip4,
1288 use_inside_ports=True)
1289 self.pg1.add_stream(pkts)
1290 self.pg_enable_capture(self.pg_interfaces)
1292 capture = self.pg0.get_capture(len(pkts))
1293 self.verify_capture_in(capture, self.pg0)
1295 pkts = self.create_stream_in(self.pg0, self.pg1)
1296 self.pg0.add_stream(pkts)
1297 self.pg_enable_capture(self.pg_interfaces)
1299 capture = self.pg1.get_capture(len(pkts))
1300 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1303 self.pg0.remote_hosts[0] = host0
1306 self.vapi.nat44_forwarding_enable_disable(0)
1307 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1308 external_ip=alias_ip,
1311 def test_static_in(self):
1312 """ 1:1 NAT initialized from inside network """
1314 nat_ip = "10.0.0.10"
1315 self.tcp_port_out = 6303
1316 self.udp_port_out = 6304
1317 self.icmp_id_out = 6305
1319 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1320 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1321 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1323 sm = self.vapi.nat44_static_mapping_dump()
1324 self.assertEqual(len(sm), 1)
1325 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1326 self.assertEqual(sm[0].protocol, 0)
1327 self.assertEqual(sm[0].local_port, 0)
1328 self.assertEqual(sm[0].external_port, 0)
1331 pkts = self.create_stream_in(self.pg0, self.pg1)
1332 self.pg0.add_stream(pkts)
1333 self.pg_enable_capture(self.pg_interfaces)
1335 capture = self.pg1.get_capture(len(pkts))
1336 self.verify_capture_out(capture, nat_ip, True)
1339 pkts = self.create_stream_out(self.pg1, nat_ip)
1340 self.pg1.add_stream(pkts)
1341 self.pg_enable_capture(self.pg_interfaces)
1343 capture = self.pg0.get_capture(len(pkts))
1344 self.verify_capture_in(capture, self.pg0)
1346 def test_static_out(self):
1347 """ 1:1 NAT initialized from outside network """
1349 nat_ip = "10.0.0.20"
1350 self.tcp_port_out = 6303
1351 self.udp_port_out = 6304
1352 self.icmp_id_out = 6305
1355 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1356 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1357 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1359 sm = self.vapi.nat44_static_mapping_dump()
1360 self.assertEqual(len(sm), 1)
1361 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1364 pkts = self.create_stream_out(self.pg1, nat_ip)
1365 self.pg1.add_stream(pkts)
1366 self.pg_enable_capture(self.pg_interfaces)
1368 capture = self.pg0.get_capture(len(pkts))
1369 self.verify_capture_in(capture, self.pg0)
1372 pkts = self.create_stream_in(self.pg0, self.pg1)
1373 self.pg0.add_stream(pkts)
1374 self.pg_enable_capture(self.pg_interfaces)
1376 capture = self.pg1.get_capture(len(pkts))
1377 self.verify_capture_out(capture, nat_ip, True)
1379 def test_static_with_port_in(self):
1380 """ 1:1 NAPT initialized from inside network """
1382 self.tcp_port_out = 3606
1383 self.udp_port_out = 3607
1384 self.icmp_id_out = 3608
1386 self.nat44_add_address(self.nat_addr)
1387 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1388 self.tcp_port_in, self.tcp_port_out,
1389 proto=IP_PROTOS.tcp)
1390 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1391 self.udp_port_in, self.udp_port_out,
1392 proto=IP_PROTOS.udp)
1393 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1394 self.icmp_id_in, self.icmp_id_out,
1395 proto=IP_PROTOS.icmp)
1396 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1397 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1401 pkts = self.create_stream_in(self.pg0, self.pg1)
1402 self.pg0.add_stream(pkts)
1403 self.pg_enable_capture(self.pg_interfaces)
1405 capture = self.pg1.get_capture(len(pkts))
1406 self.verify_capture_out(capture)
1409 pkts = self.create_stream_out(self.pg1)
1410 self.pg1.add_stream(pkts)
1411 self.pg_enable_capture(self.pg_interfaces)
1413 capture = self.pg0.get_capture(len(pkts))
1414 self.verify_capture_in(capture, self.pg0)
1416 def test_static_with_port_out(self):
1417 """ 1:1 NAPT initialized from outside network """
1419 self.tcp_port_out = 30606
1420 self.udp_port_out = 30607
1421 self.icmp_id_out = 30608
1423 self.nat44_add_address(self.nat_addr)
1424 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1425 self.tcp_port_in, self.tcp_port_out,
1426 proto=IP_PROTOS.tcp)
1427 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1428 self.udp_port_in, self.udp_port_out,
1429 proto=IP_PROTOS.udp)
1430 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1431 self.icmp_id_in, self.icmp_id_out,
1432 proto=IP_PROTOS.icmp)
1433 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1434 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1438 pkts = self.create_stream_out(self.pg1)
1439 self.pg1.add_stream(pkts)
1440 self.pg_enable_capture(self.pg_interfaces)
1442 capture = self.pg0.get_capture(len(pkts))
1443 self.verify_capture_in(capture, self.pg0)
1446 pkts = self.create_stream_in(self.pg0, self.pg1)
1447 self.pg0.add_stream(pkts)
1448 self.pg_enable_capture(self.pg_interfaces)
1450 capture = self.pg1.get_capture(len(pkts))
1451 self.verify_capture_out(capture)
1453 def test_static_with_port_out2(self):
1454 """ 1:1 NAPT symmetrical rule """
1459 self.vapi.nat44_forwarding_enable_disable(1)
1460 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1461 local_port, external_port,
1462 proto=IP_PROTOS.tcp, out2in_only=1)
1463 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1464 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1467 # from client to service
1468 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1469 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1470 TCP(sport=12345, dport=external_port))
1471 self.pg1.add_stream(p)
1472 self.pg_enable_capture(self.pg_interfaces)
1474 capture = self.pg0.get_capture(1)
1479 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1480 self.assertEqual(tcp.dport, local_port)
1481 self.assert_packet_checksums_valid(p)
1483 self.logger.error(ppp("Unexpected or invalid packet:", p))
1487 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1488 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1489 ICMP(type=11) / capture[0][IP])
1490 self.pg0.add_stream(p)
1491 self.pg_enable_capture(self.pg_interfaces)
1493 capture = self.pg1.get_capture(1)
1496 self.assertEqual(p[IP].src, self.nat_addr)
1498 self.assertEqual(inner.dst, self.nat_addr)
1499 self.assertEqual(inner[TCPerror].dport, external_port)
1501 self.logger.error(ppp("Unexpected or invalid packet:", p))
1504 # from service back to client
1505 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1506 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1507 TCP(sport=local_port, dport=12345))
1508 self.pg0.add_stream(p)
1509 self.pg_enable_capture(self.pg_interfaces)
1511 capture = self.pg1.get_capture(1)
1516 self.assertEqual(ip.src, self.nat_addr)
1517 self.assertEqual(tcp.sport, external_port)
1518 self.assert_packet_checksums_valid(p)
1520 self.logger.error(ppp("Unexpected or invalid packet:", p))
1524 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1525 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1526 ICMP(type=11) / capture[0][IP])
1527 self.pg1.add_stream(p)
1528 self.pg_enable_capture(self.pg_interfaces)
1530 capture = self.pg0.get_capture(1)
1533 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1535 self.assertEqual(inner.src, self.pg0.remote_ip4)
1536 self.assertEqual(inner[TCPerror].sport, local_port)
1538 self.logger.error(ppp("Unexpected or invalid packet:", p))
1541 # from client to server (no translation)
1542 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1543 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1544 TCP(sport=12346, dport=local_port))
1545 self.pg1.add_stream(p)
1546 self.pg_enable_capture(self.pg_interfaces)
1548 capture = self.pg0.get_capture(1)
1553 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1554 self.assertEqual(tcp.dport, local_port)
1555 self.assert_packet_checksums_valid(p)
1557 self.logger.error(ppp("Unexpected or invalid packet:", p))
1560 # from service back to client (no translation)
1561 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1562 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1563 TCP(sport=local_port, dport=12346))
1564 self.pg0.add_stream(p)
1565 self.pg_enable_capture(self.pg_interfaces)
1567 capture = self.pg1.get_capture(1)
1572 self.assertEqual(ip.src, self.pg0.remote_ip4)
1573 self.assertEqual(tcp.sport, local_port)
1574 self.assert_packet_checksums_valid(p)
1576 self.logger.error(ppp("Unexpected or invalid packet:", p))
1579 def test_static_vrf_aware(self):
1580 """ 1:1 NAT VRF awareness """
1582 nat_ip1 = "10.0.0.30"
1583 nat_ip2 = "10.0.0.40"
1584 self.tcp_port_out = 6303
1585 self.udp_port_out = 6304
1586 self.icmp_id_out = 6305
1588 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1590 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1592 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1594 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1595 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1597 # inside interface VRF match NAT44 static mapping VRF
1598 pkts = self.create_stream_in(self.pg4, self.pg3)
1599 self.pg4.add_stream(pkts)
1600 self.pg_enable_capture(self.pg_interfaces)
1602 capture = self.pg3.get_capture(len(pkts))
1603 self.verify_capture_out(capture, nat_ip1, True)
1605 # inside interface VRF don't match NAT44 static mapping VRF (packets
1607 pkts = self.create_stream_in(self.pg0, self.pg3)
1608 self.pg0.add_stream(pkts)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 self.pg3.assert_nothing_captured()
1613 def test_dynamic_to_static(self):
1614 """ Switch from dynamic translation to 1:1NAT """
1615 nat_ip = "10.0.0.10"
1616 self.tcp_port_out = 6303
1617 self.udp_port_out = 6304
1618 self.icmp_id_out = 6305
1620 self.nat44_add_address(self.nat_addr)
1621 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1622 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1626 pkts = self.create_stream_in(self.pg0, self.pg1)
1627 self.pg0.add_stream(pkts)
1628 self.pg_enable_capture(self.pg_interfaces)
1630 capture = self.pg1.get_capture(len(pkts))
1631 self.verify_capture_out(capture)
1634 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1635 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1636 self.assertEqual(len(sessions), 0)
1637 pkts = self.create_stream_in(self.pg0, self.pg1)
1638 self.pg0.add_stream(pkts)
1639 self.pg_enable_capture(self.pg_interfaces)
1641 capture = self.pg1.get_capture(len(pkts))
1642 self.verify_capture_out(capture, nat_ip, True)
1644 def test_identity_nat(self):
1645 """ Identity NAT """
1647 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1648 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1649 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1652 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1653 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1654 TCP(sport=12345, dport=56789))
1655 self.pg1.add_stream(p)
1656 self.pg_enable_capture(self.pg_interfaces)
1658 capture = self.pg0.get_capture(1)
1663 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1664 self.assertEqual(ip.src, self.pg1.remote_ip4)
1665 self.assertEqual(tcp.dport, 56789)
1666 self.assertEqual(tcp.sport, 12345)
1667 self.assert_packet_checksums_valid(p)
1669 self.logger.error(ppp("Unexpected or invalid packet:", p))
1672 def test_static_lb(self):
1673 """ NAT44 local service load balancing """
1674 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1677 server1 = self.pg0.remote_hosts[0]
1678 server2 = self.pg0.remote_hosts[1]
1680 locals = [{'addr': server1.ip4n,
1683 {'addr': server2.ip4n,
1687 self.nat44_add_address(self.nat_addr)
1688 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1691 local_num=len(locals),
1693 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1694 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1697 # from client to service
1698 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1699 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1700 TCP(sport=12345, dport=external_port))
1701 self.pg1.add_stream(p)
1702 self.pg_enable_capture(self.pg_interfaces)
1704 capture = self.pg0.get_capture(1)
1710 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1711 if ip.dst == server1.ip4:
1715 self.assertEqual(tcp.dport, local_port)
1716 self.assert_packet_checksums_valid(p)
1718 self.logger.error(ppp("Unexpected or invalid packet:", p))
1721 # from service back to client
1722 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1723 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1724 TCP(sport=local_port, dport=12345))
1725 self.pg0.add_stream(p)
1726 self.pg_enable_capture(self.pg_interfaces)
1728 capture = self.pg1.get_capture(1)
1733 self.assertEqual(ip.src, self.nat_addr)
1734 self.assertEqual(tcp.sport, external_port)
1735 self.assert_packet_checksums_valid(p)
1737 self.logger.error(ppp("Unexpected or invalid packet:", p))
1740 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1741 def test_static_lb_multi_clients(self):
1742 """ NAT44 local service load balancing - multiple clients"""
1744 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1747 server1 = self.pg0.remote_hosts[0]
1748 server2 = self.pg0.remote_hosts[1]
1750 locals = [{'addr': server1.ip4n,
1753 {'addr': server2.ip4n,
1757 self.nat44_add_address(self.nat_addr)
1758 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1761 local_num=len(locals),
1763 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1764 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1769 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1771 for client in clients:
1772 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1773 IP(src=client, dst=self.nat_addr) /
1774 TCP(sport=12345, dport=external_port))
1776 self.pg1.add_stream(pkts)
1777 self.pg_enable_capture(self.pg_interfaces)
1779 capture = self.pg0.get_capture(len(pkts))
1781 if p[IP].dst == server1.ip4:
1785 self.assertTrue(server1_n > server2_n)
1787 def test_static_lb_2(self):
1788 """ NAT44 local service load balancing (asymmetrical rule) """
1789 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1792 server1 = self.pg0.remote_hosts[0]
1793 server2 = self.pg0.remote_hosts[1]
1795 locals = [{'addr': server1.ip4n,
1798 {'addr': server2.ip4n,
1802 self.vapi.nat44_forwarding_enable_disable(1)
1803 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1807 local_num=len(locals),
1809 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1810 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1813 # from client to service
1814 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1815 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1816 TCP(sport=12345, dport=external_port))
1817 self.pg1.add_stream(p)
1818 self.pg_enable_capture(self.pg_interfaces)
1820 capture = self.pg0.get_capture(1)
1826 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1827 if ip.dst == server1.ip4:
1831 self.assertEqual(tcp.dport, local_port)
1832 self.assert_packet_checksums_valid(p)
1834 self.logger.error(ppp("Unexpected or invalid packet:", p))
1837 # from service back to client
1838 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1839 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1840 TCP(sport=local_port, dport=12345))
1841 self.pg0.add_stream(p)
1842 self.pg_enable_capture(self.pg_interfaces)
1844 capture = self.pg1.get_capture(1)
1849 self.assertEqual(ip.src, self.nat_addr)
1850 self.assertEqual(tcp.sport, external_port)
1851 self.assert_packet_checksums_valid(p)
1853 self.logger.error(ppp("Unexpected or invalid packet:", p))
1856 # from client to server (no translation)
1857 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1858 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1859 TCP(sport=12346, dport=local_port))
1860 self.pg1.add_stream(p)
1861 self.pg_enable_capture(self.pg_interfaces)
1863 capture = self.pg0.get_capture(1)
1869 self.assertEqual(ip.dst, server1.ip4)
1870 self.assertEqual(tcp.dport, local_port)
1871 self.assert_packet_checksums_valid(p)
1873 self.logger.error(ppp("Unexpected or invalid packet:", p))
1876 # from service back to client (no translation)
1877 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1878 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1879 TCP(sport=local_port, dport=12346))
1880 self.pg0.add_stream(p)
1881 self.pg_enable_capture(self.pg_interfaces)
1883 capture = self.pg1.get_capture(1)
1888 self.assertEqual(ip.src, server1.ip4)
1889 self.assertEqual(tcp.sport, local_port)
1890 self.assert_packet_checksums_valid(p)
1892 self.logger.error(ppp("Unexpected or invalid packet:", p))
1895 def test_multiple_inside_interfaces(self):
1896 """ NAT44 multiple non-overlapping address space inside interfaces """
1898 self.nat44_add_address(self.nat_addr)
1899 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1900 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1901 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1904 # between two NAT44 inside interfaces (no translation)
1905 pkts = self.create_stream_in(self.pg0, self.pg1)
1906 self.pg0.add_stream(pkts)
1907 self.pg_enable_capture(self.pg_interfaces)
1909 capture = self.pg1.get_capture(len(pkts))
1910 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1912 # from NAT44 inside to interface without NAT44 feature (no translation)
1913 pkts = self.create_stream_in(self.pg0, self.pg2)
1914 self.pg0.add_stream(pkts)
1915 self.pg_enable_capture(self.pg_interfaces)
1917 capture = self.pg2.get_capture(len(pkts))
1918 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1920 # in2out 1st interface
1921 pkts = self.create_stream_in(self.pg0, self.pg3)
1922 self.pg0.add_stream(pkts)
1923 self.pg_enable_capture(self.pg_interfaces)
1925 capture = self.pg3.get_capture(len(pkts))
1926 self.verify_capture_out(capture)
1928 # out2in 1st interface
1929 pkts = self.create_stream_out(self.pg3)
1930 self.pg3.add_stream(pkts)
1931 self.pg_enable_capture(self.pg_interfaces)
1933 capture = self.pg0.get_capture(len(pkts))
1934 self.verify_capture_in(capture, self.pg0)
1936 # in2out 2nd interface
1937 pkts = self.create_stream_in(self.pg1, self.pg3)
1938 self.pg1.add_stream(pkts)
1939 self.pg_enable_capture(self.pg_interfaces)
1941 capture = self.pg3.get_capture(len(pkts))
1942 self.verify_capture_out(capture)
1944 # out2in 2nd interface
1945 pkts = self.create_stream_out(self.pg3)
1946 self.pg3.add_stream(pkts)
1947 self.pg_enable_capture(self.pg_interfaces)
1949 capture = self.pg1.get_capture(len(pkts))
1950 self.verify_capture_in(capture, self.pg1)
1952 def test_inside_overlapping_interfaces(self):
1953 """ NAT44 multiple inside interfaces with overlapping address space """
1955 static_nat_ip = "10.0.0.10"
1956 self.nat44_add_address(self.nat_addr)
1957 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1959 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1960 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1961 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1962 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1965 # between NAT44 inside interfaces with same VRF (no translation)
1966 pkts = self.create_stream_in(self.pg4, self.pg5)
1967 self.pg4.add_stream(pkts)
1968 self.pg_enable_capture(self.pg_interfaces)
1970 capture = self.pg5.get_capture(len(pkts))
1971 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1973 # between NAT44 inside interfaces with different VRF (hairpinning)
1974 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1975 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1976 TCP(sport=1234, dport=5678))
1977 self.pg4.add_stream(p)
1978 self.pg_enable_capture(self.pg_interfaces)
1980 capture = self.pg6.get_capture(1)
1985 self.assertEqual(ip.src, self.nat_addr)
1986 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1987 self.assertNotEqual(tcp.sport, 1234)
1988 self.assertEqual(tcp.dport, 5678)
1990 self.logger.error(ppp("Unexpected or invalid packet:", p))
1993 # in2out 1st interface
1994 pkts = self.create_stream_in(self.pg4, self.pg3)
1995 self.pg4.add_stream(pkts)
1996 self.pg_enable_capture(self.pg_interfaces)
1998 capture = self.pg3.get_capture(len(pkts))
1999 self.verify_capture_out(capture)
2001 # out2in 1st interface
2002 pkts = self.create_stream_out(self.pg3)
2003 self.pg3.add_stream(pkts)
2004 self.pg_enable_capture(self.pg_interfaces)
2006 capture = self.pg4.get_capture(len(pkts))
2007 self.verify_capture_in(capture, self.pg4)
2009 # in2out 2nd interface
2010 pkts = self.create_stream_in(self.pg5, self.pg3)
2011 self.pg5.add_stream(pkts)
2012 self.pg_enable_capture(self.pg_interfaces)
2014 capture = self.pg3.get_capture(len(pkts))
2015 self.verify_capture_out(capture)
2017 # out2in 2nd interface
2018 pkts = self.create_stream_out(self.pg3)
2019 self.pg3.add_stream(pkts)
2020 self.pg_enable_capture(self.pg_interfaces)
2022 capture = self.pg5.get_capture(len(pkts))
2023 self.verify_capture_in(capture, self.pg5)
2026 addresses = self.vapi.nat44_address_dump()
2027 self.assertEqual(len(addresses), 1)
2028 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2029 self.assertEqual(len(sessions), 3)
2030 for session in sessions:
2031 self.assertFalse(session.is_static)
2032 self.assertEqual(session.inside_ip_address[0:4],
2033 self.pg5.remote_ip4n)
2034 self.assertEqual(session.outside_ip_address,
2035 addresses[0].ip_address)
2036 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2037 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2038 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2039 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2040 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2041 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2042 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2043 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2044 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2046 # in2out 3rd interface
2047 pkts = self.create_stream_in(self.pg6, self.pg3)
2048 self.pg6.add_stream(pkts)
2049 self.pg_enable_capture(self.pg_interfaces)
2051 capture = self.pg3.get_capture(len(pkts))
2052 self.verify_capture_out(capture, static_nat_ip, True)
2054 # out2in 3rd interface
2055 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2056 self.pg3.add_stream(pkts)
2057 self.pg_enable_capture(self.pg_interfaces)
2059 capture = self.pg6.get_capture(len(pkts))
2060 self.verify_capture_in(capture, self.pg6)
2062 # general user and session dump verifications
2063 users = self.vapi.nat44_user_dump()
2064 self.assertTrue(len(users) >= 3)
2065 addresses = self.vapi.nat44_address_dump()
2066 self.assertEqual(len(addresses), 1)
2068 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2070 for session in sessions:
2071 self.assertEqual(user.ip_address, session.inside_ip_address)
2072 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2073 self.assertTrue(session.protocol in
2074 [IP_PROTOS.tcp, IP_PROTOS.udp,
2078 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2079 self.assertTrue(len(sessions) >= 4)
2080 for session in sessions:
2081 self.assertFalse(session.is_static)
2082 self.assertEqual(session.inside_ip_address[0:4],
2083 self.pg4.remote_ip4n)
2084 self.assertEqual(session.outside_ip_address,
2085 addresses[0].ip_address)
2088 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2089 self.assertTrue(len(sessions) >= 3)
2090 for session in sessions:
2091 self.assertTrue(session.is_static)
2092 self.assertEqual(session.inside_ip_address[0:4],
2093 self.pg6.remote_ip4n)
2094 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2095 map(int, static_nat_ip.split('.')))
2096 self.assertTrue(session.inside_port in
2097 [self.tcp_port_in, self.udp_port_in,
2100 def test_hairpinning(self):
2101 """ NAT44 hairpinning - 1:1 NAPT """
2103 host = self.pg0.remote_hosts[0]
2104 server = self.pg0.remote_hosts[1]
2107 server_in_port = 5678
2108 server_out_port = 8765
2110 self.nat44_add_address(self.nat_addr)
2111 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2112 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2114 # add static mapping for server
2115 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2116 server_in_port, server_out_port,
2117 proto=IP_PROTOS.tcp)
2119 # send packet from host to server
2120 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2121 IP(src=host.ip4, dst=self.nat_addr) /
2122 TCP(sport=host_in_port, dport=server_out_port))
2123 self.pg0.add_stream(p)
2124 self.pg_enable_capture(self.pg_interfaces)
2126 capture = self.pg0.get_capture(1)
2131 self.assertEqual(ip.src, self.nat_addr)
2132 self.assertEqual(ip.dst, server.ip4)
2133 self.assertNotEqual(tcp.sport, host_in_port)
2134 self.assertEqual(tcp.dport, server_in_port)
2135 self.assert_packet_checksums_valid(p)
2136 host_out_port = tcp.sport
2138 self.logger.error(ppp("Unexpected or invalid packet:", p))
2141 # send reply from server to host
2142 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2143 IP(src=server.ip4, dst=self.nat_addr) /
2144 TCP(sport=server_in_port, dport=host_out_port))
2145 self.pg0.add_stream(p)
2146 self.pg_enable_capture(self.pg_interfaces)
2148 capture = self.pg0.get_capture(1)
2153 self.assertEqual(ip.src, self.nat_addr)
2154 self.assertEqual(ip.dst, host.ip4)
2155 self.assertEqual(tcp.sport, server_out_port)
2156 self.assertEqual(tcp.dport, host_in_port)
2157 self.assert_packet_checksums_valid(p)
2159 self.logger.error(ppp("Unexpected or invalid packet:", p))
2162 def test_hairpinning2(self):
2163 """ NAT44 hairpinning - 1:1 NAT"""
2165 server1_nat_ip = "10.0.0.10"
2166 server2_nat_ip = "10.0.0.11"
2167 host = self.pg0.remote_hosts[0]
2168 server1 = self.pg0.remote_hosts[1]
2169 server2 = self.pg0.remote_hosts[2]
2170 server_tcp_port = 22
2171 server_udp_port = 20
2173 self.nat44_add_address(self.nat_addr)
2174 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2175 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2178 # add static mapping for servers
2179 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2180 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2184 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2185 IP(src=host.ip4, dst=server1_nat_ip) /
2186 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2188 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2189 IP(src=host.ip4, dst=server1_nat_ip) /
2190 UDP(sport=self.udp_port_in, dport=server_udp_port))
2192 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2193 IP(src=host.ip4, dst=server1_nat_ip) /
2194 ICMP(id=self.icmp_id_in, type='echo-request'))
2196 self.pg0.add_stream(pkts)
2197 self.pg_enable_capture(self.pg_interfaces)
2199 capture = self.pg0.get_capture(len(pkts))
2200 for packet in capture:
2202 self.assertEqual(packet[IP].src, self.nat_addr)
2203 self.assertEqual(packet[IP].dst, server1.ip4)
2204 if packet.haslayer(TCP):
2205 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2206 self.assertEqual(packet[TCP].dport, server_tcp_port)
2207 self.tcp_port_out = packet[TCP].sport
2208 self.assert_packet_checksums_valid(packet)
2209 elif packet.haslayer(UDP):
2210 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2211 self.assertEqual(packet[UDP].dport, server_udp_port)
2212 self.udp_port_out = packet[UDP].sport
2214 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2215 self.icmp_id_out = packet[ICMP].id
2217 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2222 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2223 IP(src=server1.ip4, dst=self.nat_addr) /
2224 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2226 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2227 IP(src=server1.ip4, dst=self.nat_addr) /
2228 UDP(sport=server_udp_port, dport=self.udp_port_out))
2230 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2231 IP(src=server1.ip4, dst=self.nat_addr) /
2232 ICMP(id=self.icmp_id_out, type='echo-reply'))
2234 self.pg0.add_stream(pkts)
2235 self.pg_enable_capture(self.pg_interfaces)
2237 capture = self.pg0.get_capture(len(pkts))
2238 for packet in capture:
2240 self.assertEqual(packet[IP].src, server1_nat_ip)
2241 self.assertEqual(packet[IP].dst, host.ip4)
2242 if packet.haslayer(TCP):
2243 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2244 self.assertEqual(packet[TCP].sport, server_tcp_port)
2245 self.assert_packet_checksums_valid(packet)
2246 elif packet.haslayer(UDP):
2247 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2248 self.assertEqual(packet[UDP].sport, server_udp_port)
2250 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2252 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2255 # server2 to server1
2257 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2258 IP(src=server2.ip4, dst=server1_nat_ip) /
2259 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2261 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262 IP(src=server2.ip4, dst=server1_nat_ip) /
2263 UDP(sport=self.udp_port_in, dport=server_udp_port))
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=server2.ip4, dst=server1_nat_ip) /
2267 ICMP(id=self.icmp_id_in, type='echo-request'))
2269 self.pg0.add_stream(pkts)
2270 self.pg_enable_capture(self.pg_interfaces)
2272 capture = self.pg0.get_capture(len(pkts))
2273 for packet in capture:
2275 self.assertEqual(packet[IP].src, server2_nat_ip)
2276 self.assertEqual(packet[IP].dst, server1.ip4)
2277 if packet.haslayer(TCP):
2278 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2279 self.assertEqual(packet[TCP].dport, server_tcp_port)
2280 self.tcp_port_out = packet[TCP].sport
2281 self.assert_packet_checksums_valid(packet)
2282 elif packet.haslayer(UDP):
2283 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2284 self.assertEqual(packet[UDP].dport, server_udp_port)
2285 self.udp_port_out = packet[UDP].sport
2287 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2288 self.icmp_id_out = packet[ICMP].id
2290 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2293 # server1 to server2
2295 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2296 IP(src=server1.ip4, dst=server2_nat_ip) /
2297 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2299 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2300 IP(src=server1.ip4, dst=server2_nat_ip) /
2301 UDP(sport=server_udp_port, dport=self.udp_port_out))
2303 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2304 IP(src=server1.ip4, dst=server2_nat_ip) /
2305 ICMP(id=self.icmp_id_out, type='echo-reply'))
2307 self.pg0.add_stream(pkts)
2308 self.pg_enable_capture(self.pg_interfaces)
2310 capture = self.pg0.get_capture(len(pkts))
2311 for packet in capture:
2313 self.assertEqual(packet[IP].src, server1_nat_ip)
2314 self.assertEqual(packet[IP].dst, server2.ip4)
2315 if packet.haslayer(TCP):
2316 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2317 self.assertEqual(packet[TCP].sport, server_tcp_port)
2318 self.assert_packet_checksums_valid(packet)
2319 elif packet.haslayer(UDP):
2320 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2321 self.assertEqual(packet[UDP].sport, server_udp_port)
2323 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2325 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2328 def test_max_translations_per_user(self):
2329 """ MAX translations per user - recycle the least recently used """
2331 self.nat44_add_address(self.nat_addr)
2332 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2333 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2336 # get maximum number of translations per user
2337 nat44_config = self.vapi.nat_show_config()
2339 # send more than maximum number of translations per user packets
2340 pkts_num = nat44_config.max_translations_per_user + 5
2342 for port in range(0, pkts_num):
2343 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2344 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2345 TCP(sport=1025 + port))
2347 self.pg0.add_stream(pkts)
2348 self.pg_enable_capture(self.pg_interfaces)
2351 # verify number of translated packet
2352 self.pg1.get_capture(pkts_num)
2354 users = self.vapi.nat44_user_dump()
2356 if user.ip_address == self.pg0.remote_ip4n:
2357 self.assertEqual(user.nsessions,
2358 nat44_config.max_translations_per_user)
2359 self.assertEqual(user.nstaticsessions, 0)
2362 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2364 proto=IP_PROTOS.tcp)
2365 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2366 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2367 TCP(sport=tcp_port))
2368 self.pg0.add_stream(p)
2369 self.pg_enable_capture(self.pg_interfaces)
2371 self.pg1.get_capture(1)
2372 users = self.vapi.nat44_user_dump()
2374 if user.ip_address == self.pg0.remote_ip4n:
2375 self.assertEqual(user.nsessions,
2376 nat44_config.max_translations_per_user - 1)
2377 self.assertEqual(user.nstaticsessions, 1)
2379 def test_interface_addr(self):
2380 """ Acquire NAT44 addresses from interface """
2381 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2383 # no address in NAT pool
2384 adresses = self.vapi.nat44_address_dump()
2385 self.assertEqual(0, len(adresses))
2387 # configure interface address and check NAT address pool
2388 self.pg7.config_ip4()
2389 adresses = self.vapi.nat44_address_dump()
2390 self.assertEqual(1, len(adresses))
2391 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2393 # remove interface address and check NAT address pool
2394 self.pg7.unconfig_ip4()
2395 adresses = self.vapi.nat44_address_dump()
2396 self.assertEqual(0, len(adresses))
2398 def test_interface_addr_static_mapping(self):
2399 """ Static mapping with addresses from interface """
2402 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2403 self.nat44_add_static_mapping(
2405 external_sw_if_index=self.pg7.sw_if_index,
2408 # static mappings with external interface
2409 static_mappings = self.vapi.nat44_static_mapping_dump()
2410 self.assertEqual(1, len(static_mappings))
2411 self.assertEqual(self.pg7.sw_if_index,
2412 static_mappings[0].external_sw_if_index)
2413 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2415 # configure interface address and check static mappings
2416 self.pg7.config_ip4()
2417 static_mappings = self.vapi.nat44_static_mapping_dump()
2418 self.assertEqual(2, len(static_mappings))
2420 for sm in static_mappings:
2421 if sm.external_sw_if_index == 0xFFFFFFFF:
2422 self.assertEqual(sm.external_ip_address[0:4],
2423 self.pg7.local_ip4n)
2424 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2426 self.assertTrue(resolved)
2428 # remove interface address and check static mappings
2429 self.pg7.unconfig_ip4()
2430 static_mappings = self.vapi.nat44_static_mapping_dump()
2431 self.assertEqual(1, len(static_mappings))
2432 self.assertEqual(self.pg7.sw_if_index,
2433 static_mappings[0].external_sw_if_index)
2434 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2436 # configure interface address again and check static mappings
2437 self.pg7.config_ip4()
2438 static_mappings = self.vapi.nat44_static_mapping_dump()
2439 self.assertEqual(2, len(static_mappings))
2441 for sm in static_mappings:
2442 if sm.external_sw_if_index == 0xFFFFFFFF:
2443 self.assertEqual(sm.external_ip_address[0:4],
2444 self.pg7.local_ip4n)
2445 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2447 self.assertTrue(resolved)
2449 # remove static mapping
2450 self.nat44_add_static_mapping(
2452 external_sw_if_index=self.pg7.sw_if_index,
2455 static_mappings = self.vapi.nat44_static_mapping_dump()
2456 self.assertEqual(0, len(static_mappings))
2458 def test_interface_addr_identity_nat(self):
2459 """ Identity NAT with addresses from interface """
2462 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2463 self.vapi.nat44_add_del_identity_mapping(
2464 sw_if_index=self.pg7.sw_if_index,
2466 protocol=IP_PROTOS.tcp,
2469 # identity mappings with external interface
2470 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2471 self.assertEqual(1, len(identity_mappings))
2472 self.assertEqual(self.pg7.sw_if_index,
2473 identity_mappings[0].sw_if_index)
2475 # configure interface address and check identity mappings
2476 self.pg7.config_ip4()
2477 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2479 self.assertEqual(2, len(identity_mappings))
2480 for sm in identity_mappings:
2481 if sm.sw_if_index == 0xFFFFFFFF:
2482 self.assertEqual(identity_mappings[0].ip_address,
2483 self.pg7.local_ip4n)
2484 self.assertEqual(port, identity_mappings[0].port)
2485 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2487 self.assertTrue(resolved)
2489 # remove interface address and check identity mappings
2490 self.pg7.unconfig_ip4()
2491 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2492 self.assertEqual(1, len(identity_mappings))
2493 self.assertEqual(self.pg7.sw_if_index,
2494 identity_mappings[0].sw_if_index)
2496 def test_ipfix_nat44_sess(self):
2497 """ IPFIX logging NAT44 session created/delted """
2498 self.ipfix_domain_id = 10
2499 self.ipfix_src_port = 20202
2500 colector_port = 30303
2501 bind_layers(UDP, IPFIX, dport=30303)
2502 self.nat44_add_address(self.nat_addr)
2503 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2504 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2506 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2507 src_address=self.pg3.local_ip4n,
2509 template_interval=10,
2510 collector_port=colector_port)
2511 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2512 src_port=self.ipfix_src_port)
2514 pkts = self.create_stream_in(self.pg0, self.pg1)
2515 self.pg0.add_stream(pkts)
2516 self.pg_enable_capture(self.pg_interfaces)
2518 capture = self.pg1.get_capture(len(pkts))
2519 self.verify_capture_out(capture)
2520 self.nat44_add_address(self.nat_addr, is_add=0)
2521 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2522 capture = self.pg3.get_capture(9)
2523 ipfix = IPFIXDecoder()
2524 # first load template
2526 self.assertTrue(p.haslayer(IPFIX))
2527 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2528 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2529 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2530 self.assertEqual(p[UDP].dport, colector_port)
2531 self.assertEqual(p[IPFIX].observationDomainID,
2532 self.ipfix_domain_id)
2533 if p.haslayer(Template):
2534 ipfix.add_template(p.getlayer(Template))
2535 # verify events in data set
2537 if p.haslayer(Data):
2538 data = ipfix.decode_data_set(p.getlayer(Set))
2539 self.verify_ipfix_nat44_ses(data)
2541 def test_ipfix_addr_exhausted(self):
2542 """ IPFIX logging NAT addresses exhausted """
2543 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2544 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2546 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2547 src_address=self.pg3.local_ip4n,
2549 template_interval=10)
2550 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2551 src_port=self.ipfix_src_port)
2553 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2554 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2556 self.pg0.add_stream(p)
2557 self.pg_enable_capture(self.pg_interfaces)
2559 capture = self.pg1.get_capture(0)
2560 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2561 capture = self.pg3.get_capture(9)
2562 ipfix = IPFIXDecoder()
2563 # first load template
2565 self.assertTrue(p.haslayer(IPFIX))
2566 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2567 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2568 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2569 self.assertEqual(p[UDP].dport, 4739)
2570 self.assertEqual(p[IPFIX].observationDomainID,
2571 self.ipfix_domain_id)
2572 if p.haslayer(Template):
2573 ipfix.add_template(p.getlayer(Template))
2574 # verify events in data set
2576 if p.haslayer(Data):
2577 data = ipfix.decode_data_set(p.getlayer(Set))
2578 self.verify_ipfix_addr_exhausted(data)
2580 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2581 def test_ipfix_max_sessions(self):
2582 """ IPFIX logging maximum session entries exceeded """
2583 self.nat44_add_address(self.nat_addr)
2584 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2585 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2588 nat44_config = self.vapi.nat_show_config()
2589 max_sessions = 10 * nat44_config.translation_buckets
2592 for i in range(0, max_sessions):
2593 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2594 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2595 IP(src=src, dst=self.pg1.remote_ip4) /
2598 self.pg0.add_stream(pkts)
2599 self.pg_enable_capture(self.pg_interfaces)
2602 self.pg1.get_capture(max_sessions)
2603 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2604 src_address=self.pg3.local_ip4n,
2606 template_interval=10)
2607 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2608 src_port=self.ipfix_src_port)
2610 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2611 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2613 self.pg0.add_stream(p)
2614 self.pg_enable_capture(self.pg_interfaces)
2616 self.pg1.get_capture(0)
2617 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2618 capture = self.pg3.get_capture(9)
2619 ipfix = IPFIXDecoder()
2620 # first load template
2622 self.assertTrue(p.haslayer(IPFIX))
2623 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2624 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2625 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2626 self.assertEqual(p[UDP].dport, 4739)
2627 self.assertEqual(p[IPFIX].observationDomainID,
2628 self.ipfix_domain_id)
2629 if p.haslayer(Template):
2630 ipfix.add_template(p.getlayer(Template))
2631 # verify events in data set
2633 if p.haslayer(Data):
2634 data = ipfix.decode_data_set(p.getlayer(Set))
2635 self.verify_ipfix_max_sessions(data, max_sessions)
2637 def test_pool_addr_fib(self):
2638 """ NAT44 add pool addresses to FIB """
2639 static_addr = '10.0.0.10'
2640 self.nat44_add_address(self.nat_addr)
2641 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2642 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2644 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2647 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2648 ARP(op=ARP.who_has, pdst=self.nat_addr,
2649 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2650 self.pg1.add_stream(p)
2651 self.pg_enable_capture(self.pg_interfaces)
2653 capture = self.pg1.get_capture(1)
2654 self.assertTrue(capture[0].haslayer(ARP))
2655 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2658 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2659 ARP(op=ARP.who_has, pdst=static_addr,
2660 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2661 self.pg1.add_stream(p)
2662 self.pg_enable_capture(self.pg_interfaces)
2664 capture = self.pg1.get_capture(1)
2665 self.assertTrue(capture[0].haslayer(ARP))
2666 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2668 # send ARP to non-NAT44 interface
2669 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2670 ARP(op=ARP.who_has, pdst=self.nat_addr,
2671 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2672 self.pg2.add_stream(p)
2673 self.pg_enable_capture(self.pg_interfaces)
2675 capture = self.pg1.get_capture(0)
2677 # remove addresses and verify
2678 self.nat44_add_address(self.nat_addr, is_add=0)
2679 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2682 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2683 ARP(op=ARP.who_has, pdst=self.nat_addr,
2684 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2685 self.pg1.add_stream(p)
2686 self.pg_enable_capture(self.pg_interfaces)
2688 capture = self.pg1.get_capture(0)
2690 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2691 ARP(op=ARP.who_has, pdst=static_addr,
2692 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2693 self.pg1.add_stream(p)
2694 self.pg_enable_capture(self.pg_interfaces)
2696 capture = self.pg1.get_capture(0)
2698 def test_vrf_mode(self):
2699 """ NAT44 tenant VRF aware address pool mode """
2703 nat_ip1 = "10.0.0.10"
2704 nat_ip2 = "10.0.0.11"
2706 self.pg0.unconfig_ip4()
2707 self.pg1.unconfig_ip4()
2708 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2709 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2710 self.pg0.set_table_ip4(vrf_id1)
2711 self.pg1.set_table_ip4(vrf_id2)
2712 self.pg0.config_ip4()
2713 self.pg1.config_ip4()
2715 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2716 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2717 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2718 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2719 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2723 pkts = self.create_stream_in(self.pg0, self.pg2)
2724 self.pg0.add_stream(pkts)
2725 self.pg_enable_capture(self.pg_interfaces)
2727 capture = self.pg2.get_capture(len(pkts))
2728 self.verify_capture_out(capture, nat_ip1)
2731 pkts = self.create_stream_in(self.pg1, self.pg2)
2732 self.pg1.add_stream(pkts)
2733 self.pg_enable_capture(self.pg_interfaces)
2735 capture = self.pg2.get_capture(len(pkts))
2736 self.verify_capture_out(capture, nat_ip2)
2738 self.pg0.unconfig_ip4()
2739 self.pg1.unconfig_ip4()
2740 self.pg0.set_table_ip4(0)
2741 self.pg1.set_table_ip4(0)
2742 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2743 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2745 def test_vrf_feature_independent(self):
2746 """ NAT44 tenant VRF independent address pool mode """
2748 nat_ip1 = "10.0.0.10"
2749 nat_ip2 = "10.0.0.11"
2751 self.nat44_add_address(nat_ip1)
2752 self.nat44_add_address(nat_ip2, vrf_id=99)
2753 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2754 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2755 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2759 pkts = self.create_stream_in(self.pg0, self.pg2)
2760 self.pg0.add_stream(pkts)
2761 self.pg_enable_capture(self.pg_interfaces)
2763 capture = self.pg2.get_capture(len(pkts))
2764 self.verify_capture_out(capture, nat_ip1)
2767 pkts = self.create_stream_in(self.pg1, self.pg2)
2768 self.pg1.add_stream(pkts)
2769 self.pg_enable_capture(self.pg_interfaces)
2771 capture = self.pg2.get_capture(len(pkts))
2772 self.verify_capture_out(capture, nat_ip1)
2774 def test_dynamic_ipless_interfaces(self):
2775 """ NAT44 interfaces without configured IP address """
2777 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2778 mactobinary(self.pg7.remote_mac),
2779 self.pg7.remote_ip4n,
2781 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2782 mactobinary(self.pg8.remote_mac),
2783 self.pg8.remote_ip4n,
2786 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2787 dst_address_length=32,
2788 next_hop_address=self.pg7.remote_ip4n,
2789 next_hop_sw_if_index=self.pg7.sw_if_index)
2790 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2791 dst_address_length=32,
2792 next_hop_address=self.pg8.remote_ip4n,
2793 next_hop_sw_if_index=self.pg8.sw_if_index)
2795 self.nat44_add_address(self.nat_addr)
2796 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2797 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2801 pkts = self.create_stream_in(self.pg7, self.pg8)
2802 self.pg7.add_stream(pkts)
2803 self.pg_enable_capture(self.pg_interfaces)
2805 capture = self.pg8.get_capture(len(pkts))
2806 self.verify_capture_out(capture)
2809 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2810 self.pg8.add_stream(pkts)
2811 self.pg_enable_capture(self.pg_interfaces)
2813 capture = self.pg7.get_capture(len(pkts))
2814 self.verify_capture_in(capture, self.pg7)
2816 def test_static_ipless_interfaces(self):
2817 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2819 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2820 mactobinary(self.pg7.remote_mac),
2821 self.pg7.remote_ip4n,
2823 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2824 mactobinary(self.pg8.remote_mac),
2825 self.pg8.remote_ip4n,
2828 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2829 dst_address_length=32,
2830 next_hop_address=self.pg7.remote_ip4n,
2831 next_hop_sw_if_index=self.pg7.sw_if_index)
2832 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2833 dst_address_length=32,
2834 next_hop_address=self.pg8.remote_ip4n,
2835 next_hop_sw_if_index=self.pg8.sw_if_index)
2837 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2838 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2839 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2843 pkts = self.create_stream_out(self.pg8)
2844 self.pg8.add_stream(pkts)
2845 self.pg_enable_capture(self.pg_interfaces)
2847 capture = self.pg7.get_capture(len(pkts))
2848 self.verify_capture_in(capture, self.pg7)
2851 pkts = self.create_stream_in(self.pg7, self.pg8)
2852 self.pg7.add_stream(pkts)
2853 self.pg_enable_capture(self.pg_interfaces)
2855 capture = self.pg8.get_capture(len(pkts))
2856 self.verify_capture_out(capture, self.nat_addr, True)
2858 def test_static_with_port_ipless_interfaces(self):
2859 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2861 self.tcp_port_out = 30606
2862 self.udp_port_out = 30607
2863 self.icmp_id_out = 30608
2865 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2866 mactobinary(self.pg7.remote_mac),
2867 self.pg7.remote_ip4n,
2869 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2870 mactobinary(self.pg8.remote_mac),
2871 self.pg8.remote_ip4n,
2874 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2875 dst_address_length=32,
2876 next_hop_address=self.pg7.remote_ip4n,
2877 next_hop_sw_if_index=self.pg7.sw_if_index)
2878 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2879 dst_address_length=32,
2880 next_hop_address=self.pg8.remote_ip4n,
2881 next_hop_sw_if_index=self.pg8.sw_if_index)
2883 self.nat44_add_address(self.nat_addr)
2884 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2885 self.tcp_port_in, self.tcp_port_out,
2886 proto=IP_PROTOS.tcp)
2887 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2888 self.udp_port_in, self.udp_port_out,
2889 proto=IP_PROTOS.udp)
2890 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2891 self.icmp_id_in, self.icmp_id_out,
2892 proto=IP_PROTOS.icmp)
2893 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2894 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2898 pkts = self.create_stream_out(self.pg8)
2899 self.pg8.add_stream(pkts)
2900 self.pg_enable_capture(self.pg_interfaces)
2902 capture = self.pg7.get_capture(len(pkts))
2903 self.verify_capture_in(capture, self.pg7)
2906 pkts = self.create_stream_in(self.pg7, self.pg8)
2907 self.pg7.add_stream(pkts)
2908 self.pg_enable_capture(self.pg_interfaces)
2910 capture = self.pg8.get_capture(len(pkts))
2911 self.verify_capture_out(capture)
2913 def test_static_unknown_proto(self):
2914 """ 1:1 NAT translate packet with unknown protocol """
2915 nat_ip = "10.0.0.10"
2916 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2917 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2918 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2922 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2923 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2925 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2926 TCP(sport=1234, dport=1234))
2927 self.pg0.add_stream(p)
2928 self.pg_enable_capture(self.pg_interfaces)
2930 p = self.pg1.get_capture(1)
2933 self.assertEqual(packet[IP].src, nat_ip)
2934 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2935 self.assertTrue(packet.haslayer(GRE))
2936 self.assert_packet_checksums_valid(packet)
2938 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2942 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2943 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2945 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2946 TCP(sport=1234, dport=1234))
2947 self.pg1.add_stream(p)
2948 self.pg_enable_capture(self.pg_interfaces)
2950 p = self.pg0.get_capture(1)
2953 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2954 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2955 self.assertTrue(packet.haslayer(GRE))
2956 self.assert_packet_checksums_valid(packet)
2958 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2961 def test_hairpinning_static_unknown_proto(self):
2962 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2964 host = self.pg0.remote_hosts[0]
2965 server = self.pg0.remote_hosts[1]
2967 host_nat_ip = "10.0.0.10"
2968 server_nat_ip = "10.0.0.11"
2970 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2971 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2972 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2973 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2977 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2978 IP(src=host.ip4, dst=server_nat_ip) /
2980 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2981 TCP(sport=1234, dport=1234))
2982 self.pg0.add_stream(p)
2983 self.pg_enable_capture(self.pg_interfaces)
2985 p = self.pg0.get_capture(1)
2988 self.assertEqual(packet[IP].src, host_nat_ip)
2989 self.assertEqual(packet[IP].dst, server.ip4)
2990 self.assertTrue(packet.haslayer(GRE))
2991 self.assert_packet_checksums_valid(packet)
2993 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2997 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2998 IP(src=server.ip4, dst=host_nat_ip) /
3000 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3001 TCP(sport=1234, dport=1234))
3002 self.pg0.add_stream(p)
3003 self.pg_enable_capture(self.pg_interfaces)
3005 p = self.pg0.get_capture(1)
3008 self.assertEqual(packet[IP].src, server_nat_ip)
3009 self.assertEqual(packet[IP].dst, host.ip4)
3010 self.assertTrue(packet.haslayer(GRE))
3011 self.assert_packet_checksums_valid(packet)
3013 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3016 def test_unknown_proto(self):
3017 """ NAT44 translate packet with unknown protocol """
3018 self.nat44_add_address(self.nat_addr)
3019 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3020 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3024 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3025 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3026 TCP(sport=self.tcp_port_in, dport=20))
3027 self.pg0.add_stream(p)
3028 self.pg_enable_capture(self.pg_interfaces)
3030 p = self.pg1.get_capture(1)
3032 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3033 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3035 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3036 TCP(sport=1234, dport=1234))
3037 self.pg0.add_stream(p)
3038 self.pg_enable_capture(self.pg_interfaces)
3040 p = self.pg1.get_capture(1)
3043 self.assertEqual(packet[IP].src, self.nat_addr)
3044 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3045 self.assertTrue(packet.haslayer(GRE))
3046 self.assert_packet_checksums_valid(packet)
3048 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3052 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3053 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3055 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3056 TCP(sport=1234, dport=1234))
3057 self.pg1.add_stream(p)
3058 self.pg_enable_capture(self.pg_interfaces)
3060 p = self.pg0.get_capture(1)
3063 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3064 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3065 self.assertTrue(packet.haslayer(GRE))
3066 self.assert_packet_checksums_valid(packet)
3068 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3071 def test_hairpinning_unknown_proto(self):
3072 """ NAT44 translate packet with unknown protocol - hairpinning """
3073 host = self.pg0.remote_hosts[0]
3074 server = self.pg0.remote_hosts[1]
3076 server_out_port = 8765
3077 server_nat_ip = "10.0.0.11"
3079 self.nat44_add_address(self.nat_addr)
3080 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3081 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3084 # add static mapping for server
3085 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3088 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3089 IP(src=host.ip4, dst=server_nat_ip) /
3090 TCP(sport=host_in_port, dport=server_out_port))
3091 self.pg0.add_stream(p)
3092 self.pg_enable_capture(self.pg_interfaces)
3094 capture = self.pg0.get_capture(1)
3096 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3097 IP(src=host.ip4, dst=server_nat_ip) /
3099 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3100 TCP(sport=1234, dport=1234))
3101 self.pg0.add_stream(p)
3102 self.pg_enable_capture(self.pg_interfaces)
3104 p = self.pg0.get_capture(1)
3107 self.assertEqual(packet[IP].src, self.nat_addr)
3108 self.assertEqual(packet[IP].dst, server.ip4)
3109 self.assertTrue(packet.haslayer(GRE))
3110 self.assert_packet_checksums_valid(packet)
3112 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3116 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3117 IP(src=server.ip4, dst=self.nat_addr) /
3119 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3120 TCP(sport=1234, dport=1234))
3121 self.pg0.add_stream(p)
3122 self.pg_enable_capture(self.pg_interfaces)
3124 p = self.pg0.get_capture(1)
3127 self.assertEqual(packet[IP].src, server_nat_ip)
3128 self.assertEqual(packet[IP].dst, host.ip4)
3129 self.assertTrue(packet.haslayer(GRE))
3130 self.assert_packet_checksums_valid(packet)
3132 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3135 def test_output_feature(self):
3136 """ NAT44 interface output feature (in2out postrouting) """
3137 self.nat44_add_address(self.nat_addr)
3138 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3139 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3140 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3144 pkts = self.create_stream_in(self.pg0, self.pg3)
3145 self.pg0.add_stream(pkts)
3146 self.pg_enable_capture(self.pg_interfaces)
3148 capture = self.pg3.get_capture(len(pkts))
3149 self.verify_capture_out(capture)
3152 pkts = self.create_stream_out(self.pg3)
3153 self.pg3.add_stream(pkts)
3154 self.pg_enable_capture(self.pg_interfaces)
3156 capture = self.pg0.get_capture(len(pkts))
3157 self.verify_capture_in(capture, self.pg0)
3159 # from non-NAT interface to NAT inside interface
3160 pkts = self.create_stream_in(self.pg2, self.pg0)
3161 self.pg2.add_stream(pkts)
3162 self.pg_enable_capture(self.pg_interfaces)
3164 capture = self.pg0.get_capture(len(pkts))
3165 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3167 def test_output_feature_vrf_aware(self):
3168 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3169 nat_ip_vrf10 = "10.0.0.10"
3170 nat_ip_vrf20 = "10.0.0.20"
3172 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3173 dst_address_length=32,
3174 next_hop_address=self.pg3.remote_ip4n,
3175 next_hop_sw_if_index=self.pg3.sw_if_index,
3177 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3178 dst_address_length=32,
3179 next_hop_address=self.pg3.remote_ip4n,
3180 next_hop_sw_if_index=self.pg3.sw_if_index,
3183 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3184 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3185 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3186 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3187 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3191 pkts = self.create_stream_in(self.pg4, self.pg3)
3192 self.pg4.add_stream(pkts)
3193 self.pg_enable_capture(self.pg_interfaces)
3195 capture = self.pg3.get_capture(len(pkts))
3196 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3199 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3200 self.pg3.add_stream(pkts)
3201 self.pg_enable_capture(self.pg_interfaces)
3203 capture = self.pg4.get_capture(len(pkts))
3204 self.verify_capture_in(capture, self.pg4)
3207 pkts = self.create_stream_in(self.pg6, self.pg3)
3208 self.pg6.add_stream(pkts)
3209 self.pg_enable_capture(self.pg_interfaces)
3211 capture = self.pg3.get_capture(len(pkts))
3212 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3215 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3216 self.pg3.add_stream(pkts)
3217 self.pg_enable_capture(self.pg_interfaces)
3219 capture = self.pg6.get_capture(len(pkts))
3220 self.verify_capture_in(capture, self.pg6)
3222 def test_output_feature_hairpinning(self):
3223 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3224 host = self.pg0.remote_hosts[0]
3225 server = self.pg0.remote_hosts[1]
3228 server_in_port = 5678
3229 server_out_port = 8765
3231 self.nat44_add_address(self.nat_addr)
3232 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3233 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3236 # add static mapping for server
3237 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3238 server_in_port, server_out_port,
3239 proto=IP_PROTOS.tcp)
3241 # send packet from host to server
3242 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3243 IP(src=host.ip4, dst=self.nat_addr) /
3244 TCP(sport=host_in_port, dport=server_out_port))
3245 self.pg0.add_stream(p)
3246 self.pg_enable_capture(self.pg_interfaces)
3248 capture = self.pg0.get_capture(1)
3253 self.assertEqual(ip.src, self.nat_addr)
3254 self.assertEqual(ip.dst, server.ip4)
3255 self.assertNotEqual(tcp.sport, host_in_port)
3256 self.assertEqual(tcp.dport, server_in_port)
3257 self.assert_packet_checksums_valid(p)
3258 host_out_port = tcp.sport
3260 self.logger.error(ppp("Unexpected or invalid packet:", p))
3263 # send reply from server to host
3264 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3265 IP(src=server.ip4, dst=self.nat_addr) /
3266 TCP(sport=server_in_port, dport=host_out_port))
3267 self.pg0.add_stream(p)
3268 self.pg_enable_capture(self.pg_interfaces)
3270 capture = self.pg0.get_capture(1)
3275 self.assertEqual(ip.src, self.nat_addr)
3276 self.assertEqual(ip.dst, host.ip4)
3277 self.assertEqual(tcp.sport, server_out_port)
3278 self.assertEqual(tcp.dport, host_in_port)
3279 self.assert_packet_checksums_valid(p)
3281 self.logger.error(ppp("Unexpected or invalid packet:", p))
3284 def test_output_feature_and_service(self):
3285 """ NAT44 interface output feature and services """
3286 external_addr = '1.2.3.4'
3290 self.vapi.nat44_forwarding_enable_disable(1)
3291 self.nat44_add_address(self.nat_addr)
3292 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3293 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3294 local_port, external_port,
3295 proto=IP_PROTOS.tcp, out2in_only=1)
3296 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3297 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3299 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3302 # from client to service
3303 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3304 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3305 TCP(sport=12345, dport=external_port))
3306 self.pg1.add_stream(p)
3307 self.pg_enable_capture(self.pg_interfaces)
3309 capture = self.pg0.get_capture(1)
3314 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3315 self.assertEqual(tcp.dport, local_port)
3316 self.assert_packet_checksums_valid(p)
3318 self.logger.error(ppp("Unexpected or invalid packet:", p))
3321 # from service back to client
3322 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3323 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3324 TCP(sport=local_port, dport=12345))
3325 self.pg0.add_stream(p)
3326 self.pg_enable_capture(self.pg_interfaces)
3328 capture = self.pg1.get_capture(1)
3333 self.assertEqual(ip.src, external_addr)
3334 self.assertEqual(tcp.sport, external_port)
3335 self.assert_packet_checksums_valid(p)
3337 self.logger.error(ppp("Unexpected or invalid packet:", p))
3340 # from local network host to external network
3341 pkts = self.create_stream_in(self.pg0, self.pg1)
3342 self.pg0.add_stream(pkts)
3343 self.pg_enable_capture(self.pg_interfaces)
3345 capture = self.pg1.get_capture(len(pkts))
3346 self.verify_capture_out(capture)
3347 pkts = self.create_stream_in(self.pg0, self.pg1)
3348 self.pg0.add_stream(pkts)
3349 self.pg_enable_capture(self.pg_interfaces)
3351 capture = self.pg1.get_capture(len(pkts))
3352 self.verify_capture_out(capture)
3354 # from external network back to local network host
3355 pkts = self.create_stream_out(self.pg1)
3356 self.pg1.add_stream(pkts)
3357 self.pg_enable_capture(self.pg_interfaces)
3359 capture = self.pg0.get_capture(len(pkts))
3360 self.verify_capture_in(capture, self.pg0)
3362 def test_output_feature_and_service2(self):
3363 """ NAT44 interface output feature and service host direct access """
3364 self.vapi.nat44_forwarding_enable_disable(1)
3365 self.nat44_add_address(self.nat_addr)
3366 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3369 # session initiaded from service host - translate
3370 pkts = self.create_stream_in(self.pg0, self.pg1)
3371 self.pg0.add_stream(pkts)
3372 self.pg_enable_capture(self.pg_interfaces)
3374 capture = self.pg1.get_capture(len(pkts))
3375 self.verify_capture_out(capture)
3377 pkts = self.create_stream_out(self.pg1)
3378 self.pg1.add_stream(pkts)
3379 self.pg_enable_capture(self.pg_interfaces)
3381 capture = self.pg0.get_capture(len(pkts))
3382 self.verify_capture_in(capture, self.pg0)
3384 # session initiaded from remote host - do not translate
3385 pkts = self.create_stream_out(self.pg1,
3386 self.pg0.remote_ip4,
3387 use_inside_ports=True)
3388 self.pg1.add_stream(pkts)
3389 self.pg_enable_capture(self.pg_interfaces)
3391 capture = self.pg0.get_capture(len(pkts))
3392 self.verify_capture_in(capture, self.pg0)
3394 pkts = self.create_stream_in(self.pg0, self.pg1)
3395 self.pg0.add_stream(pkts)
3396 self.pg_enable_capture(self.pg_interfaces)
3398 capture = self.pg1.get_capture(len(pkts))
3399 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3402 def test_output_feature_and_service3(self):
3403 """ NAT44 interface output feature and DST NAT """
3404 external_addr = '1.2.3.4'
3408 self.vapi.nat44_forwarding_enable_disable(1)
3409 self.nat44_add_address(self.nat_addr)
3410 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3411 local_port, external_port,
3412 proto=IP_PROTOS.tcp, out2in_only=1)
3413 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3414 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3416 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3419 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3420 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3421 TCP(sport=12345, dport=external_port))
3422 self.pg0.add_stream(p)
3423 self.pg_enable_capture(self.pg_interfaces)
3425 capture = self.pg1.get_capture(1)
3430 self.assertEqual(ip.src, self.pg0.remote_ip4)
3431 self.assertEqual(tcp.sport, 12345)
3432 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3433 self.assertEqual(tcp.dport, local_port)
3434 self.assert_packet_checksums_valid(p)
3436 self.logger.error(ppp("Unexpected or invalid packet:", p))
3439 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3440 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3441 TCP(sport=local_port, dport=12345))
3442 self.pg1.add_stream(p)
3443 self.pg_enable_capture(self.pg_interfaces)
3445 capture = self.pg0.get_capture(1)
3450 self.assertEqual(ip.src, external_addr)
3451 self.assertEqual(tcp.sport, external_port)
3452 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3453 self.assertEqual(tcp.dport, 12345)
3454 self.assert_packet_checksums_valid(p)
3456 self.logger.error(ppp("Unexpected or invalid packet:", p))
3459 def test_one_armed_nat44(self):
3460 """ One armed NAT44 """
3461 remote_host = self.pg9.remote_hosts[0]
3462 local_host = self.pg9.remote_hosts[1]
3465 self.nat44_add_address(self.nat_addr)
3466 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3467 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3471 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3472 IP(src=local_host.ip4, dst=remote_host.ip4) /
3473 TCP(sport=12345, dport=80))
3474 self.pg9.add_stream(p)
3475 self.pg_enable_capture(self.pg_interfaces)
3477 capture = self.pg9.get_capture(1)
3482 self.assertEqual(ip.src, self.nat_addr)
3483 self.assertEqual(ip.dst, remote_host.ip4)
3484 self.assertNotEqual(tcp.sport, 12345)
3485 external_port = tcp.sport
3486 self.assertEqual(tcp.dport, 80)
3487 self.assert_packet_checksums_valid(p)
3489 self.logger.error(ppp("Unexpected or invalid packet:", p))
3493 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3494 IP(src=remote_host.ip4, dst=self.nat_addr) /
3495 TCP(sport=80, dport=external_port))
3496 self.pg9.add_stream(p)
3497 self.pg_enable_capture(self.pg_interfaces)
3499 capture = self.pg9.get_capture(1)
3504 self.assertEqual(ip.src, remote_host.ip4)
3505 self.assertEqual(ip.dst, local_host.ip4)
3506 self.assertEqual(tcp.sport, 80)
3507 self.assertEqual(tcp.dport, 12345)
3508 self.assert_packet_checksums_valid(p)
3510 self.logger.error(ppp("Unexpected or invalid packet:", p))
3513 def test_one_armed_nat44_static(self):
3514 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3515 remote_host = self.pg9.remote_hosts[0]
3516 local_host = self.pg9.remote_hosts[1]
3521 self.vapi.nat44_forwarding_enable_disable(1)
3522 self.nat44_add_address(self.nat_addr, twice_nat=1)
3523 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3524 local_port, external_port,
3525 proto=IP_PROTOS.tcp, out2in_only=1,
3527 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3528 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3531 # from client to service
3532 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3533 IP(src=remote_host.ip4, dst=self.nat_addr) /
3534 TCP(sport=12345, dport=external_port))
3535 self.pg9.add_stream(p)
3536 self.pg_enable_capture(self.pg_interfaces)
3538 capture = self.pg9.get_capture(1)
3543 self.assertEqual(ip.dst, local_host.ip4)
3544 self.assertEqual(ip.src, self.nat_addr)
3545 self.assertEqual(tcp.dport, local_port)
3546 self.assertNotEqual(tcp.sport, 12345)
3547 eh_port_in = tcp.sport
3548 self.assert_packet_checksums_valid(p)
3550 self.logger.error(ppp("Unexpected or invalid packet:", p))
3553 # from service back to client
3554 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3555 IP(src=local_host.ip4, dst=self.nat_addr) /
3556 TCP(sport=local_port, dport=eh_port_in))
3557 self.pg9.add_stream(p)
3558 self.pg_enable_capture(self.pg_interfaces)
3560 capture = self.pg9.get_capture(1)
3565 self.assertEqual(ip.src, self.nat_addr)
3566 self.assertEqual(ip.dst, remote_host.ip4)
3567 self.assertEqual(tcp.sport, external_port)
3568 self.assertEqual(tcp.dport, 12345)
3569 self.assert_packet_checksums_valid(p)
3571 self.logger.error(ppp("Unexpected or invalid packet:", p))
3574 def test_del_session(self):
3575 """ Delete NAT44 session """
3576 self.nat44_add_address(self.nat_addr)
3577 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3578 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3581 pkts = self.create_stream_in(self.pg0, self.pg1)
3582 self.pg0.add_stream(pkts)
3583 self.pg_enable_capture(self.pg_interfaces)
3585 capture = self.pg1.get_capture(len(pkts))
3587 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3588 nsessions = len(sessions)
3590 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3591 sessions[0].inside_port,
3592 sessions[0].protocol)
3593 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3594 sessions[1].outside_port,
3595 sessions[1].protocol,
3598 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3599 self.assertEqual(nsessions - len(sessions), 2)
3601 def test_set_get_reass(self):
3602 """ NAT44 set/get virtual fragmentation reassembly """
3603 reas_cfg1 = self.vapi.nat_get_reass()
3605 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3606 max_reass=reas_cfg1.ip4_max_reass * 2,
3607 max_frag=reas_cfg1.ip4_max_frag * 2)
3609 reas_cfg2 = self.vapi.nat_get_reass()
3611 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3612 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3613 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3615 self.vapi.nat_set_reass(drop_frag=1)
3616 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3618 def test_frag_in_order(self):
3619 """ NAT44 translate fragments arriving in order """
3620 self.nat44_add_address(self.nat_addr)
3621 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3622 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3625 data = "A" * 4 + "B" * 16 + "C" * 3
3626 self.tcp_port_in = random.randint(1025, 65535)
3628 reass = self.vapi.nat_reass_dump()
3629 reass_n_start = len(reass)
3632 pkts = self.create_stream_frag(self.pg0,
3633 self.pg1.remote_ip4,
3637 self.pg0.add_stream(pkts)
3638 self.pg_enable_capture(self.pg_interfaces)
3640 frags = self.pg1.get_capture(len(pkts))
3641 p = self.reass_frags_and_verify(frags,
3643 self.pg1.remote_ip4)
3644 self.assertEqual(p[TCP].dport, 20)
3645 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3646 self.tcp_port_out = p[TCP].sport
3647 self.assertEqual(data, p[Raw].load)
3650 pkts = self.create_stream_frag(self.pg1,
3655 self.pg1.add_stream(pkts)
3656 self.pg_enable_capture(self.pg_interfaces)
3658 frags = self.pg0.get_capture(len(pkts))
3659 p = self.reass_frags_and_verify(frags,
3660 self.pg1.remote_ip4,
3661 self.pg0.remote_ip4)
3662 self.assertEqual(p[TCP].sport, 20)
3663 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3664 self.assertEqual(data, p[Raw].load)
3666 reass = self.vapi.nat_reass_dump()
3667 reass_n_end = len(reass)
3669 self.assertEqual(reass_n_end - reass_n_start, 2)
3671 def test_reass_hairpinning(self):
3672 """ NAT44 fragments hairpinning """
3673 server = self.pg0.remote_hosts[1]
3674 host_in_port = random.randint(1025, 65535)
3675 server_in_port = random.randint(1025, 65535)
3676 server_out_port = random.randint(1025, 65535)
3677 data = "A" * 4 + "B" * 16 + "C" * 3
3679 self.nat44_add_address(self.nat_addr)
3680 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3681 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3683 # add static mapping for server
3684 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3685 server_in_port, server_out_port,
3686 proto=IP_PROTOS.tcp)
3688 # send packet from host to server
3689 pkts = self.create_stream_frag(self.pg0,
3694 self.pg0.add_stream(pkts)
3695 self.pg_enable_capture(self.pg_interfaces)
3697 frags = self.pg0.get_capture(len(pkts))
3698 p = self.reass_frags_and_verify(frags,
3701 self.assertNotEqual(p[TCP].sport, host_in_port)
3702 self.assertEqual(p[TCP].dport, server_in_port)
3703 self.assertEqual(data, p[Raw].load)
3705 def test_frag_out_of_order(self):
3706 """ NAT44 translate fragments arriving out of order """
3707 self.nat44_add_address(self.nat_addr)
3708 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3709 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3712 data = "A" * 4 + "B" * 16 + "C" * 3
3713 random.randint(1025, 65535)
3716 pkts = self.create_stream_frag(self.pg0,
3717 self.pg1.remote_ip4,
3722 self.pg0.add_stream(pkts)
3723 self.pg_enable_capture(self.pg_interfaces)
3725 frags = self.pg1.get_capture(len(pkts))
3726 p = self.reass_frags_and_verify(frags,
3728 self.pg1.remote_ip4)
3729 self.assertEqual(p[TCP].dport, 20)
3730 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3731 self.tcp_port_out = p[TCP].sport
3732 self.assertEqual(data, p[Raw].load)
3735 pkts = self.create_stream_frag(self.pg1,
3741 self.pg1.add_stream(pkts)
3742 self.pg_enable_capture(self.pg_interfaces)
3744 frags = self.pg0.get_capture(len(pkts))
3745 p = self.reass_frags_and_verify(frags,
3746 self.pg1.remote_ip4,
3747 self.pg0.remote_ip4)
3748 self.assertEqual(p[TCP].sport, 20)
3749 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3750 self.assertEqual(data, p[Raw].load)
3752 def test_port_restricted(self):
3753 """ Port restricted NAT44 (MAP-E CE) """
3754 self.nat44_add_address(self.nat_addr)
3755 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3756 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3758 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3759 "psid-offset 6 psid-len 6")
3761 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3762 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3763 TCP(sport=4567, dport=22))
3764 self.pg0.add_stream(p)
3765 self.pg_enable_capture(self.pg_interfaces)
3767 capture = self.pg1.get_capture(1)
3772 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3773 self.assertEqual(ip.src, self.nat_addr)
3774 self.assertEqual(tcp.dport, 22)
3775 self.assertNotEqual(tcp.sport, 4567)
3776 self.assertEqual((tcp.sport >> 6) & 63, 10)
3777 self.assert_packet_checksums_valid(p)
3779 self.logger.error(ppp("Unexpected or invalid packet:", p))
3782 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3784 twice_nat_addr = '10.0.1.3'
3792 port_in1 = port_in+1
3793 port_in2 = port_in+2
3798 server1 = self.pg0.remote_hosts[0]
3799 server2 = self.pg0.remote_hosts[1]
3811 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
3814 self.nat44_add_address(self.nat_addr)
3815 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3817 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3819 proto=IP_PROTOS.tcp,
3820 twice_nat=int(not self_twice_nat),
3821 self_twice_nat=int(self_twice_nat))
3823 locals = [{'addr': server1.ip4n,
3826 {'addr': server2.ip4n,
3829 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3830 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3834 not self_twice_nat),
3837 local_num=len(locals),
3839 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3840 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
3847 assert client_id is not None
3849 client = self.pg0.remote_hosts[0]
3850 elif client_id == 2:
3851 client = self.pg0.remote_hosts[1]
3853 client = pg1.remote_hosts[0]
3854 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3855 IP(src=client.ip4, dst=self.nat_addr) /
3856 TCP(sport=eh_port_out, dport=port_out))
3858 self.pg_enable_capture(self.pg_interfaces)
3860 capture = pg0.get_capture(1)
3866 if ip.dst == server1.ip4:
3872 self.assertEqual(ip.dst, server.ip4)
3874 self.assertIn(tcp.dport, [port_in1, port_in2])
3876 self.assertEqual(tcp.dport, port_in)
3878 self.assertEqual(ip.src, twice_nat_addr)
3879 self.assertNotEqual(tcp.sport, eh_port_out)
3881 self.assertEqual(ip.src, client.ip4)
3882 self.assertEqual(tcp.sport, eh_port_out)
3884 eh_port_in = tcp.sport
3885 saved_port_in = tcp.dport
3886 self.assert_packet_checksums_valid(p)
3888 self.logger.error(ppp("Unexpected or invalid packet:", p))
3891 p = (Ether(src=server.mac, dst=pg0.local_mac) /
3892 IP(src=server.ip4, dst=eh_addr_in) /
3893 TCP(sport=saved_port_in, dport=eh_port_in))
3895 self.pg_enable_capture(self.pg_interfaces)
3897 capture = pg1.get_capture(1)
3902 self.assertEqual(ip.dst, client.ip4)
3903 self.assertEqual(ip.src, self.nat_addr)
3904 self.assertEqual(tcp.dport, eh_port_out)
3905 self.assertEqual(tcp.sport, port_out)
3906 self.assert_packet_checksums_valid(p)
3908 self.logger.error(ppp("Unexpected or invalid packet:", p))
3911 def test_twice_nat(self):
3913 self.twice_nat_common()
3915 def test_self_twice_nat_positive(self):
3916 """ Self Twice NAT44 (positive test) """
3917 self.twice_nat_common(self_twice_nat=True, same_pg=True)
3919 def test_self_twice_nat_negative(self):
3920 """ Self Twice NAT44 (negative test) """
3921 self.twice_nat_common(self_twice_nat=True)
3923 def test_twice_nat_lb(self):
3924 """ Twice NAT44 local service load balancing """
3925 self.twice_nat_common(lb=True)
3927 def test_self_twice_nat_lb_positive(self):
3928 """ Self Twice NAT44 local service load balancing (positive test) """
3929 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
3932 def test_self_twice_nat_lb_negative(self):
3933 """ Self Twice NAT44 local service load balancing (negative test) """
3934 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
3937 def test_twice_nat_interface_addr(self):
3938 """ Acquire twice NAT44 addresses from interface """
3939 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3941 # no address in NAT pool
3942 adresses = self.vapi.nat44_address_dump()
3943 self.assertEqual(0, len(adresses))
3945 # configure interface address and check NAT address pool
3946 self.pg7.config_ip4()
3947 adresses = self.vapi.nat44_address_dump()
3948 self.assertEqual(1, len(adresses))
3949 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3950 self.assertEqual(adresses[0].twice_nat, 1)
3952 # remove interface address and check NAT address pool
3953 self.pg7.unconfig_ip4()
3954 adresses = self.vapi.nat44_address_dump()
3955 self.assertEqual(0, len(adresses))
3957 def test_ipfix_max_frags(self):
3958 """ IPFIX logging maximum fragments pending reassembly exceeded """
3959 self.nat44_add_address(self.nat_addr)
3960 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3961 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3963 self.vapi.nat_set_reass(max_frag=0)
3964 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3965 src_address=self.pg3.local_ip4n,
3967 template_interval=10)
3968 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3969 src_port=self.ipfix_src_port)
3971 data = "A" * 4 + "B" * 16 + "C" * 3
3972 self.tcp_port_in = random.randint(1025, 65535)
3973 pkts = self.create_stream_frag(self.pg0,
3974 self.pg1.remote_ip4,
3978 self.pg0.add_stream(pkts[-1])
3979 self.pg_enable_capture(self.pg_interfaces)
3981 frags = self.pg1.get_capture(0)
3982 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3983 capture = self.pg3.get_capture(9)
3984 ipfix = IPFIXDecoder()
3985 # first load template
3987 self.assertTrue(p.haslayer(IPFIX))
3988 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3989 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3990 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3991 self.assertEqual(p[UDP].dport, 4739)
3992 self.assertEqual(p[IPFIX].observationDomainID,
3993 self.ipfix_domain_id)
3994 if p.haslayer(Template):
3995 ipfix.add_template(p.getlayer(Template))
3996 # verify events in data set
3998 if p.haslayer(Data):
3999 data = ipfix.decode_data_set(p.getlayer(Set))
4000 self.verify_ipfix_max_fragments_ip4(data, 0,
4001 self.pg0.remote_ip4n)
4003 def test_tcp_session_close_in(self):
4004 """ Close TCP session from inside network """
4005 self.nat44_add_address(self.nat_addr)
4006 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4007 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4010 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4011 start_sessnum = len(sessions)
4013 self.initiate_tcp_session(self.pg0, self.pg1)
4015 # close the session from inside
4017 # FIN packet in -> out
4018 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4019 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4020 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4022 self.pg0.add_stream(p)
4023 self.pg_enable_capture(self.pg_interfaces)
4025 self.pg1.get_capture(1)
4029 # ACK packet out -> in
4030 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4031 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4032 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4036 # FIN packet out -> in
4037 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4038 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4039 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4043 self.pg1.add_stream(pkts)
4044 self.pg_enable_capture(self.pg_interfaces)
4046 self.pg0.get_capture(2)
4048 # ACK packet in -> out
4049 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4050 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4051 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4053 self.pg0.add_stream(p)
4054 self.pg_enable_capture(self.pg_interfaces)
4056 self.pg1.get_capture(1)
4058 self.initiate_tcp_session(self.pg0, self.pg1)
4059 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4061 self.assertEqual(len(sessions) - start_sessnum, 1)
4063 self.logger.error("TCP session termination failed")
4066 def test_tcp_session_close_out(self):
4067 """ Close TCP session from outside network """
4068 self.nat44_add_address(self.nat_addr)
4069 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4070 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4073 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4074 start_sessnum = len(sessions)
4076 self.initiate_tcp_session(self.pg0, self.pg1)
4078 # close the session from outside
4080 # FIN packet out -> in
4081 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4082 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4083 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4085 self.pg1.add_stream(p)
4086 self.pg_enable_capture(self.pg_interfaces)
4088 self.pg0.get_capture(1)
4092 # ACK packet in -> out
4093 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4094 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4095 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4099 # ACK packet in -> out
4100 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4101 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4102 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4106 self.pg0.add_stream(pkts)
4107 self.pg_enable_capture(self.pg_interfaces)
4109 self.pg1.get_capture(2)
4111 # ACK packet out -> in
4112 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4113 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4114 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4116 self.pg1.add_stream(p)
4117 self.pg_enable_capture(self.pg_interfaces)
4119 self.pg0.get_capture(1)
4121 self.initiate_tcp_session(self.pg0, self.pg1)
4122 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4124 self.assertEqual(len(sessions) - start_sessnum, 1)
4126 self.logger.error("TCP session termination failed")
4129 def test_tcp_session_close_simultaneous(self):
4130 """ Close TCP session from inside network """
4131 self.nat44_add_address(self.nat_addr)
4132 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4133 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4136 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4137 start_sessnum = len(sessions)
4139 self.initiate_tcp_session(self.pg0, self.pg1)
4141 # close the session from inside
4143 # FIN packet in -> out
4144 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4145 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4146 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4148 self.pg0.add_stream(p)
4149 self.pg_enable_capture(self.pg_interfaces)
4151 self.pg1.get_capture(1)
4153 # FIN packet out -> in
4154 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4155 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4156 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4158 self.pg1.add_stream(p)
4159 self.pg_enable_capture(self.pg_interfaces)
4161 self.pg0.get_capture(1)
4163 # ACK packet in -> out
4164 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4165 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4166 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4168 self.pg0.add_stream(p)
4169 self.pg_enable_capture(self.pg_interfaces)
4171 self.pg1.get_capture(1)
4173 # ACK packet out -> in
4174 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4175 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4176 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4178 self.pg1.add_stream(p)
4179 self.pg_enable_capture(self.pg_interfaces)
4181 self.pg0.get_capture(1)
4183 self.initiate_tcp_session(self.pg0, self.pg1)
4184 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4186 self.assertEqual(len(sessions) - start_sessnum, 1)
4188 self.logger.error("TCP session termination failed")
4192 super(TestNAT44, self).tearDown()
4193 if not self.vpp_dead:
4194 self.logger.info(self.vapi.cli("show nat44 addresses"))
4195 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4196 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4197 self.logger.info(self.vapi.cli("show nat44 interface address"))
4198 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4199 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4200 self.vapi.cli("nat addr-port-assignment-alg default")
4204 class TestNAT44Out2InDPO(MethodHolder):
4205 """ NAT44 Test Cases using out2in DPO """
4208 def setUpConstants(cls):
4209 super(TestNAT44Out2InDPO, cls).setUpConstants()
4210 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4213 def setUpClass(cls):
4214 super(TestNAT44Out2InDPO, cls).setUpClass()
4217 cls.tcp_port_in = 6303
4218 cls.tcp_port_out = 6303
4219 cls.udp_port_in = 6304
4220 cls.udp_port_out = 6304
4221 cls.icmp_id_in = 6305
4222 cls.icmp_id_out = 6305
4223 cls.nat_addr = '10.0.0.3'
4224 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4225 cls.dst_ip4 = '192.168.70.1'
4227 cls.create_pg_interfaces(range(2))
4230 cls.pg0.config_ip4()
4231 cls.pg0.resolve_arp()
4234 cls.pg1.config_ip6()
4235 cls.pg1.resolve_ndp()
4237 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4238 dst_address_length=0,
4239 next_hop_address=cls.pg1.remote_ip6n,
4240 next_hop_sw_if_index=cls.pg1.sw_if_index)
4243 super(TestNAT44Out2InDPO, cls).tearDownClass()
4246 def configure_xlat(self):
4247 self.dst_ip6_pfx = '1:2:3::'
4248 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4250 self.dst_ip6_pfx_len = 96
4251 self.src_ip6_pfx = '4:5:6::'
4252 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4254 self.src_ip6_pfx_len = 96
4255 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4256 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4257 '\x00\x00\x00\x00', 0, is_translation=1,
4260 def test_464xlat_ce(self):
4261 """ Test 464XLAT CE with NAT44 """
4263 self.configure_xlat()
4265 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4266 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4268 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4269 self.dst_ip6_pfx_len)
4270 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4271 self.src_ip6_pfx_len)
4274 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4275 self.pg0.add_stream(pkts)
4276 self.pg_enable_capture(self.pg_interfaces)
4278 capture = self.pg1.get_capture(len(pkts))
4279 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4282 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4284 self.pg1.add_stream(pkts)
4285 self.pg_enable_capture(self.pg_interfaces)
4287 capture = self.pg0.get_capture(len(pkts))
4288 self.verify_capture_in(capture, self.pg0)
4290 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4292 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4293 self.nat_addr_n, is_add=0)
4295 def test_464xlat_ce_no_nat(self):
4296 """ Test 464XLAT CE without NAT44 """
4298 self.configure_xlat()
4300 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4301 self.dst_ip6_pfx_len)
4302 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4303 self.src_ip6_pfx_len)
4305 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4306 self.pg0.add_stream(pkts)
4307 self.pg_enable_capture(self.pg_interfaces)
4309 capture = self.pg1.get_capture(len(pkts))
4310 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4311 nat_ip=out_dst_ip6, same_port=True)
4313 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4314 self.pg1.add_stream(pkts)
4315 self.pg_enable_capture(self.pg_interfaces)
4317 capture = self.pg0.get_capture(len(pkts))
4318 self.verify_capture_in(capture, self.pg0)
4321 class TestDeterministicNAT(MethodHolder):
4322 """ Deterministic NAT Test Cases """
4325 def setUpConstants(cls):
4326 super(TestDeterministicNAT, cls).setUpConstants()
4327 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4330 def setUpClass(cls):
4331 super(TestDeterministicNAT, cls).setUpClass()
4334 cls.tcp_port_in = 6303
4335 cls.tcp_external_port = 6303
4336 cls.udp_port_in = 6304
4337 cls.udp_external_port = 6304
4338 cls.icmp_id_in = 6305
4339 cls.nat_addr = '10.0.0.3'
4341 cls.create_pg_interfaces(range(3))
4342 cls.interfaces = list(cls.pg_interfaces)
4344 for i in cls.interfaces:
4349 cls.pg0.generate_remote_hosts(2)
4350 cls.pg0.configure_ipv4_neighbors()
4353 super(TestDeterministicNAT, cls).tearDownClass()
4356 def create_stream_in(self, in_if, out_if, ttl=64):
4358 Create packet stream for inside network
4360 :param in_if: Inside interface
4361 :param out_if: Outside interface
4362 :param ttl: TTL of generated packets
4366 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4367 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4368 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4372 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4373 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4374 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4378 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4379 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4380 ICMP(id=self.icmp_id_in, type='echo-request'))
4385 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4387 Create packet stream for outside network
4389 :param out_if: Outside interface
4390 :param dst_ip: Destination IP address (Default use global NAT address)
4391 :param ttl: TTL of generated packets
4394 dst_ip = self.nat_addr
4397 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4398 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4399 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4403 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4404 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4405 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4409 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4410 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4411 ICMP(id=self.icmp_external_id, type='echo-reply'))
4416 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4418 Verify captured packets on outside network
4420 :param capture: Captured packets
4421 :param nat_ip: Translated IP address (Default use global NAT address)
4422 :param same_port: Sorce port number is not translated (Default False)
4423 :param packet_num: Expected number of packets (Default 3)
4426 nat_ip = self.nat_addr
4427 self.assertEqual(packet_num, len(capture))
4428 for packet in capture:
4430 self.assertEqual(packet[IP].src, nat_ip)
4431 if packet.haslayer(TCP):
4432 self.tcp_port_out = packet[TCP].sport
4433 elif packet.haslayer(UDP):
4434 self.udp_port_out = packet[UDP].sport
4436 self.icmp_external_id = packet[ICMP].id
4438 self.logger.error(ppp("Unexpected or invalid packet "
4439 "(outside network):", packet))
4442 def verify_ipfix_max_entries_per_user(self, data):
4444 Verify IPFIX maximum entries per user exceeded event
4446 :param data: Decoded IPFIX data records
4448 self.assertEqual(1, len(data))
4451 self.assertEqual(ord(record[230]), 13)
4452 # natQuotaExceededEvent
4453 self.assertEqual('\x03\x00\x00\x00', record[466])
4455 self.assertEqual('\xe8\x03\x00\x00', record[473])
4457 self.assertEqual(self.pg0.remote_ip4n, record[8])
4459 def test_deterministic_mode(self):
4460 """ NAT plugin run deterministic mode """
4461 in_addr = '172.16.255.0'
4462 out_addr = '172.17.255.50'
4463 in_addr_t = '172.16.255.20'
4464 in_addr_n = socket.inet_aton(in_addr)
4465 out_addr_n = socket.inet_aton(out_addr)
4466 in_addr_t_n = socket.inet_aton(in_addr_t)
4470 nat_config = self.vapi.nat_show_config()
4471 self.assertEqual(1, nat_config.deterministic)
4473 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4475 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4476 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4477 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4478 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4480 deterministic_mappings = self.vapi.nat_det_map_dump()
4481 self.assertEqual(len(deterministic_mappings), 1)
4482 dsm = deterministic_mappings[0]
4483 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4484 self.assertEqual(in_plen, dsm.in_plen)
4485 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4486 self.assertEqual(out_plen, dsm.out_plen)
4488 self.clear_nat_det()
4489 deterministic_mappings = self.vapi.nat_det_map_dump()
4490 self.assertEqual(len(deterministic_mappings), 0)
4492 def test_set_timeouts(self):
4493 """ Set deterministic NAT timeouts """
4494 timeouts_before = self.vapi.nat_det_get_timeouts()
4496 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4497 timeouts_before.tcp_established + 10,
4498 timeouts_before.tcp_transitory + 10,
4499 timeouts_before.icmp + 10)
4501 timeouts_after = self.vapi.nat_det_get_timeouts()
4503 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4504 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4505 self.assertNotEqual(timeouts_before.tcp_established,
4506 timeouts_after.tcp_established)
4507 self.assertNotEqual(timeouts_before.tcp_transitory,
4508 timeouts_after.tcp_transitory)
4510 def test_det_in(self):
4511 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4513 nat_ip = "10.0.0.10"
4515 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4517 socket.inet_aton(nat_ip),
4519 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4520 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4524 pkts = self.create_stream_in(self.pg0, self.pg1)
4525 self.pg0.add_stream(pkts)
4526 self.pg_enable_capture(self.pg_interfaces)
4528 capture = self.pg1.get_capture(len(pkts))
4529 self.verify_capture_out(capture, nat_ip)
4532 pkts = self.create_stream_out(self.pg1, nat_ip)
4533 self.pg1.add_stream(pkts)
4534 self.pg_enable_capture(self.pg_interfaces)
4536 capture = self.pg0.get_capture(len(pkts))
4537 self.verify_capture_in(capture, self.pg0)
4540 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4541 self.assertEqual(len(sessions), 3)
4545 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4546 self.assertEqual(s.in_port, self.tcp_port_in)
4547 self.assertEqual(s.out_port, self.tcp_port_out)
4548 self.assertEqual(s.ext_port, self.tcp_external_port)
4552 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4553 self.assertEqual(s.in_port, self.udp_port_in)
4554 self.assertEqual(s.out_port, self.udp_port_out)
4555 self.assertEqual(s.ext_port, self.udp_external_port)
4559 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4560 self.assertEqual(s.in_port, self.icmp_id_in)
4561 self.assertEqual(s.out_port, self.icmp_external_id)
4563 def test_multiple_users(self):
4564 """ Deterministic NAT multiple users """
4566 nat_ip = "10.0.0.10"
4568 external_port = 6303
4570 host0 = self.pg0.remote_hosts[0]
4571 host1 = self.pg0.remote_hosts[1]
4573 self.vapi.nat_det_add_del_map(host0.ip4n,
4575 socket.inet_aton(nat_ip),
4577 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4578 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4582 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4583 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4584 TCP(sport=port_in, dport=external_port))
4585 self.pg0.add_stream(p)
4586 self.pg_enable_capture(self.pg_interfaces)
4588 capture = self.pg1.get_capture(1)
4593 self.assertEqual(ip.src, nat_ip)
4594 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4595 self.assertEqual(tcp.dport, external_port)
4596 port_out0 = tcp.sport
4598 self.logger.error(ppp("Unexpected or invalid packet:", p))
4602 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4603 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4604 TCP(sport=port_in, dport=external_port))
4605 self.pg0.add_stream(p)
4606 self.pg_enable_capture(self.pg_interfaces)
4608 capture = self.pg1.get_capture(1)
4613 self.assertEqual(ip.src, nat_ip)
4614 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4615 self.assertEqual(tcp.dport, external_port)
4616 port_out1 = tcp.sport
4618 self.logger.error(ppp("Unexpected or invalid packet:", p))
4621 dms = self.vapi.nat_det_map_dump()
4622 self.assertEqual(1, len(dms))
4623 self.assertEqual(2, dms[0].ses_num)
4626 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4627 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4628 TCP(sport=external_port, dport=port_out0))
4629 self.pg1.add_stream(p)
4630 self.pg_enable_capture(self.pg_interfaces)
4632 capture = self.pg0.get_capture(1)
4637 self.assertEqual(ip.src, self.pg1.remote_ip4)
4638 self.assertEqual(ip.dst, host0.ip4)
4639 self.assertEqual(tcp.dport, port_in)
4640 self.assertEqual(tcp.sport, external_port)
4642 self.logger.error(ppp("Unexpected or invalid packet:", p))
4646 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4647 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4648 TCP(sport=external_port, dport=port_out1))
4649 self.pg1.add_stream(p)
4650 self.pg_enable_capture(self.pg_interfaces)
4652 capture = self.pg0.get_capture(1)
4657 self.assertEqual(ip.src, self.pg1.remote_ip4)
4658 self.assertEqual(ip.dst, host1.ip4)
4659 self.assertEqual(tcp.dport, port_in)
4660 self.assertEqual(tcp.sport, external_port)
4662 self.logger.error(ppp("Unexpected or invalid packet", p))
4665 # session close api test
4666 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4668 self.pg1.remote_ip4n,
4670 dms = self.vapi.nat_det_map_dump()
4671 self.assertEqual(dms[0].ses_num, 1)
4673 self.vapi.nat_det_close_session_in(host0.ip4n,
4675 self.pg1.remote_ip4n,
4677 dms = self.vapi.nat_det_map_dump()
4678 self.assertEqual(dms[0].ses_num, 0)
4680 def test_tcp_session_close_detection_in(self):
4681 """ Deterministic NAT TCP session close from inside network """
4682 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4684 socket.inet_aton(self.nat_addr),
4686 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4687 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4690 self.initiate_tcp_session(self.pg0, self.pg1)
4692 # close the session from inside
4694 # FIN packet in -> out
4695 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4696 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4697 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4699 self.pg0.add_stream(p)
4700 self.pg_enable_capture(self.pg_interfaces)
4702 self.pg1.get_capture(1)
4706 # ACK packet out -> in
4707 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4708 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4709 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4713 # FIN packet out -> in
4714 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4715 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4716 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4720 self.pg1.add_stream(pkts)
4721 self.pg_enable_capture(self.pg_interfaces)
4723 self.pg0.get_capture(2)
4725 # ACK packet in -> out
4726 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4727 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4728 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4730 self.pg0.add_stream(p)
4731 self.pg_enable_capture(self.pg_interfaces)
4733 self.pg1.get_capture(1)
4735 # Check if deterministic NAT44 closed the session
4736 dms = self.vapi.nat_det_map_dump()
4737 self.assertEqual(0, dms[0].ses_num)
4739 self.logger.error("TCP session termination failed")
4742 def test_tcp_session_close_detection_out(self):
4743 """ Deterministic NAT TCP session close from outside network """
4744 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4746 socket.inet_aton(self.nat_addr),
4748 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4749 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4752 self.initiate_tcp_session(self.pg0, self.pg1)
4754 # close the session from outside
4756 # FIN packet out -> in
4757 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4758 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4759 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4761 self.pg1.add_stream(p)
4762 self.pg_enable_capture(self.pg_interfaces)
4764 self.pg0.get_capture(1)
4768 # ACK packet in -> out
4769 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4770 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4771 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4775 # ACK packet in -> out
4776 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4777 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4778 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4782 self.pg0.add_stream(pkts)
4783 self.pg_enable_capture(self.pg_interfaces)
4785 self.pg1.get_capture(2)
4787 # ACK packet out -> in
4788 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4789 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4790 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4792 self.pg1.add_stream(p)
4793 self.pg_enable_capture(self.pg_interfaces)
4795 self.pg0.get_capture(1)
4797 # Check if deterministic NAT44 closed the session
4798 dms = self.vapi.nat_det_map_dump()
4799 self.assertEqual(0, dms[0].ses_num)
4801 self.logger.error("TCP session termination failed")
4804 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4805 def test_session_timeout(self):
4806 """ Deterministic NAT session timeouts """
4807 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4809 socket.inet_aton(self.nat_addr),
4811 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4812 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4815 self.initiate_tcp_session(self.pg0, self.pg1)
4816 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4817 pkts = self.create_stream_in(self.pg0, self.pg1)
4818 self.pg0.add_stream(pkts)
4819 self.pg_enable_capture(self.pg_interfaces)
4821 capture = self.pg1.get_capture(len(pkts))
4824 dms = self.vapi.nat_det_map_dump()
4825 self.assertEqual(0, dms[0].ses_num)
4827 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4828 def test_session_limit_per_user(self):
4829 """ Deterministic NAT maximum sessions per user limit """
4830 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4832 socket.inet_aton(self.nat_addr),
4834 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4835 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4837 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4838 src_address=self.pg2.local_ip4n,
4840 template_interval=10)
4841 self.vapi.nat_ipfix()
4844 for port in range(1025, 2025):
4845 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4846 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4847 UDP(sport=port, dport=port))
4850 self.pg0.add_stream(pkts)
4851 self.pg_enable_capture(self.pg_interfaces)
4853 capture = self.pg1.get_capture(len(pkts))
4855 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4856 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4857 UDP(sport=3001, dport=3002))
4858 self.pg0.add_stream(p)
4859 self.pg_enable_capture(self.pg_interfaces)
4861 capture = self.pg1.assert_nothing_captured()
4863 # verify ICMP error packet
4864 capture = self.pg0.get_capture(1)
4866 self.assertTrue(p.haslayer(ICMP))
4868 self.assertEqual(icmp.type, 3)
4869 self.assertEqual(icmp.code, 1)
4870 self.assertTrue(icmp.haslayer(IPerror))
4871 inner_ip = icmp[IPerror]
4872 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4873 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4875 dms = self.vapi.nat_det_map_dump()
4877 self.assertEqual(1000, dms[0].ses_num)
4879 # verify IPFIX logging
4880 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4882 capture = self.pg2.get_capture(2)
4883 ipfix = IPFIXDecoder()
4884 # first load template
4886 self.assertTrue(p.haslayer(IPFIX))
4887 if p.haslayer(Template):
4888 ipfix.add_template(p.getlayer(Template))
4889 # verify events in data set
4891 if p.haslayer(Data):
4892 data = ipfix.decode_data_set(p.getlayer(Set))
4893 self.verify_ipfix_max_entries_per_user(data)
4895 def clear_nat_det(self):
4897 Clear deterministic NAT configuration.
4899 self.vapi.nat_ipfix(enable=0)
4900 self.vapi.nat_det_set_timeouts()
4901 deterministic_mappings = self.vapi.nat_det_map_dump()
4902 for dsm in deterministic_mappings:
4903 self.vapi.nat_det_add_del_map(dsm.in_addr,
4909 interfaces = self.vapi.nat44_interface_dump()
4910 for intf in interfaces:
4911 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4916 super(TestDeterministicNAT, self).tearDown()
4917 if not self.vpp_dead:
4918 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4920 self.vapi.cli("show nat44 deterministic mappings"))
4922 self.vapi.cli("show nat44 deterministic timeouts"))
4924 self.vapi.cli("show nat44 deterministic sessions"))
4925 self.clear_nat_det()
4928 class TestNAT64(MethodHolder):
4929 """ NAT64 Test Cases """
4932 def setUpConstants(cls):
4933 super(TestNAT64, cls).setUpConstants()
4934 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4935 "nat64 st hash buckets 256", "}"])
4938 def setUpClass(cls):
4939 super(TestNAT64, cls).setUpClass()
4942 cls.tcp_port_in = 6303
4943 cls.tcp_port_out = 6303
4944 cls.udp_port_in = 6304
4945 cls.udp_port_out = 6304
4946 cls.icmp_id_in = 6305
4947 cls.icmp_id_out = 6305
4948 cls.nat_addr = '10.0.0.3'
4949 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4951 cls.vrf1_nat_addr = '10.0.10.3'
4952 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4954 cls.ipfix_src_port = 4739
4955 cls.ipfix_domain_id = 1
4957 cls.create_pg_interfaces(range(5))
4958 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4959 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4960 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4962 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4964 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4966 cls.pg0.generate_remote_hosts(2)
4968 for i in cls.ip6_interfaces:
4971 i.configure_ipv6_neighbors()
4973 for i in cls.ip4_interfaces:
4979 cls.pg3.config_ip4()
4980 cls.pg3.resolve_arp()
4981 cls.pg3.config_ip6()
4982 cls.pg3.configure_ipv6_neighbors()
4985 super(TestNAT64, cls).tearDownClass()
4988 def test_pool(self):
4989 """ Add/delete address to NAT64 pool """
4990 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4992 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4994 addresses = self.vapi.nat64_pool_addr_dump()
4995 self.assertEqual(len(addresses), 1)
4996 self.assertEqual(addresses[0].address, nat_addr)
4998 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
5000 addresses = self.vapi.nat64_pool_addr_dump()
5001 self.assertEqual(len(addresses), 0)
5003 def test_interface(self):
5004 """ Enable/disable NAT64 feature on the interface """
5005 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5006 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5008 interfaces = self.vapi.nat64_interface_dump()
5009 self.assertEqual(len(interfaces), 2)
5012 for intf in interfaces:
5013 if intf.sw_if_index == self.pg0.sw_if_index:
5014 self.assertEqual(intf.is_inside, 1)
5016 elif intf.sw_if_index == self.pg1.sw_if_index:
5017 self.assertEqual(intf.is_inside, 0)
5019 self.assertTrue(pg0_found)
5020 self.assertTrue(pg1_found)
5022 features = self.vapi.cli("show interface features pg0")
5023 self.assertNotEqual(features.find('nat64-in2out'), -1)
5024 features = self.vapi.cli("show interface features pg1")
5025 self.assertNotEqual(features.find('nat64-out2in'), -1)
5027 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5028 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5030 interfaces = self.vapi.nat64_interface_dump()
5031 self.assertEqual(len(interfaces), 0)
5033 def test_static_bib(self):
5034 """ Add/delete static BIB entry """
5035 in_addr = socket.inet_pton(socket.AF_INET6,
5036 '2001:db8:85a3::8a2e:370:7334')
5037 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5040 proto = IP_PROTOS.tcp
5042 self.vapi.nat64_add_del_static_bib(in_addr,
5047 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5052 self.assertEqual(bibe.i_addr, in_addr)
5053 self.assertEqual(bibe.o_addr, out_addr)
5054 self.assertEqual(bibe.i_port, in_port)
5055 self.assertEqual(bibe.o_port, out_port)
5056 self.assertEqual(static_bib_num, 1)
5058 self.vapi.nat64_add_del_static_bib(in_addr,
5064 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5069 self.assertEqual(static_bib_num, 0)
5071 def test_set_timeouts(self):
5072 """ Set NAT64 timeouts """
5073 # verify default values
5074 timeouts = self.vapi.nat64_get_timeouts()
5075 self.assertEqual(timeouts.udp, 300)
5076 self.assertEqual(timeouts.icmp, 60)
5077 self.assertEqual(timeouts.tcp_trans, 240)
5078 self.assertEqual(timeouts.tcp_est, 7440)
5079 self.assertEqual(timeouts.tcp_incoming_syn, 6)
5081 # set and verify custom values
5082 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
5083 tcp_est=7450, tcp_incoming_syn=10)
5084 timeouts = self.vapi.nat64_get_timeouts()
5085 self.assertEqual(timeouts.udp, 200)
5086 self.assertEqual(timeouts.icmp, 30)
5087 self.assertEqual(timeouts.tcp_trans, 250)
5088 self.assertEqual(timeouts.tcp_est, 7450)
5089 self.assertEqual(timeouts.tcp_incoming_syn, 10)
5091 def test_dynamic(self):
5092 """ NAT64 dynamic translation test """
5093 self.tcp_port_in = 6303
5094 self.udp_port_in = 6304
5095 self.icmp_id_in = 6305
5097 ses_num_start = self.nat64_get_ses_num()
5099 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5101 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5102 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5105 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5106 self.pg0.add_stream(pkts)
5107 self.pg_enable_capture(self.pg_interfaces)
5109 capture = self.pg1.get_capture(len(pkts))
5110 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5111 dst_ip=self.pg1.remote_ip4)
5114 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5115 self.pg1.add_stream(pkts)
5116 self.pg_enable_capture(self.pg_interfaces)
5118 capture = self.pg0.get_capture(len(pkts))
5119 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5120 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5123 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5124 self.pg0.add_stream(pkts)
5125 self.pg_enable_capture(self.pg_interfaces)
5127 capture = self.pg1.get_capture(len(pkts))
5128 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5129 dst_ip=self.pg1.remote_ip4)
5132 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5133 self.pg1.add_stream(pkts)
5134 self.pg_enable_capture(self.pg_interfaces)
5136 capture = self.pg0.get_capture(len(pkts))
5137 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5139 ses_num_end = self.nat64_get_ses_num()
5141 self.assertEqual(ses_num_end - ses_num_start, 3)
5143 # tenant with specific VRF
5144 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5145 self.vrf1_nat_addr_n,
5146 vrf_id=self.vrf1_id)
5147 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5149 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5150 self.pg2.add_stream(pkts)
5151 self.pg_enable_capture(self.pg_interfaces)
5153 capture = self.pg1.get_capture(len(pkts))
5154 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5155 dst_ip=self.pg1.remote_ip4)
5157 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5158 self.pg1.add_stream(pkts)
5159 self.pg_enable_capture(self.pg_interfaces)
5161 capture = self.pg2.get_capture(len(pkts))
5162 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5164 def test_static(self):
5165 """ NAT64 static translation test """
5166 self.tcp_port_in = 60303
5167 self.udp_port_in = 60304
5168 self.icmp_id_in = 60305
5169 self.tcp_port_out = 60303
5170 self.udp_port_out = 60304
5171 self.icmp_id_out = 60305
5173 ses_num_start = self.nat64_get_ses_num()
5175 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5177 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5178 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5180 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5185 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5190 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5197 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5198 self.pg0.add_stream(pkts)
5199 self.pg_enable_capture(self.pg_interfaces)
5201 capture = self.pg1.get_capture(len(pkts))
5202 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5203 dst_ip=self.pg1.remote_ip4, same_port=True)
5206 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5207 self.pg1.add_stream(pkts)
5208 self.pg_enable_capture(self.pg_interfaces)
5210 capture = self.pg0.get_capture(len(pkts))
5211 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5212 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5214 ses_num_end = self.nat64_get_ses_num()
5216 self.assertEqual(ses_num_end - ses_num_start, 3)
5218 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5219 def test_session_timeout(self):
5220 """ NAT64 session timeout """
5221 self.icmp_id_in = 1234
5222 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5224 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5225 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5226 self.vapi.nat64_set_timeouts(icmp=5)
5228 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5229 self.pg0.add_stream(pkts)
5230 self.pg_enable_capture(self.pg_interfaces)
5232 capture = self.pg1.get_capture(len(pkts))
5234 ses_num_before_timeout = self.nat64_get_ses_num()
5238 # ICMP session after timeout
5239 ses_num_after_timeout = self.nat64_get_ses_num()
5240 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5242 def test_icmp_error(self):
5243 """ NAT64 ICMP Error message translation """
5244 self.tcp_port_in = 6303
5245 self.udp_port_in = 6304
5246 self.icmp_id_in = 6305
5248 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5250 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5251 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5253 # send some packets to create sessions
5254 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5255 self.pg0.add_stream(pkts)
5256 self.pg_enable_capture(self.pg_interfaces)
5258 capture_ip4 = self.pg1.get_capture(len(pkts))
5259 self.verify_capture_out(capture_ip4,
5260 nat_ip=self.nat_addr,
5261 dst_ip=self.pg1.remote_ip4)
5263 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5264 self.pg1.add_stream(pkts)
5265 self.pg_enable_capture(self.pg_interfaces)
5267 capture_ip6 = self.pg0.get_capture(len(pkts))
5268 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5269 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5270 self.pg0.remote_ip6)
5273 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5274 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5275 ICMPv6DestUnreach(code=1) /
5276 packet[IPv6] for packet in capture_ip6]
5277 self.pg0.add_stream(pkts)
5278 self.pg_enable_capture(self.pg_interfaces)
5280 capture = self.pg1.get_capture(len(pkts))
5281 for packet in capture:
5283 self.assertEqual(packet[IP].src, self.nat_addr)
5284 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5285 self.assertEqual(packet[ICMP].type, 3)
5286 self.assertEqual(packet[ICMP].code, 13)
5287 inner = packet[IPerror]
5288 self.assertEqual(inner.src, self.pg1.remote_ip4)
5289 self.assertEqual(inner.dst, self.nat_addr)
5290 self.assert_packet_checksums_valid(packet)
5291 if inner.haslayer(TCPerror):
5292 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5293 elif inner.haslayer(UDPerror):
5294 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5296 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5298 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5302 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5303 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5304 ICMP(type=3, code=13) /
5305 packet[IP] for packet in capture_ip4]
5306 self.pg1.add_stream(pkts)
5307 self.pg_enable_capture(self.pg_interfaces)
5309 capture = self.pg0.get_capture(len(pkts))
5310 for packet in capture:
5312 self.assertEqual(packet[IPv6].src, ip.src)
5313 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5314 icmp = packet[ICMPv6DestUnreach]
5315 self.assertEqual(icmp.code, 1)
5316 inner = icmp[IPerror6]
5317 self.assertEqual(inner.src, self.pg0.remote_ip6)
5318 self.assertEqual(inner.dst, ip.src)
5319 self.assert_icmpv6_checksum_valid(packet)
5320 if inner.haslayer(TCPerror):
5321 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5322 elif inner.haslayer(UDPerror):
5323 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5325 self.assertEqual(inner[ICMPv6EchoRequest].id,
5328 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5331 def test_hairpinning(self):
5332 """ NAT64 hairpinning """
5334 client = self.pg0.remote_hosts[0]
5335 server = self.pg0.remote_hosts[1]
5336 server_tcp_in_port = 22
5337 server_tcp_out_port = 4022
5338 server_udp_in_port = 23
5339 server_udp_out_port = 4023
5340 client_tcp_in_port = 1234
5341 client_udp_in_port = 1235
5342 client_tcp_out_port = 0
5343 client_udp_out_port = 0
5344 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5345 nat_addr_ip6 = ip.src
5347 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5349 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5350 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5352 self.vapi.nat64_add_del_static_bib(server.ip6n,
5355 server_tcp_out_port,
5357 self.vapi.nat64_add_del_static_bib(server.ip6n,
5360 server_udp_out_port,
5365 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5366 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5367 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5369 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5370 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5371 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5373 self.pg0.add_stream(pkts)
5374 self.pg_enable_capture(self.pg_interfaces)
5376 capture = self.pg0.get_capture(len(pkts))
5377 for packet in capture:
5379 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5380 self.assertEqual(packet[IPv6].dst, server.ip6)
5381 self.assert_packet_checksums_valid(packet)
5382 if packet.haslayer(TCP):
5383 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5384 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5385 client_tcp_out_port = packet[TCP].sport
5387 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5388 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5389 client_udp_out_port = packet[UDP].sport
5391 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5396 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5397 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5398 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5400 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5401 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5402 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5404 self.pg0.add_stream(pkts)
5405 self.pg_enable_capture(self.pg_interfaces)
5407 capture = self.pg0.get_capture(len(pkts))
5408 for packet in capture:
5410 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5411 self.assertEqual(packet[IPv6].dst, client.ip6)
5412 self.assert_packet_checksums_valid(packet)
5413 if packet.haslayer(TCP):
5414 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5415 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5417 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5418 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5420 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5425 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5426 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5427 ICMPv6DestUnreach(code=1) /
5428 packet[IPv6] for packet in capture]
5429 self.pg0.add_stream(pkts)
5430 self.pg_enable_capture(self.pg_interfaces)
5432 capture = self.pg0.get_capture(len(pkts))
5433 for packet in capture:
5435 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5436 self.assertEqual(packet[IPv6].dst, server.ip6)
5437 icmp = packet[ICMPv6DestUnreach]
5438 self.assertEqual(icmp.code, 1)
5439 inner = icmp[IPerror6]
5440 self.assertEqual(inner.src, server.ip6)
5441 self.assertEqual(inner.dst, nat_addr_ip6)
5442 self.assert_packet_checksums_valid(packet)
5443 if inner.haslayer(TCPerror):
5444 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5445 self.assertEqual(inner[TCPerror].dport,
5446 client_tcp_out_port)
5448 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5449 self.assertEqual(inner[UDPerror].dport,
5450 client_udp_out_port)
5452 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5455 def test_prefix(self):
5456 """ NAT64 Network-Specific Prefix """
5458 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5460 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5461 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5462 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5463 self.vrf1_nat_addr_n,
5464 vrf_id=self.vrf1_id)
5465 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5468 global_pref64 = "2001:db8::"
5469 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5470 global_pref64_len = 32
5471 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5473 prefix = self.vapi.nat64_prefix_dump()
5474 self.assertEqual(len(prefix), 1)
5475 self.assertEqual(prefix[0].prefix, global_pref64_n)
5476 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5477 self.assertEqual(prefix[0].vrf_id, 0)
5479 # Add tenant specific prefix
5480 vrf1_pref64 = "2001:db8:122:300::"
5481 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5482 vrf1_pref64_len = 56
5483 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5485 vrf_id=self.vrf1_id)
5486 prefix = self.vapi.nat64_prefix_dump()
5487 self.assertEqual(len(prefix), 2)
5490 pkts = self.create_stream_in_ip6(self.pg0,
5493 plen=global_pref64_len)
5494 self.pg0.add_stream(pkts)
5495 self.pg_enable_capture(self.pg_interfaces)
5497 capture = self.pg1.get_capture(len(pkts))
5498 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5499 dst_ip=self.pg1.remote_ip4)
5501 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5502 self.pg1.add_stream(pkts)
5503 self.pg_enable_capture(self.pg_interfaces)
5505 capture = self.pg0.get_capture(len(pkts))
5506 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5509 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5511 # Tenant specific prefix
5512 pkts = self.create_stream_in_ip6(self.pg2,
5515 plen=vrf1_pref64_len)
5516 self.pg2.add_stream(pkts)
5517 self.pg_enable_capture(self.pg_interfaces)
5519 capture = self.pg1.get_capture(len(pkts))
5520 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5521 dst_ip=self.pg1.remote_ip4)
5523 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5524 self.pg1.add_stream(pkts)
5525 self.pg_enable_capture(self.pg_interfaces)
5527 capture = self.pg2.get_capture(len(pkts))
5528 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5531 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5533 def test_unknown_proto(self):
5534 """ NAT64 translate packet with unknown protocol """
5536 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5538 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5539 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5540 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5543 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5544 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5545 TCP(sport=self.tcp_port_in, dport=20))
5546 self.pg0.add_stream(p)
5547 self.pg_enable_capture(self.pg_interfaces)
5549 p = self.pg1.get_capture(1)
5551 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5552 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5554 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5555 TCP(sport=1234, dport=1234))
5556 self.pg0.add_stream(p)
5557 self.pg_enable_capture(self.pg_interfaces)
5559 p = self.pg1.get_capture(1)
5562 self.assertEqual(packet[IP].src, self.nat_addr)
5563 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5564 self.assertTrue(packet.haslayer(GRE))
5565 self.assert_packet_checksums_valid(packet)
5567 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5571 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5572 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5574 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5575 TCP(sport=1234, dport=1234))
5576 self.pg1.add_stream(p)
5577 self.pg_enable_capture(self.pg_interfaces)
5579 p = self.pg0.get_capture(1)
5582 self.assertEqual(packet[IPv6].src, remote_ip6)
5583 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5584 self.assertEqual(packet[IPv6].nh, 47)
5586 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5589 def test_hairpinning_unknown_proto(self):
5590 """ NAT64 translate packet with unknown protocol - hairpinning """
5592 client = self.pg0.remote_hosts[0]
5593 server = self.pg0.remote_hosts[1]
5594 server_tcp_in_port = 22
5595 server_tcp_out_port = 4022
5596 client_tcp_in_port = 1234
5597 client_tcp_out_port = 1235
5598 server_nat_ip = "10.0.0.100"
5599 client_nat_ip = "10.0.0.110"
5600 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5601 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5602 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5603 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5605 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5607 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5608 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5610 self.vapi.nat64_add_del_static_bib(server.ip6n,
5613 server_tcp_out_port,
5616 self.vapi.nat64_add_del_static_bib(server.ip6n,
5622 self.vapi.nat64_add_del_static_bib(client.ip6n,
5625 client_tcp_out_port,
5629 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5630 IPv6(src=client.ip6, dst=server_nat_ip6) /
5631 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5632 self.pg0.add_stream(p)
5633 self.pg_enable_capture(self.pg_interfaces)
5635 p = self.pg0.get_capture(1)
5637 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5638 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5640 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5641 TCP(sport=1234, dport=1234))
5642 self.pg0.add_stream(p)
5643 self.pg_enable_capture(self.pg_interfaces)
5645 p = self.pg0.get_capture(1)
5648 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5649 self.assertEqual(packet[IPv6].dst, server.ip6)
5650 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5652 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5656 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5657 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5659 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5660 TCP(sport=1234, dport=1234))
5661 self.pg0.add_stream(p)
5662 self.pg_enable_capture(self.pg_interfaces)
5664 p = self.pg0.get_capture(1)
5667 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5668 self.assertEqual(packet[IPv6].dst, client.ip6)
5669 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5671 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5674 def test_one_armed_nat64(self):
5675 """ One armed NAT64 """
5677 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5681 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5683 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5684 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5687 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5688 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5689 TCP(sport=12345, dport=80))
5690 self.pg3.add_stream(p)
5691 self.pg_enable_capture(self.pg_interfaces)
5693 capture = self.pg3.get_capture(1)
5698 self.assertEqual(ip.src, self.nat_addr)
5699 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5700 self.assertNotEqual(tcp.sport, 12345)
5701 external_port = tcp.sport
5702 self.assertEqual(tcp.dport, 80)
5703 self.assert_packet_checksums_valid(p)
5705 self.logger.error(ppp("Unexpected or invalid packet:", p))
5709 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5710 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5711 TCP(sport=80, dport=external_port))
5712 self.pg3.add_stream(p)
5713 self.pg_enable_capture(self.pg_interfaces)
5715 capture = self.pg3.get_capture(1)
5720 self.assertEqual(ip.src, remote_host_ip6)
5721 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5722 self.assertEqual(tcp.sport, 80)
5723 self.assertEqual(tcp.dport, 12345)
5724 self.assert_packet_checksums_valid(p)
5726 self.logger.error(ppp("Unexpected or invalid packet:", p))
5729 def test_frag_in_order(self):
5730 """ NAT64 translate fragments arriving in order """
5731 self.tcp_port_in = random.randint(1025, 65535)
5733 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5735 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5736 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5738 reass = self.vapi.nat_reass_dump()
5739 reass_n_start = len(reass)
5743 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5744 self.tcp_port_in, 20, data)
5745 self.pg0.add_stream(pkts)
5746 self.pg_enable_capture(self.pg_interfaces)
5748 frags = self.pg1.get_capture(len(pkts))
5749 p = self.reass_frags_and_verify(frags,
5751 self.pg1.remote_ip4)
5752 self.assertEqual(p[TCP].dport, 20)
5753 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5754 self.tcp_port_out = p[TCP].sport
5755 self.assertEqual(data, p[Raw].load)
5758 data = "A" * 4 + "b" * 16 + "C" * 3
5759 pkts = self.create_stream_frag(self.pg1,
5764 self.pg1.add_stream(pkts)
5765 self.pg_enable_capture(self.pg_interfaces)
5767 frags = self.pg0.get_capture(len(pkts))
5768 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5769 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5770 self.assertEqual(p[TCP].sport, 20)
5771 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5772 self.assertEqual(data, p[Raw].load)
5774 reass = self.vapi.nat_reass_dump()
5775 reass_n_end = len(reass)
5777 self.assertEqual(reass_n_end - reass_n_start, 2)
5779 def test_reass_hairpinning(self):
5780 """ NAT64 fragments hairpinning """
5782 server = self.pg0.remote_hosts[1]
5783 server_in_port = random.randint(1025, 65535)
5784 server_out_port = random.randint(1025, 65535)
5785 client_in_port = random.randint(1025, 65535)
5786 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5787 nat_addr_ip6 = ip.src
5789 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5791 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5792 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5794 # add static BIB entry for server
5795 self.vapi.nat64_add_del_static_bib(server.ip6n,
5801 # send packet from host to server
5802 pkts = self.create_stream_frag_ip6(self.pg0,
5807 self.pg0.add_stream(pkts)
5808 self.pg_enable_capture(self.pg_interfaces)
5810 frags = self.pg0.get_capture(len(pkts))
5811 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5812 self.assertNotEqual(p[TCP].sport, client_in_port)
5813 self.assertEqual(p[TCP].dport, server_in_port)
5814 self.assertEqual(data, p[Raw].load)
5816 def test_frag_out_of_order(self):
5817 """ NAT64 translate fragments arriving out of order """
5818 self.tcp_port_in = random.randint(1025, 65535)
5820 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5822 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5823 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5827 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5828 self.tcp_port_in, 20, data)
5830 self.pg0.add_stream(pkts)
5831 self.pg_enable_capture(self.pg_interfaces)
5833 frags = self.pg1.get_capture(len(pkts))
5834 p = self.reass_frags_and_verify(frags,
5836 self.pg1.remote_ip4)
5837 self.assertEqual(p[TCP].dport, 20)
5838 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5839 self.tcp_port_out = p[TCP].sport
5840 self.assertEqual(data, p[Raw].load)
5843 data = "A" * 4 + "B" * 16 + "C" * 3
5844 pkts = self.create_stream_frag(self.pg1,
5850 self.pg1.add_stream(pkts)
5851 self.pg_enable_capture(self.pg_interfaces)
5853 frags = self.pg0.get_capture(len(pkts))
5854 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5855 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5856 self.assertEqual(p[TCP].sport, 20)
5857 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5858 self.assertEqual(data, p[Raw].load)
5860 def test_interface_addr(self):
5861 """ Acquire NAT64 pool addresses from interface """
5862 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5864 # no address in NAT64 pool
5865 adresses = self.vapi.nat44_address_dump()
5866 self.assertEqual(0, len(adresses))
5868 # configure interface address and check NAT64 address pool
5869 self.pg4.config_ip4()
5870 addresses = self.vapi.nat64_pool_addr_dump()
5871 self.assertEqual(len(addresses), 1)
5872 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5874 # remove interface address and check NAT64 address pool
5875 self.pg4.unconfig_ip4()
5876 addresses = self.vapi.nat64_pool_addr_dump()
5877 self.assertEqual(0, len(adresses))
5879 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5880 def test_ipfix_max_bibs_sessions(self):
5881 """ IPFIX logging maximum session and BIB entries exceeded """
5884 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5888 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5890 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5891 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5895 for i in range(0, max_bibs):
5896 src = "fd01:aa::%x" % (i)
5897 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5898 IPv6(src=src, dst=remote_host_ip6) /
5899 TCP(sport=12345, dport=80))
5901 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5902 IPv6(src=src, dst=remote_host_ip6) /
5903 TCP(sport=12345, dport=22))
5905 self.pg0.add_stream(pkts)
5906 self.pg_enable_capture(self.pg_interfaces)
5908 self.pg1.get_capture(max_sessions)
5910 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5911 src_address=self.pg3.local_ip4n,
5913 template_interval=10)
5914 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5915 src_port=self.ipfix_src_port)
5917 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5918 IPv6(src=src, dst=remote_host_ip6) /
5919 TCP(sport=12345, dport=25))
5920 self.pg0.add_stream(p)
5921 self.pg_enable_capture(self.pg_interfaces)
5923 self.pg1.get_capture(0)
5924 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5925 capture = self.pg3.get_capture(9)
5926 ipfix = IPFIXDecoder()
5927 # first load template
5929 self.assertTrue(p.haslayer(IPFIX))
5930 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5931 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5932 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5933 self.assertEqual(p[UDP].dport, 4739)
5934 self.assertEqual(p[IPFIX].observationDomainID,
5935 self.ipfix_domain_id)
5936 if p.haslayer(Template):
5937 ipfix.add_template(p.getlayer(Template))
5938 # verify events in data set
5940 if p.haslayer(Data):
5941 data = ipfix.decode_data_set(p.getlayer(Set))
5942 self.verify_ipfix_max_sessions(data, max_sessions)
5944 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5945 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5946 TCP(sport=12345, dport=80))
5947 self.pg0.add_stream(p)
5948 self.pg_enable_capture(self.pg_interfaces)
5950 self.pg1.get_capture(0)
5951 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5952 capture = self.pg3.get_capture(1)
5953 # verify events in data set
5955 self.assertTrue(p.haslayer(IPFIX))
5956 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5957 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5958 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5959 self.assertEqual(p[UDP].dport, 4739)
5960 self.assertEqual(p[IPFIX].observationDomainID,
5961 self.ipfix_domain_id)
5962 if p.haslayer(Data):
5963 data = ipfix.decode_data_set(p.getlayer(Set))
5964 self.verify_ipfix_max_bibs(data, max_bibs)
5966 def test_ipfix_max_frags(self):
5967 """ IPFIX logging maximum fragments pending reassembly exceeded """
5968 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5970 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5971 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5972 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5973 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5974 src_address=self.pg3.local_ip4n,
5976 template_interval=10)
5977 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5978 src_port=self.ipfix_src_port)
5981 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5982 self.tcp_port_in, 20, data)
5983 self.pg0.add_stream(pkts[-1])
5984 self.pg_enable_capture(self.pg_interfaces)
5986 self.pg1.get_capture(0)
5987 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5988 capture = self.pg3.get_capture(9)
5989 ipfix = IPFIXDecoder()
5990 # first load template
5992 self.assertTrue(p.haslayer(IPFIX))
5993 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5994 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5995 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5996 self.assertEqual(p[UDP].dport, 4739)
5997 self.assertEqual(p[IPFIX].observationDomainID,
5998 self.ipfix_domain_id)
5999 if p.haslayer(Template):
6000 ipfix.add_template(p.getlayer(Template))
6001 # verify events in data set
6003 if p.haslayer(Data):
6004 data = ipfix.decode_data_set(p.getlayer(Set))
6005 self.verify_ipfix_max_fragments_ip6(data, 0,
6006 self.pg0.remote_ip6n)
6008 def test_ipfix_bib_ses(self):
6009 """ IPFIX logging NAT64 BIB/session create and delete events """
6010 self.tcp_port_in = random.randint(1025, 65535)
6011 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6015 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6017 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6018 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6019 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6020 src_address=self.pg3.local_ip4n,
6022 template_interval=10)
6023 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6024 src_port=self.ipfix_src_port)
6027 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6028 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6029 TCP(sport=self.tcp_port_in, dport=25))
6030 self.pg0.add_stream(p)
6031 self.pg_enable_capture(self.pg_interfaces)
6033 p = self.pg1.get_capture(1)
6034 self.tcp_port_out = p[0][TCP].sport
6035 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6036 capture = self.pg3.get_capture(10)
6037 ipfix = IPFIXDecoder()
6038 # first load template
6040 self.assertTrue(p.haslayer(IPFIX))
6041 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6042 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6043 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6044 self.assertEqual(p[UDP].dport, 4739)
6045 self.assertEqual(p[IPFIX].observationDomainID,
6046 self.ipfix_domain_id)
6047 if p.haslayer(Template):
6048 ipfix.add_template(p.getlayer(Template))
6049 # verify events in data set
6051 if p.haslayer(Data):
6052 data = ipfix.decode_data_set(p.getlayer(Set))
6053 if ord(data[0][230]) == 10:
6054 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6055 elif ord(data[0][230]) == 6:
6056 self.verify_ipfix_nat64_ses(data,
6058 self.pg0.remote_ip6n,
6059 self.pg1.remote_ip4,
6062 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6065 self.pg_enable_capture(self.pg_interfaces)
6066 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6069 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6070 capture = self.pg3.get_capture(2)
6071 # verify events in data set
6073 self.assertTrue(p.haslayer(IPFIX))
6074 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6075 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6076 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6077 self.assertEqual(p[UDP].dport, 4739)
6078 self.assertEqual(p[IPFIX].observationDomainID,
6079 self.ipfix_domain_id)
6080 if p.haslayer(Data):
6081 data = ipfix.decode_data_set(p.getlayer(Set))
6082 if ord(data[0][230]) == 11:
6083 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6084 elif ord(data[0][230]) == 7:
6085 self.verify_ipfix_nat64_ses(data,
6087 self.pg0.remote_ip6n,
6088 self.pg1.remote_ip4,
6091 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6093 def nat64_get_ses_num(self):
6095 Return number of active NAT64 sessions.
6097 st = self.vapi.nat64_st_dump()
6100 def clear_nat64(self):
6102 Clear NAT64 configuration.
6104 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6105 domain_id=self.ipfix_domain_id)
6106 self.ipfix_src_port = 4739
6107 self.ipfix_domain_id = 1
6109 self.vapi.nat64_set_timeouts()
6111 interfaces = self.vapi.nat64_interface_dump()
6112 for intf in interfaces:
6113 if intf.is_inside > 1:
6114 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6117 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6121 bib = self.vapi.nat64_bib_dump(255)
6124 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6132 adresses = self.vapi.nat64_pool_addr_dump()
6133 for addr in adresses:
6134 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6139 prefixes = self.vapi.nat64_prefix_dump()
6140 for prefix in prefixes:
6141 self.vapi.nat64_add_del_prefix(prefix.prefix,
6143 vrf_id=prefix.vrf_id,
6147 super(TestNAT64, self).tearDown()
6148 if not self.vpp_dead:
6149 self.logger.info(self.vapi.cli("show nat64 pool"))
6150 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6151 self.logger.info(self.vapi.cli("show nat64 prefix"))
6152 self.logger.info(self.vapi.cli("show nat64 bib all"))
6153 self.logger.info(self.vapi.cli("show nat64 session table all"))
6154 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6158 class TestDSlite(MethodHolder):
6159 """ DS-Lite Test Cases """
6162 def setUpClass(cls):
6163 super(TestDSlite, cls).setUpClass()
6166 cls.nat_addr = '10.0.0.3'
6167 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6169 cls.create_pg_interfaces(range(2))
6171 cls.pg0.config_ip4()
6172 cls.pg0.resolve_arp()
6174 cls.pg1.config_ip6()
6175 cls.pg1.generate_remote_hosts(2)
6176 cls.pg1.configure_ipv6_neighbors()
6179 super(TestDSlite, cls).tearDownClass()
6182 def test_dslite(self):
6183 """ Test DS-Lite """
6184 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6186 aftr_ip4 = '192.0.0.1'
6187 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6188 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6189 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6190 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6193 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6194 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6195 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6196 UDP(sport=20000, dport=10000))
6197 self.pg1.add_stream(p)
6198 self.pg_enable_capture(self.pg_interfaces)
6200 capture = self.pg0.get_capture(1)
6201 capture = capture[0]
6202 self.assertFalse(capture.haslayer(IPv6))
6203 self.assertEqual(capture[IP].src, self.nat_addr)
6204 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6205 self.assertNotEqual(capture[UDP].sport, 20000)
6206 self.assertEqual(capture[UDP].dport, 10000)
6207 self.assert_packet_checksums_valid(capture)
6208 out_port = capture[UDP].sport
6210 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6211 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6212 UDP(sport=10000, dport=out_port))
6213 self.pg0.add_stream(p)
6214 self.pg_enable_capture(self.pg_interfaces)
6216 capture = self.pg1.get_capture(1)
6217 capture = capture[0]
6218 self.assertEqual(capture[IPv6].src, aftr_ip6)
6219 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6220 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6221 self.assertEqual(capture[IP].dst, '192.168.1.1')
6222 self.assertEqual(capture[UDP].sport, 10000)
6223 self.assertEqual(capture[UDP].dport, 20000)
6224 self.assert_packet_checksums_valid(capture)
6227 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6228 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6229 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6230 TCP(sport=20001, dport=10001))
6231 self.pg1.add_stream(p)
6232 self.pg_enable_capture(self.pg_interfaces)
6234 capture = self.pg0.get_capture(1)
6235 capture = capture[0]
6236 self.assertFalse(capture.haslayer(IPv6))
6237 self.assertEqual(capture[IP].src, self.nat_addr)
6238 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6239 self.assertNotEqual(capture[TCP].sport, 20001)
6240 self.assertEqual(capture[TCP].dport, 10001)
6241 self.assert_packet_checksums_valid(capture)
6242 out_port = capture[TCP].sport
6244 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6245 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6246 TCP(sport=10001, dport=out_port))
6247 self.pg0.add_stream(p)
6248 self.pg_enable_capture(self.pg_interfaces)
6250 capture = self.pg1.get_capture(1)
6251 capture = capture[0]
6252 self.assertEqual(capture[IPv6].src, aftr_ip6)
6253 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6254 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6255 self.assertEqual(capture[IP].dst, '192.168.1.1')
6256 self.assertEqual(capture[TCP].sport, 10001)
6257 self.assertEqual(capture[TCP].dport, 20001)
6258 self.assert_packet_checksums_valid(capture)
6261 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6262 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6263 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6264 ICMP(id=4000, type='echo-request'))
6265 self.pg1.add_stream(p)
6266 self.pg_enable_capture(self.pg_interfaces)
6268 capture = self.pg0.get_capture(1)
6269 capture = capture[0]
6270 self.assertFalse(capture.haslayer(IPv6))
6271 self.assertEqual(capture[IP].src, self.nat_addr)
6272 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6273 self.assertNotEqual(capture[ICMP].id, 4000)
6274 self.assert_packet_checksums_valid(capture)
6275 out_id = capture[ICMP].id
6277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6278 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6279 ICMP(id=out_id, type='echo-reply'))
6280 self.pg0.add_stream(p)
6281 self.pg_enable_capture(self.pg_interfaces)
6283 capture = self.pg1.get_capture(1)
6284 capture = capture[0]
6285 self.assertEqual(capture[IPv6].src, aftr_ip6)
6286 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6287 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6288 self.assertEqual(capture[IP].dst, '192.168.1.1')
6289 self.assertEqual(capture[ICMP].id, 4000)
6290 self.assert_packet_checksums_valid(capture)
6292 # ping DS-Lite AFTR tunnel endpoint address
6293 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6294 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6295 ICMPv6EchoRequest())
6296 self.pg1.add_stream(p)
6297 self.pg_enable_capture(self.pg_interfaces)
6299 capture = self.pg1.get_capture(1)
6300 self.assertEqual(1, len(capture))
6301 capture = capture[0]
6302 self.assertEqual(capture[IPv6].src, aftr_ip6)
6303 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6304 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6307 super(TestDSlite, self).tearDown()
6308 if not self.vpp_dead:
6309 self.logger.info(self.vapi.cli("show dslite pool"))
6311 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6312 self.logger.info(self.vapi.cli("show dslite sessions"))
6315 class TestDSliteCE(MethodHolder):
6316 """ DS-Lite CE Test Cases """
6319 def setUpConstants(cls):
6320 super(TestDSliteCE, cls).setUpConstants()
6321 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6324 def setUpClass(cls):
6325 super(TestDSliteCE, cls).setUpClass()
6328 cls.create_pg_interfaces(range(2))
6330 cls.pg0.config_ip4()
6331 cls.pg0.resolve_arp()
6333 cls.pg1.config_ip6()
6334 cls.pg1.generate_remote_hosts(1)
6335 cls.pg1.configure_ipv6_neighbors()
6338 super(TestDSliteCE, cls).tearDownClass()
6341 def test_dslite_ce(self):
6342 """ Test DS-Lite CE """
6344 b4_ip4 = '192.0.0.2'
6345 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6346 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6347 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6348 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6350 aftr_ip4 = '192.0.0.1'
6351 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6352 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6353 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6354 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6356 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6357 dst_address_length=128,
6358 next_hop_address=self.pg1.remote_ip6n,
6359 next_hop_sw_if_index=self.pg1.sw_if_index,
6363 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6364 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6365 UDP(sport=10000, dport=20000))
6366 self.pg0.add_stream(p)
6367 self.pg_enable_capture(self.pg_interfaces)
6369 capture = self.pg1.get_capture(1)
6370 capture = capture[0]
6371 self.assertEqual(capture[IPv6].src, b4_ip6)
6372 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6373 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6374 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6375 self.assertEqual(capture[UDP].sport, 10000)
6376 self.assertEqual(capture[UDP].dport, 20000)
6377 self.assert_packet_checksums_valid(capture)
6380 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6381 IPv6(dst=b4_ip6, src=aftr_ip6) /
6382 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6383 UDP(sport=20000, dport=10000))
6384 self.pg1.add_stream(p)
6385 self.pg_enable_capture(self.pg_interfaces)
6387 capture = self.pg0.get_capture(1)
6388 capture = capture[0]
6389 self.assertFalse(capture.haslayer(IPv6))
6390 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6391 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6392 self.assertEqual(capture[UDP].sport, 20000)
6393 self.assertEqual(capture[UDP].dport, 10000)
6394 self.assert_packet_checksums_valid(capture)
6396 # ping DS-Lite B4 tunnel endpoint address
6397 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6398 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6399 ICMPv6EchoRequest())
6400 self.pg1.add_stream(p)
6401 self.pg_enable_capture(self.pg_interfaces)
6403 capture = self.pg1.get_capture(1)
6404 self.assertEqual(1, len(capture))
6405 capture = capture[0]
6406 self.assertEqual(capture[IPv6].src, b4_ip6)
6407 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6408 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6411 super(TestDSliteCE, self).tearDown()
6412 if not self.vpp_dead:
6414 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6416 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
6419 class TestNAT66(MethodHolder):
6420 """ NAT66 Test Cases """
6423 def setUpClass(cls):
6424 super(TestNAT66, cls).setUpClass()
6427 cls.nat_addr = 'fd01:ff::2'
6428 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6430 cls.create_pg_interfaces(range(2))
6431 cls.interfaces = list(cls.pg_interfaces)
6433 for i in cls.interfaces:
6436 i.configure_ipv6_neighbors()
6439 super(TestNAT66, cls).tearDownClass()
6442 def test_static(self):
6443 """ 1:1 NAT66 test """
6444 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6445 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6446 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6451 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6452 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6455 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6456 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6459 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6460 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6461 ICMPv6EchoRequest())
6463 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6464 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6465 GRE() / IP() / TCP())
6467 self.pg0.add_stream(pkts)
6468 self.pg_enable_capture(self.pg_interfaces)
6470 capture = self.pg1.get_capture(len(pkts))
6471 for packet in capture:
6473 self.assertEqual(packet[IPv6].src, self.nat_addr)
6474 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6475 self.assert_packet_checksums_valid(packet)
6477 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6482 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6483 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6486 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6487 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6490 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6491 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6494 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6495 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6496 GRE() / IP() / TCP())
6498 self.pg1.add_stream(pkts)
6499 self.pg_enable_capture(self.pg_interfaces)
6501 capture = self.pg0.get_capture(len(pkts))
6502 for packet in capture:
6504 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6505 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6506 self.assert_packet_checksums_valid(packet)
6508 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6511 sm = self.vapi.nat66_static_mapping_dump()
6512 self.assertEqual(len(sm), 1)
6513 self.assertEqual(sm[0].total_pkts, 8)
6515 def test_check_no_translate(self):
6516 """ NAT66 translate only when egress interface is outside interface """
6517 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6518 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
6519 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6523 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6524 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6526 self.pg0.add_stream([p])
6527 self.pg_enable_capture(self.pg_interfaces)
6529 capture = self.pg1.get_capture(1)
6532 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6533 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6535 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6538 def clear_nat66(self):
6540 Clear NAT66 configuration.
6542 interfaces = self.vapi.nat66_interface_dump()
6543 for intf in interfaces:
6544 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6548 static_mappings = self.vapi.nat66_static_mapping_dump()
6549 for sm in static_mappings:
6550 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6551 sm.external_ip_address,
6556 super(TestNAT66, self).tearDown()
6557 if not self.vpp_dead:
6558 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6559 self.logger.info(self.vapi.cli("show nat66 static mappings"))
6563 if __name__ == '__main__':
6564 unittest.main(testRunner=VppTestRunner)