9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialized form, the stored checksum
    is removed and the packet is serialized again; the checksum found
    after that second round trip is compared with the one carried by
    ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without dropping the stored value the rebuild would just echo it
    # back and the comparison below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed, its stored TCP checksum is removed and the
    packet is rebuilt; the checksum present after the rebuild must match
    the one carried by ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the stored value, otherwise the rebuild only echoes it back.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed, its stored UDP checksum is removed and the
    packet is rebuilt; the checksum present after the rebuild must match
    the one carried by ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the stored value, otherwise the rebuild only echoes it back.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded ("...error") layer present in ``pkt`` the stored
    checksum is removed from a fresh copy of the packet, the copy is
    rebuilt and the resulting checksum is compared with the captured one.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP checksum of zero is not verified (presumably
        # it means the sender did not compute one).
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Every layer is checked on its own fresh copy so that no branch
        # depends on a copy created while checking another layer.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Verify the ICMP checksum of an IPv4 packet

    When the packet also carries an embedded IP header (ICMP error
    message), the embedded checksums are verified as well.

    :param pkt: Packet whose ICMP checksum is verified
    """
    rebuilt = pkt.__class__(str(pkt))
    # Removing the stored value makes the second build recompute it.
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Verify the ICMPv6 checksum of an IPv6 packet

    Destination-unreachable, echo-request and echo-reply messages are
    handled; for destination-unreachable the embedded packet checksums
    are verified too.

    :param pkt: Packet whose ICMPv6 checksum is verified
    """
    candidates = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    # A single working copy is carried from one message type to the next.
    work = pkt.__class__(str(pkt))
    for layer, name, has_embedded in candidates:
        if not pkt.haslayer(layer):
            continue
        del work[name].cksum
        work = work.__class__(str(work))
        self.assertEqual(work[name].cksum,
                         pkt[name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-request packet
    """
    def headers():
        # Fresh L2/L3 headers for each packet so no layer is shared.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl))

    # Callers pass the result to add_stream() and use len() on it, so
    # all three packets have to be collected and returned.
    return [
        headers() / TCP(sport=self.tcp_port_in, dport=20),
        headers() / UDP(sport=self.udp_port_in, dport=20),
        headers() / ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the positions
    defined by RFC 6052 for the given prefix length; octet 8 of the IPv6
    address is never written.  For any other prefix length the prefix is
    returned unchanged.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: IPv4-embedded IPv6 addresses
    """
    # Target octet positions inside the IPv6 address, per prefix length.
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray gives mutable octets on both Python 2 and Python 3.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for position, octet in zip(positions.get(plen, ()), ip4_n):
        pref_n[position] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :returns: List with one TCP, one UDP and one ICMPv6 echo-request
              packet
    """
    if pref is None:
        # No prefix given: embed the IPv4 address in 64:ff9b::
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    def headers():
        # Fresh L2/L3 headers for each packet so no layer is shared.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim))

    return [
        headers() / TCP(sport=self.tcp_port_in, dport=20),
        headers() / UDP(sport=self.udp_port_in, dport=20),
        headers() / ICMPv6EchoRequest(id=self.icmp_id_in),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-reply packet
    """
    if dst_ip is None:
        dst_ip = self.nat_addr

    def headers():
        # Fresh L2/L3 headers for each packet so no layer is shared.
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl))

    return [
        headers() / TCP(dport=self.tcp_port_out, sport=20),
        headers() / UDP(dport=self.udp_port_out, sport=20),
        headers() / ICMP(id=self.icmp_id_out, type='echo-reply'),
    ]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    As a side effect the observed source ports / ICMP id are stored in
    ``self.tcp_port_out``, ``self.udp_port_out`` and ``self.icmp_id_out``.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    if nat_ip is None:
        nat_ip = self.nat_addr
    # With same_port the outside port/id must equal the inside one,
    # otherwise it must differ.
    check_port = self.assertEqual if same_port else self.assertNotEqual
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP].dst, dst_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
                self.check_icmp_checksum(packet)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses, source ports and the ICMP id must be exactly those of the
    inside stream.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            # The embedded packet is the one sent from outside, so it
            # must carry the outside (translated) port / id.
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            # The embedded packet is the one sent from inside, so it
            # must carry the inside port / id.
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    Three IPv4 fragments sharing one identification value are produced
    at fragment offsets 0, 3 and 5 (units of 8 octets).

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: fragments
    """
    frag_id = random.randint(0, 65535)
    # Build the unfragmented packet once, only to learn the TCP checksum
    # that covers the complete payload.
    whole = (IP(src=src_if.remote_ip4, dst=dst) /
             TCP(sport=sport, dport=dport) /
             Raw(data))
    whole = whole.__class__(str(whole))
    chksum = whole['TCP'].chksum

    def ether():
        return Ether(src=src_if.remote_mac, dst=src_if.local_mac)

    # Offsets 3 and 5 are 24 and 40 octets.  Assuming an option-less
    # 20-octet TCP header, the first fragment holds the header plus
    # data[0:4], the second data[4:20] and the last the remainder.
    return [
        ether() /
        IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0,
           id=frag_id) /
        TCP(sport=sport, dport=dport, chksum=chksum) /
        Raw(data[0:4]),
        ether() /
        IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3,
           id=frag_id, proto=IP_PROTOS.tcp) /
        Raw(data[4:20]),
        ether() /
        IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
           id=frag_id) /
        Raw(data[20:]),
    ]
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :param frag_size: size of fragments
    :returns: fragments
    """
    if pref is None:
        # No prefix given: embed the IPv4 address in 64:ff9b::
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    buffer = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        # The fragment offset is expressed in units of 8 octets.
        buffer.seek(p[IP].frag * 8)
        buffer.write(p[IP].payload)
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(buffer.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(buffer.getvalue()))
    # For any other protocol ``p`` is still the last fragment examined.
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    buffer = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # The fragment offset is expressed in units of 8 octets.
        buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
        buffer.write(p[IPv6ExtHdrFragment].payload)
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(buffer.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(buffer.getvalue()))
    # For any other protocol ``p`` is still the last fragment examined.
    return p
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Exactly six records are expected: three with natEvent 4 and three
    with natEvent 5.  Records are indexed by IPFIX information element
    ID.

    :param data: Decoded IPFIX data records
    """
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        nat_event = ord(record[230])
        self.assertIn(nat_event, [4, 5])
        if nat_event == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        protocol = ord(record[4])
        if IP_PROTOS.icmp == protocol:
            port_in, port_out = self.icmp_id_in, self.icmp_id_out
        elif IP_PROTOS.tcp == protocol:
            port_in, port_out = self.tcp_port_in, self.tcp_port_out
        elif IP_PROTOS.udp == protocol:
            port_in, port_out = self.udp_port_in, self.udp_port_out
        else:
            self.fail("Invalid protocol")
        self.assertEqual(struct.pack("!H", port_in), record[7])
        self.assertEqual(struct.pack("!H", port_out), record[227])
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Exactly one record is expected; it is indexed by IPFIX information
    element ID.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
642 class TestNAT44(MethodHolder):
643 """ NAT44 Test Cases """
647 super(TestNAT44, cls).setUpClass()
650 cls.tcp_port_in = 6303
651 cls.tcp_port_out = 6303
652 cls.udp_port_in = 6304
653 cls.udp_port_out = 6304
654 cls.icmp_id_in = 6305
655 cls.icmp_id_out = 6305
656 cls.nat_addr = '10.0.0.3'
657 cls.ipfix_src_port = 4739
658 cls.ipfix_domain_id = 1
660 cls.create_pg_interfaces(range(10))
661 cls.interfaces = list(cls.pg_interfaces[0:4])
663 for i in cls.interfaces:
668 cls.pg0.generate_remote_hosts(3)
669 cls.pg0.configure_ipv4_neighbors()
671 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672 cls.vapi.ip_table_add_del(10, is_add=1)
673 cls.vapi.ip_table_add_del(20, is_add=1)
675 cls.pg4._local_ip4 = "172.16.255.1"
676 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678 cls.pg4.set_table_ip4(10)
679 cls.pg5._local_ip4 = "172.17.255.3"
680 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682 cls.pg5.set_table_ip4(10)
683 cls.pg6._local_ip4 = "172.16.255.1"
684 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686 cls.pg6.set_table_ip4(20)
687 for i in cls.overlapping_interfaces:
695 cls.pg9.generate_remote_hosts(2)
697 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
702 cls.pg9.resolve_arp()
703 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705 cls.pg9.resolve_arp()
708 super(TestNAT44, cls).tearDownClass()
711 def clear_nat44(self):
713 Clear NAT44 configuration.
715 # I found no elegant way to do this
716 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
717 dst_address_length=32,
718 next_hop_address=self.pg7.remote_ip4n,
719 next_hop_sw_if_index=self.pg7.sw_if_index,
721 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
722 dst_address_length=32,
723 next_hop_address=self.pg8.remote_ip4n,
724 next_hop_sw_if_index=self.pg8.sw_if_index,
727 for intf in [self.pg7, self.pg8]:
728 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
730 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
735 if self.pg7.has_ip4_config:
736 self.pg7.unconfig_ip4()
738 interfaces = self.vapi.nat44_interface_addr_dump()
739 for intf in interfaces:
740 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
742 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
743 domain_id=self.ipfix_domain_id)
744 self.ipfix_src_port = 4739
745 self.ipfix_domain_id = 1
747 interfaces = self.vapi.nat44_interface_dump()
748 for intf in interfaces:
749 if intf.is_inside > 1:
750 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
753 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
757 interfaces = self.vapi.nat44_interface_output_feature_dump()
758 for intf in interfaces:
759 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
763 static_mappings = self.vapi.nat44_static_mapping_dump()
764 for sm in static_mappings:
765 self.vapi.nat44_add_del_static_mapping(
767 sm.external_ip_address,
768 local_port=sm.local_port,
769 external_port=sm.external_port,
770 addr_only=sm.addr_only,
772 protocol=sm.protocol,
775 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
776 for lb_sm in lb_static_mappings:
777 self.vapi.nat44_add_del_lb_static_mapping(
786 adresses = self.vapi.nat44_address_dump()
787 for addr in adresses:
788 self.vapi.nat44_add_del_address_range(addr.ip_address,
792 self.vapi.nat_set_reass()
793 self.vapi.nat_set_reass(is_ip6=1)
795 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
796 local_port=0, external_port=0, vrf_id=0,
797 is_add=1, external_sw_if_index=0xFFFFFFFF,
800 Add/delete NAT44 static mapping
802 :param local_ip: Local IP address
803 :param external_ip: External IP address
804 :param local_port: Local port number (Optional)
805 :param external_port: External port number (Optional)
806 :param vrf_id: VRF ID (Default 0)
807 :param is_add: 1 if add, 0 if delete (Default add)
808 :param external_sw_if_index: External interface instead of IP address
809 :param proto: IP protocol (Mandatory if port specified)
812 if local_port and external_port:
814 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
815 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
816 self.vapi.nat44_add_del_static_mapping(
819 external_sw_if_index,
827 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
829 Add/delete NAT44 address
831 :param ip: IP address
832 :param is_add: 1 if add, 0 if delete (Default add)
834 nat_addr = socket.inet_pton(socket.AF_INET, ip)
835 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
838 def test_dynamic(self):
839 """ NAT44 dynamic translation test """
841 self.nat44_add_address(self.nat_addr)
842 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
843 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
847 pkts = self.create_stream_in(self.pg0, self.pg1)
848 self.pg0.add_stream(pkts)
849 self.pg_enable_capture(self.pg_interfaces)
851 capture = self.pg1.get_capture(len(pkts))
852 self.verify_capture_out(capture)
855 pkts = self.create_stream_out(self.pg1)
856 self.pg1.add_stream(pkts)
857 self.pg_enable_capture(self.pg_interfaces)
859 capture = self.pg0.get_capture(len(pkts))
860 self.verify_capture_in(capture, self.pg0)
862 def test_dynamic_icmp_errors_in2out_ttl_1(self):
863 """ NAT44 handling of client packets with TTL=1 """
865 self.nat44_add_address(self.nat_addr)
866 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
867 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
870 # Client side - generate traffic
871 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
872 self.pg0.add_stream(pkts)
873 self.pg_enable_capture(self.pg_interfaces)
876 # Client side - verify ICMP type 11 packets
877 capture = self.pg0.get_capture(len(pkts))
878 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
880 def test_dynamic_icmp_errors_out2in_ttl_1(self):
881 """ NAT44 handling of server packets with TTL=1 """
883 self.nat44_add_address(self.nat_addr)
884 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
885 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
888 # Client side - create sessions
889 pkts = self.create_stream_in(self.pg0, self.pg1)
890 self.pg0.add_stream(pkts)
891 self.pg_enable_capture(self.pg_interfaces)
894 # Server side - generate traffic
895 capture = self.pg1.get_capture(len(pkts))
896 self.verify_capture_out(capture)
897 pkts = self.create_stream_out(self.pg1, ttl=1)
898 self.pg1.add_stream(pkts)
899 self.pg_enable_capture(self.pg_interfaces)
902 # Server side - verify ICMP type 11 packets
903 capture = self.pg1.get_capture(len(pkts))
904 self.verify_capture_out_with_icmp_errors(capture,
905 src_ip=self.pg1.local_ip4)
907 def test_dynamic_icmp_errors_in2out_ttl_2(self):
908 """ NAT44 handling of error responses to client packets with TTL=2 """
910 self.nat44_add_address(self.nat_addr)
911 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
912 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
915 # Client side - generate traffic
916 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
917 self.pg0.add_stream(pkts)
918 self.pg_enable_capture(self.pg_interfaces)
921 # Server side - simulate ICMP type 11 response
922 capture = self.pg1.get_capture(len(pkts))
923 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
924 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
925 ICMP(type=11) / packet[IP] for packet in capture]
926 self.pg1.add_stream(pkts)
927 self.pg_enable_capture(self.pg_interfaces)
930 # Client side - verify ICMP type 11 packets
931 capture = self.pg0.get_capture(len(pkts))
932 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
934 def test_dynamic_icmp_errors_out2in_ttl_2(self):
935 """ NAT44 handling of error responses to server packets with TTL=2 """
937 self.nat44_add_address(self.nat_addr)
938 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
939 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
942 # Client side - create sessions
943 pkts = self.create_stream_in(self.pg0, self.pg1)
944 self.pg0.add_stream(pkts)
945 self.pg_enable_capture(self.pg_interfaces)
948 # Server side - generate traffic
949 capture = self.pg1.get_capture(len(pkts))
950 self.verify_capture_out(capture)
951 pkts = self.create_stream_out(self.pg1, ttl=2)
952 self.pg1.add_stream(pkts)
953 self.pg_enable_capture(self.pg_interfaces)
956 # Client side - simulate ICMP type 11 response
957 capture = self.pg0.get_capture(len(pkts))
958 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
959 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
960 ICMP(type=11) / packet[IP] for packet in capture]
961 self.pg0.add_stream(pkts)
962 self.pg_enable_capture(self.pg_interfaces)
965 # Server side - verify ICMP type 11 packets
966 capture = self.pg1.get_capture(len(pkts))
967 self.verify_capture_out_with_icmp_errors(capture)
969 def test_ping_out_interface_from_outside(self):
970 """ Ping NAT44 out interface from outside network """
972 self.nat44_add_address(self.nat_addr)
973 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
974 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
977 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
978 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
979 ICMP(id=self.icmp_id_out, type='echo-request'))
981 self.pg1.add_stream(pkts)
982 self.pg_enable_capture(self.pg_interfaces)
984 capture = self.pg1.get_capture(len(pkts))
985 self.assertEqual(1, len(capture))
988 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
989 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
990 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
991 self.assertEqual(packet[ICMP].type, 0) # echo reply
993 self.logger.error(ppp("Unexpected or invalid packet "
994 "(outside network):", packet))
997 def test_ping_internal_host_from_outside(self):
998 """ Ping internal host from outside network """
1000 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1001 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1002 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1006 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1007 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1008 ICMP(id=self.icmp_id_out, type='echo-request'))
1009 self.pg1.add_stream(pkt)
1010 self.pg_enable_capture(self.pg_interfaces)
1012 capture = self.pg0.get_capture(1)
1013 self.verify_capture_in(capture, self.pg0, packet_num=1)
1014 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1017 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1018 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1019 ICMP(id=self.icmp_id_in, type='echo-reply'))
1020 self.pg0.add_stream(pkt)
1021 self.pg_enable_capture(self.pg_interfaces)
1023 capture = self.pg1.get_capture(1)
1024 self.verify_capture_out(capture, same_port=True, packet_num=1)
1025 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1027 def test_static_in(self):
1028 """ 1:1 NAT initialized from inside network """
1030 nat_ip = "10.0.0.10"
1031 self.tcp_port_out = 6303
1032 self.udp_port_out = 6304
1033 self.icmp_id_out = 6305
1035 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1036 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1037 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1041 pkts = self.create_stream_in(self.pg0, self.pg1)
1042 self.pg0.add_stream(pkts)
1043 self.pg_enable_capture(self.pg_interfaces)
1045 capture = self.pg1.get_capture(len(pkts))
1046 self.verify_capture_out(capture, nat_ip, True)
1049 pkts = self.create_stream_out(self.pg1, nat_ip)
1050 self.pg1.add_stream(pkts)
1051 self.pg_enable_capture(self.pg_interfaces)
1053 capture = self.pg0.get_capture(len(pkts))
1054 self.verify_capture_in(capture, self.pg0)
1056 def test_static_out(self):
1057 """ 1:1 NAT initialized from outside network """
1059 nat_ip = "10.0.0.20"
1060 self.tcp_port_out = 6303
1061 self.udp_port_out = 6304
1062 self.icmp_id_out = 6305
1064 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1065 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1066 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1070 pkts = self.create_stream_out(self.pg1, nat_ip)
1071 self.pg1.add_stream(pkts)
1072 self.pg_enable_capture(self.pg_interfaces)
1074 capture = self.pg0.get_capture(len(pkts))
1075 self.verify_capture_in(capture, self.pg0)
1078 pkts = self.create_stream_in(self.pg0, self.pg1)
1079 self.pg0.add_stream(pkts)
1080 self.pg_enable_capture(self.pg_interfaces)
1082 capture = self.pg1.get_capture(len(pkts))
1083 self.verify_capture_out(capture, nat_ip, True)
1085 def test_static_with_port_in(self):
1086 """ 1:1 NAPT initialized from inside network """
1088 self.tcp_port_out = 3606
1089 self.udp_port_out = 3607
1090 self.icmp_id_out = 3608
1092 self.nat44_add_address(self.nat_addr)
1093 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1094 self.tcp_port_in, self.tcp_port_out,
1095 proto=IP_PROTOS.tcp)
1096 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1097 self.udp_port_in, self.udp_port_out,
1098 proto=IP_PROTOS.udp)
1099 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1100 self.icmp_id_in, self.icmp_id_out,
1101 proto=IP_PROTOS.icmp)
1102 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1103 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1107 pkts = self.create_stream_in(self.pg0, self.pg1)
1108 self.pg0.add_stream(pkts)
1109 self.pg_enable_capture(self.pg_interfaces)
1111 capture = self.pg1.get_capture(len(pkts))
1112 self.verify_capture_out(capture)
1115 pkts = self.create_stream_out(self.pg1)
1116 self.pg1.add_stream(pkts)
1117 self.pg_enable_capture(self.pg_interfaces)
1119 capture = self.pg0.get_capture(len(pkts))
1120 self.verify_capture_in(capture, self.pg0)
# Test: same static port mappings as the "initialized from inside" variant,
# but the pg1 -> pg0 direction is exercised first and pg0 -> pg1 second.
1122 def test_static_with_port_out(self):
1123 """ 1:1 NAPT initialized from outside network """
# External TCP/UDP ports and ICMP id used by the static mappings below.
1125 self.tcp_port_out = 30606
1126 self.udp_port_out = 30607
1127 self.icmp_id_out = 30608
# Add nat_addr to the NAT44 pool, then one static mapping per protocol
# between pg0's remote address and nat_addr.
1129 self.nat44_add_address(self.nat_addr)
1130 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1131 self.tcp_port_in, self.tcp_port_out,
1132 proto=IP_PROTOS.tcp)
1133 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1134 self.udp_port_in, self.udp_port_out,
1135 proto=IP_PROTOS.udp)
1136 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1137 self.icmp_id_in, self.icmp_id_out,
1138 proto=IP_PROTOS.icmp)
# Enable the NAT44 feature on pg0 and pg1.
# NOTE(review): the pg1 call is cut off in this listing - its remaining
# arguments are not visible; presumably it marks pg1 as the outside side.
1139 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1140 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg1 -> pg0 first: all packets must arrive on pg0.
1144 pkts = self.create_stream_out(self.pg1)
1145 self.pg1.add_stream(pkts)
1146 self.pg_enable_capture(self.pg_interfaces)
1148 capture = self.pg0.get_capture(len(pkts))
1149 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1 second: all packets must arrive on pg1.
1152 pkts = self.create_stream_in(self.pg0, self.pg1)
1153 self.pg0.add_stream(pkts)
1154 self.pg_enable_capture(self.pg_interfaces)
1156 capture = self.pg1.get_capture(len(pkts))
1157 self.verify_capture_out(capture)
# Test: two 1:1 static mappings (for pg4's and pg0's remote hosts); traffic
# from pg4 must come out on pg3 translated to nat_ip1, traffic from pg0 must
# not reach pg3 at all.
1159 def test_static_vrf_aware(self):
1160 """ 1:1 NAT VRF awareness """
1162 nat_ip1 = "10.0.0.30"
1163 nat_ip2 = "10.0.0.40"
1164 self.tcp_port_out = 6303
1165 self.udp_port_out = 6304
1166 self.icmp_id_out = 6305
# NOTE(review): both static-mapping calls and the pg3 feature call are cut
# off in this listing; the missing arguments presumably carry the VRF ids
# and the inside/outside flag - confirm against the full file.
1168 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1170 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1172 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1174 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1175 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1177 # inside interface VRF match NAT44 static mapping VRF
1178 pkts = self.create_stream_in(self.pg4, self.pg3)
1179 self.pg4.add_stream(pkts)
1180 self.pg_enable_capture(self.pg_interfaces)
1182 capture = self.pg3.get_capture(len(pkts))
# Captured packets are checked against nat_ip1 (third argument True).
1183 self.verify_capture_out(capture, nat_ip1, True)
1185 # inside interface VRF don't match NAT44 static mapping VRF (packets
1187 pkts = self.create_stream_in(self.pg0, self.pg3)
1188 self.pg0.add_stream(pkts)
1189 self.pg_enable_capture(self.pg_interfaces)
# Nothing may show up on pg3 for the pg0 stream.
1191 self.pg3.assert_nothing_captured()
# Test: a load-balanced static mapping that spreads connections to nat_addr
# across two servers behind pg0 (remote_hosts[0] and remote_hosts[1]).
# NOTE(review): many lines of this method are missing from the listing.
# Names such as external_port, local_port, server, ip, tcp, pkts, server1_n
# and server2_n are used below but assigned on lines that are not visible.
1193 def test_static_lb(self):
1194 """ NAT44 local service load balancing """
# Packed (binary) form of nat_addr for the API call.
1195 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1198 server1 = self.pg0.remote_hosts[0]
1199 server2 = self.pg0.remote_hosts[1]
# One dict per back-end server; only the 'addr' keys are visible here.
# NOTE(review): the name `locals` shadows the Python built-in locals().
1201 locals = [{'addr': server1.ip4n,
1204 {'addr': server2.ip4n,
1208 self.nat44_add_address(self.nat_addr)
1209 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1212 local_num=len(locals),
1214 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1215 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1218 # from client to service
1219 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1220 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1221 TCP(sport=12345, dport=external_port))
1222 self.pg1.add_stream(p)
1223 self.pg_enable_capture(self.pg_interfaces)
1225 capture = self.pg0.get_capture(1)
# The packet must have been redirected to one of the two servers and to the
# local port, with valid TCP and IP checksums.
1231 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1232 if ip.dst == server1.ip4:
1236 self.assertEqual(tcp.dport, local_port)
1237 self.check_tcp_checksum(p)
1238 self.check_ip_checksum(p)
1240 self.logger.error(ppp("Unexpected or invalid packet:", p))
1243 # from service back to client
1244 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1245 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1246 TCP(sport=local_port, dport=12345))
1247 self.pg0.add_stream(p)
1248 self.pg_enable_capture(self.pg_interfaces)
1250 capture = self.pg1.get_capture(1)
# The reply must appear to come from nat_addr:external_port.
1255 self.assertEqual(ip.src, self.nat_addr)
1256 self.assertEqual(tcp.sport, external_port)
1257 self.check_tcp_checksum(p)
1258 self.check_ip_checksum(p)
1260 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Several client addresses derived from pg1's remote address each send one
# packet to the service.
1266 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1268 for client in clients:
1269 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1270 IP(src=client, dst=self.nat_addr) /
1271 TCP(sport=12345, dport=external_port))
1273 self.pg1.add_stream(pkts)
1274 self.pg_enable_capture(self.pg_interfaces)
1276 capture = self.pg0.get_capture(len(pkts))
1278 if p[IP].dst == server1.ip4:
# server1 is expected to have received more packets than server2 -
# presumably because of per-server weights set in the entries of `locals`
# that are not visible here (confirm).
1282 self.assertTrue(server1_n > server2_n)
# Test: NAT44 enabled on pg0, pg1 and pg3; checks untranslated forwarding
# between pg0/pg1 and pg0/pg2, and translated traffic in both directions
# between each of pg0, pg1 and pg3.
1284 def test_multiple_inside_interfaces(self):
1285 """ NAT44 multiple non-overlapping address space inside interfaces """
1287 self.nat44_add_address(self.nat_addr)
1288 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1289 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
# NOTE(review): the pg3 call is cut off in this listing; pg3 is used as the
# out2in source below, so the missing argument presumably marks it outside.
1290 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1293 # between two NAT44 inside interfaces (no translation)
1294 pkts = self.create_stream_in(self.pg0, self.pg1)
1295 self.pg0.add_stream(pkts)
1296 self.pg_enable_capture(self.pg_interfaces)
1298 capture = self.pg1.get_capture(len(pkts))
1299 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1301 # from NAT44 inside to interface without NAT44 feature (no translation)
1302 pkts = self.create_stream_in(self.pg0, self.pg2)
1303 self.pg0.add_stream(pkts)
1304 self.pg_enable_capture(self.pg_interfaces)
1306 capture = self.pg2.get_capture(len(pkts))
1307 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1309 # in2out 1st interface
1310 pkts = self.create_stream_in(self.pg0, self.pg3)
1311 self.pg0.add_stream(pkts)
1312 self.pg_enable_capture(self.pg_interfaces)
1314 capture = self.pg3.get_capture(len(pkts))
1315 self.verify_capture_out(capture)
1317 # out2in 1st interface
1318 pkts = self.create_stream_out(self.pg3)
1319 self.pg3.add_stream(pkts)
1320 self.pg_enable_capture(self.pg_interfaces)
1322 capture = self.pg0.get_capture(len(pkts))
1323 self.verify_capture_in(capture, self.pg0)
1325 # in2out 2nd interface
1326 pkts = self.create_stream_in(self.pg1, self.pg3)
1327 self.pg1.add_stream(pkts)
1328 self.pg_enable_capture(self.pg_interfaces)
1330 capture = self.pg3.get_capture(len(pkts))
1331 self.verify_capture_out(capture)
1333 # out2in 2nd interface
1334 pkts = self.create_stream_out(self.pg3)
1335 self.pg3.add_stream(pkts)
1336 self.pg_enable_capture(self.pg_interfaces)
1338 capture = self.pg1.get_capture(len(pkts))
1339 self.verify_capture_in(capture, self.pg1)
# Test: NAT44 on pg3, pg4, pg5 and pg6 with one pool address and a static
# mapping for pg6's remote host; covers untranslated pg4 -> pg5 traffic,
# hairpinning pg4 -> pg6, translated traffic for each interface against
# pg3, and the contents of the user / session dumps.
# NOTE(review): several lines are missing from this listing (ip, tcp and
# user are used below but assigned on lines that are not visible).
1341 def test_inside_overlapping_interfaces(self):
1342 """ NAT44 multiple inside interfaces with overlapping address space """
1344 static_nat_ip = "10.0.0.10"
1345 self.nat44_add_address(self.nat_addr)
1346 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1348 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1349 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1350 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1351 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1354 # between NAT44 inside interfaces with same VRF (no translation)
1355 pkts = self.create_stream_in(self.pg4, self.pg5)
1356 self.pg4.add_stream(pkts)
1357 self.pg_enable_capture(self.pg_interfaces)
1359 capture = self.pg5.get_capture(len(pkts))
1360 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1362 # between NAT44 inside interfaces with different VRF (hairpinning)
1363 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1364 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1365 TCP(sport=1234, dport=5678))
1366 self.pg4.add_stream(p)
1367 self.pg_enable_capture(self.pg_interfaces)
1369 capture = self.pg6.get_capture(1)
# Hairpinned packet: source rewritten to nat_addr with a new source port,
# destination rewritten to pg6's remote host, destination port unchanged.
1374 self.assertEqual(ip.src, self.nat_addr)
1375 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1376 self.assertNotEqual(tcp.sport, 1234)
1377 self.assertEqual(tcp.dport, 5678)
1379 self.logger.error(ppp("Unexpected or invalid packet:", p))
1382 # in2out 1st interface
1383 pkts = self.create_stream_in(self.pg4, self.pg3)
1384 self.pg4.add_stream(pkts)
1385 self.pg_enable_capture(self.pg_interfaces)
1387 capture = self.pg3.get_capture(len(pkts))
1388 self.verify_capture_out(capture)
1390 # out2in 1st interface
1391 pkts = self.create_stream_out(self.pg3)
1392 self.pg3.add_stream(pkts)
1393 self.pg_enable_capture(self.pg_interfaces)
1395 capture = self.pg4.get_capture(len(pkts))
1396 self.verify_capture_in(capture, self.pg4)
1398 # in2out 2nd interface
1399 pkts = self.create_stream_in(self.pg5, self.pg3)
1400 self.pg5.add_stream(pkts)
1401 self.pg_enable_capture(self.pg_interfaces)
1403 capture = self.pg3.get_capture(len(pkts))
1404 self.verify_capture_out(capture)
1406 # out2in 2nd interface
1407 pkts = self.create_stream_out(self.pg3)
1408 self.pg3.add_stream(pkts)
1409 self.pg_enable_capture(self.pg_interfaces)
1411 capture = self.pg5.get_capture(len(pkts))
1412 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's host: exactly three non-static sessions, expected
# in the order TCP, UDP, ICMP, all using the single pool address.
1415 addresses = self.vapi.nat44_address_dump()
1416 self.assertEqual(len(addresses), 1)
1417 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1418 self.assertEqual(len(sessions), 3)
1419 for session in sessions:
1420 self.assertFalse(session.is_static)
1421 self.assertEqual(session.inside_ip_address[0:4],
1422 self.pg5.remote_ip4n)
1423 self.assertEqual(session.outside_ip_address,
1424 addresses[0].ip_address)
1425 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1426 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1427 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1428 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1429 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1430 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1431 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1432 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1433 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1435 # in2out 3rd interface
1436 pkts = self.create_stream_in(self.pg6, self.pg3)
1437 self.pg6.add_stream(pkts)
1438 self.pg_enable_capture(self.pg_interfaces)
1440 capture = self.pg3.get_capture(len(pkts))
1441 self.verify_capture_out(capture, static_nat_ip, True)
1443 # out2in 3rd interface
1444 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1445 self.pg3.add_stream(pkts)
1446 self.pg_enable_capture(self.pg_interfaces)
1448 capture = self.pg6.get_capture(len(pkts))
1449 self.verify_capture_in(capture, self.pg6)
1451 # general user and session dump verifications
1452 users = self.vapi.nat44_user_dump()
1453 self.assertTrue(len(users) >= 3)
1454 addresses = self.vapi.nat44_address_dump()
1455 self.assertEqual(len(addresses), 1)
1457 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1459 for session in sessions:
1460 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_bytes > total_pkts and total_pkts > 0.
1461 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1462 self.assertTrue(session.protocol in
1463 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of pg4's host: at least four, none static, all on the pool
# address.
1467 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1468 self.assertTrue(len(sessions) >= 4)
1469 for session in sessions:
1470 self.assertFalse(session.is_static)
1471 self.assertEqual(session.inside_ip_address[0:4],
1472 self.pg4.remote_ip4n)
1473 self.assertEqual(session.outside_ip_address,
1474 addresses[0].ip_address)
# Sessions of pg6's host: at least three, all static, outside address equal
# to static_nat_ip.
1477 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1478 self.assertTrue(len(sessions) >= 3)
1479 for session in sessions:
1480 self.assertTrue(session.is_static)
1481 self.assertEqual(session.inside_ip_address[0:4],
1482 self.pg6.remote_ip4n)
# NOTE(review): comparing two map(...) results only works on Python 2,
# where map() returns lists. On Python 3 these are two distinct iterator
# objects and the assertion would always fail; wrap both in list() if the
# file is ever ported.
1483 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1484 map(int, static_nat_ip.split('.')))
1485 self.assertTrue(session.inside_port in
1486 [self.tcp_port_in, self.udp_port_in,
# Test: hairpinning with a static TCP port mapping - a host and a server
# both sit behind pg0; the host reaches the server through nat_addr and the
# server's reply returns to the host, both via pg0.
# NOTE(review): some lines are missing from this listing (host_in_port, ip
# and tcp are used below but assigned on lines that are not visible).
1489 def test_hairpinning(self):
1490 """ NAT44 hairpinning - 1:1 NAPT """
1492 host = self.pg0.remote_hosts[0]
1493 server = self.pg0.remote_hosts[1]
1496 server_in_port = 5678
1497 server_out_port = 8765
1499 self.nat44_add_address(self.nat_addr)
1500 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1501 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1503 # add static mapping for server
1504 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1505 server_in_port, server_out_port,
1506 proto=IP_PROTOS.tcp)
1508 # send packet from host to server
1509 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1510 IP(src=host.ip4, dst=self.nat_addr) /
1511 TCP(sport=host_in_port, dport=server_out_port))
1512 self.pg0.add_stream(p)
1513 self.pg_enable_capture(self.pg_interfaces)
# The packet is expected back on pg0 itself (same interface it entered on).
1515 capture = self.pg0.get_capture(1)
# Source becomes nat_addr with a translated port; destination becomes the
# server's real address and inside port.
1520 self.assertEqual(ip.src, self.nat_addr)
1521 self.assertEqual(ip.dst, server.ip4)
1522 self.assertNotEqual(tcp.sport, host_in_port)
1523 self.assertEqual(tcp.dport, server_in_port)
1524 self.check_tcp_checksum(p)
# Remember the port NAT picked for the host; the reply below targets it.
1525 host_out_port = tcp.sport
1527 self.logger.error(ppp("Unexpected or invalid packet:", p))
1530 # send reply from server to host
1531 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1532 IP(src=server.ip4, dst=self.nat_addr) /
1533 TCP(sport=server_in_port, dport=host_out_port))
1534 self.pg0.add_stream(p)
1535 self.pg_enable_capture(self.pg_interfaces)
1537 capture = self.pg0.get_capture(1)
# Reply: source is nat_addr:server_out_port, destination is the host's
# original address and port.
1542 self.assertEqual(ip.src, self.nat_addr)
1543 self.assertEqual(ip.dst, host.ip4)
1544 self.assertEqual(tcp.sport, server_out_port)
1545 self.assertEqual(tcp.dport, host_in_port)
1546 self.check_tcp_checksum(p)
# NOTE(review): the closing parenthesis looks misplaced here. ppp() gets
# only the message and p is passed to logger.error() as a format argument,
# unlike the earlier handler in this method which calls ppp("...", p).
1548 self.logger.error(ppp("Unexpected or invalid packet:"), p)
# Test: hairpinning with address-only static mappings - host, server1 and
# server2 all sit behind pg0. Four exchanges are checked, each with TCP,
# UDP and ICMP: host -> server1, server1 -> host, server2 -> server1 and
# server1 -> server2.
# NOTE(review): lines are missing from this listing - pkts is used but its
# creation/append lines are not visible, and the logger.error calls
# presumably sit in except-handlers whose try/except lines are not shown.
1551 def test_hairpinning2(self):
1552 """ NAT44 hairpinning - 1:1 NAT"""
1554 server1_nat_ip = "10.0.0.10"
1555 server2_nat_ip = "10.0.0.11"
1556 host = self.pg0.remote_hosts[0]
1557 server1 = self.pg0.remote_hosts[1]
1558 server2 = self.pg0.remote_hosts[2]
1559 server_tcp_port = 22
1560 server_udp_port = 20
1562 self.nat44_add_address(self.nat_addr)
1563 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1564 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1567 # add static mapping for servers
1568 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1569 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host -> server1 (addressed by server1_nat_ip): TCP, UDP and ICMP request.
1573 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1574 IP(src=host.ip4, dst=server1_nat_ip) /
1575 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1577 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1578 IP(src=host.ip4, dst=server1_nat_ip) /
1579 UDP(sport=self.udp_port_in, dport=server_udp_port))
1581 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1582 IP(src=host.ip4, dst=server1_nat_ip) /
1583 ICMP(id=self.icmp_id_in, type='echo-request'))
1585 self.pg0.add_stream(pkts)
1586 self.pg_enable_capture(self.pg_interfaces)
1588 capture = self.pg0.get_capture(len(pkts))
1589 for packet in capture:
# The host has no static mapping, so its source becomes nat_addr with a
# translated port / ICMP id; the translated values are saved for the reply.
1591 self.assertEqual(packet[IP].src, self.nat_addr)
1592 self.assertEqual(packet[IP].dst, server1.ip4)
1593 if packet.haslayer(TCP):
1594 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1595 self.assertEqual(packet[TCP].dport, server_tcp_port)
1596 self.tcp_port_out = packet[TCP].sport
1597 self.check_tcp_checksum(packet)
1598 elif packet.haslayer(UDP):
1599 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1600 self.assertEqual(packet[UDP].dport, server_udp_port)
1601 self.udp_port_out = packet[UDP].sport
1603 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1604 self.icmp_id_out = packet[ICMP].id
1606 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 -> host: replies are sent to nat_addr and the translated ports.
1611 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1612 IP(src=server1.ip4, dst=self.nat_addr) /
1613 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1615 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1616 IP(src=server1.ip4, dst=self.nat_addr) /
1617 UDP(sport=server_udp_port, dport=self.udp_port_out))
1619 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1620 IP(src=server1.ip4, dst=self.nat_addr) /
1621 ICMP(id=self.icmp_id_out, type='echo-reply'))
1623 self.pg0.add_stream(pkts)
1624 self.pg_enable_capture(self.pg_interfaces)
1626 capture = self.pg0.get_capture(len(pkts))
1627 for packet in capture:
# The host sees server1's mapped address and its own original ports / id.
1629 self.assertEqual(packet[IP].src, server1_nat_ip)
1630 self.assertEqual(packet[IP].dst, host.ip4)
1631 if packet.haslayer(TCP):
1632 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1633 self.assertEqual(packet[TCP].sport, server_tcp_port)
1634 self.check_tcp_checksum(packet)
1635 elif packet.haslayer(UDP):
1636 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1637 self.assertEqual(packet[UDP].sport, server_udp_port)
1639 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1641 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1644 # server2 to server1
1646 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1647 IP(src=server2.ip4, dst=server1_nat_ip) /
1648 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1650 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1651 IP(src=server2.ip4, dst=server1_nat_ip) /
1652 UDP(sport=self.udp_port_in, dport=server_udp_port))
1654 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1655 IP(src=server2.ip4, dst=server1_nat_ip) /
1656 ICMP(id=self.icmp_id_in, type='echo-request'))
1658 self.pg0.add_stream(pkts)
1659 self.pg_enable_capture(self.pg_interfaces)
1661 capture = self.pg0.get_capture(len(pkts))
1662 for packet in capture:
# server2 has its own static mapping: the source becomes server2_nat_ip and
# the source ports / ICMP id are expected to stay unchanged.
1664 self.assertEqual(packet[IP].src, server2_nat_ip)
1665 self.assertEqual(packet[IP].dst, server1.ip4)
1666 if packet.haslayer(TCP):
1667 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1668 self.assertEqual(packet[TCP].dport, server_tcp_port)
1669 self.tcp_port_out = packet[TCP].sport
1670 self.check_tcp_checksum(packet)
1671 elif packet.haslayer(UDP):
1672 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1673 self.assertEqual(packet[UDP].dport, server_udp_port)
1674 self.udp_port_out = packet[UDP].sport
1676 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1677 self.icmp_id_out = packet[ICMP].id
1679 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1682 # server1 to server2
1684 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1685 IP(src=server1.ip4, dst=server2_nat_ip) /
1686 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1688 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1689 IP(src=server1.ip4, dst=server2_nat_ip) /
1690 UDP(sport=server_udp_port, dport=self.udp_port_out))
1692 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1693 IP(src=server1.ip4, dst=server2_nat_ip) /
1694 ICMP(id=self.icmp_id_out, type='echo-reply'))
1696 self.pg0.add_stream(pkts)
1697 self.pg_enable_capture(self.pg_interfaces)
1699 capture = self.pg0.get_capture(len(pkts))
1700 for packet in capture:
# server2 sees server1's mapped address as source and its own real address
# and original ports / id as destination.
1702 self.assertEqual(packet[IP].src, server1_nat_ip)
1703 self.assertEqual(packet[IP].dst, server2.ip4)
1704 if packet.haslayer(TCP):
1705 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1706 self.assertEqual(packet[TCP].sport, server_tcp_port)
1707 self.check_tcp_checksum(packet)
1708 elif packet.haslayer(UDP):
1709 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1710 self.assertEqual(packet[UDP].sport, server_udp_port)
1712 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1714 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: one inside host opens more TCP flows than the configured per-user
# translation limit; every packet must still be forwarded to pg1.
# NOTE(review): pkts is used below but its creation/append lines are not
# visible in this listing.
1717 def test_max_translations_per_user(self):
1718 """ MAX translations per user - recycle the least recently used """
1720 self.nat44_add_address(self.nat_addr)
1721 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1722 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1725 # get maximum number of translations per user
1726 nat44_config = self.vapi.nat_show_config()
1728 # send more than maximum number of translations per user packets
# Five packets more than the limit reported by nat_show_config().
1729 pkts_num = nat44_config.max_translations_per_user + 5
# Each packet uses a distinct TCP source port (1025, 1026, ...), so each
# one is a separate flow from the same source address.
1731 for port in range(0, pkts_num):
1732 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1733 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1734 TCP(sport=1025 + port))
1736 self.pg0.add_stream(pkts)
1737 self.pg_enable_capture(self.pg_interfaces)
1740 # verify number of translated packet
1741 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface """
    nat_if = self.pg7
    self.vapi.nat44_add_interface_addr(nat_if.sw_if_index)

    # the NAT pool must be empty before the interface has an address
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # after configuring the interface address, the pool must contain
    # exactly one entry whose first four bytes equal that address
    nat_if.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    pool_entry = pool[0]
    self.assertEqual(pool_entry.ip_address[0:4], nat_if.local_ip4n)

    # after removing the interface address, the pool must be empty again
    nat_if.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
# Test: a static mapping whose external address is taken from interface
# pg7 - the mapping is dumped before pg7 has an address, after it gets one
# and after the address is removed again.
1762 def test_interface_addr_static_mapping(self):
1763 """ Static mapping with addresses from interface """
1764 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
# NOTE(review): a line of this call is missing from the listing; only the
# external_sw_if_index keyword is visible (the local address argument is
# presumably on the missing line).
1765 self.nat44_add_static_mapping(
1767 external_sw_if_index=self.pg7.sw_if_index)
1769 # static mappings with external interface
# Before pg7 has an address the dump reports the mapping by interface
# index.
1770 static_mappings = self.vapi.nat44_static_mapping_dump()
1771 self.assertEqual(1, len(static_mappings))
1772 self.assertEqual(self.pg7.sw_if_index,
1773 static_mappings[0].external_sw_if_index)
1775 # configure interface address and check static mappings
1776 self.pg7.config_ip4()
1777 static_mappings = self.vapi.nat44_static_mapping_dump()
1778 self.assertEqual(1, len(static_mappings))
1779 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1780 self.pg7.local_ip4n)
# Once resolved to an address the interface index is expected to read
# 0xFFFFFFFF - presumably the API's "no interface" value (confirm).
1781 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1783 # remove interface address and check static mappings
1784 self.pg7.unconfig_ip4()
1785 static_mappings = self.vapi.nat44_static_mapping_dump()
1786 self.assertEqual(0, len(static_mappings))
# Test: IPFIX export of NAT44 session events - translated traffic is sent
# pg0 -> pg1, the pool address is removed, the exporter is flushed and the
# three IPFIX packets captured on pg3 are decoded and verified.
# NOTE(review): the loop headers over `capture` are not visible in this
# listing; `p` in the assertions below presumably iterates over it.
1788 def test_ipfix_nat44_sess(self):
1789 """ IPFIX logging NAT44 session created/delted """
1790 self.ipfix_domain_id = 10
1791 self.ipfix_src_port = 20202
1792 colector_port = 30303
# Make scapy dissect UDP packets with this destination port as IPFIX.
# NOTE(review): the literal 30303 duplicates colector_port above; the two
# must stay in sync.
1793 bind_layers(UDP, IPFIX, dport=30303)
1794 self.nat44_add_address(self.nat_addr)
1795 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1796 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exporter: collector is pg3's remote host, source is pg3's local address.
1798 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1799 src_address=self.pg3.local_ip4n,
1801 template_interval=10,
1802 collector_port=colector_port)
1803 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1804 src_port=self.ipfix_src_port)
1806 pkts = self.create_stream_in(self.pg0, self.pg1)
1807 self.pg0.add_stream(pkts)
1808 self.pg_enable_capture(self.pg_interfaces)
1810 capture = self.pg1.get_capture(len(pkts))
1811 self.verify_capture_out(capture)
# Remove the pool address (is_add=0) before flushing - presumably so that
# session-delete events are generated as well (see docstring).
1812 self.nat44_add_address(self.nat_addr, is_add=0)
1813 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1814 capture = self.pg3.get_capture(3)
1815 ipfix = IPFIXDecoder()
1816 # first load template
# Every exported packet must carry the configured addresses, ports and
# observation domain id.
1818 self.assertTrue(p.haslayer(IPFIX))
1819 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1820 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1821 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1822 self.assertEqual(p[UDP].dport, colector_port)
1823 self.assertEqual(p[IPFIX].observationDomainID,
1824 self.ipfix_domain_id)
1825 if p.haslayer(Template):
1826 ipfix.add_template(p.getlayer(Template))
1827 # verify events in data set
1829 if p.haslayer(Data):
1830 data = ipfix.decode_data_set(p.getlayer(Set))
1831 self.verify_ipfix_nat44_ses(data)
# Test: IPFIX export of the "addresses exhausted" event - no pool address
# is added in this method, so the packet sent from pg0 must not reach pg1,
# and the exported IPFIX packets on pg3 are decoded and verified.
# NOTE(review): ipfix_domain_id and ipfix_src_port are read from self but
# not assigned in this method - presumably class-level defaults set
# elsewhere in the file; the loop headers over `capture` are not visible.
1833 def test_ipfix_addr_exhausted(self):
1834 """ IPFIX logging NAT addresses exhausted """
1835 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1836 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1838 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1839 src_address=self.pg3.local_ip4n,
1841 template_interval=10)
1842 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1843 src_port=self.ipfix_src_port)
1845 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1846 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1848 self.pg0.add_stream(p)
1849 self.pg_enable_capture(self.pg_interfaces)
# Expect zero packets on pg1.
1851 capture = self.pg1.get_capture(0)
1852 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1853 capture = self.pg3.get_capture(3)
1854 ipfix = IPFIXDecoder()
1855 # first load template
1857 self.assertTrue(p.haslayer(IPFIX))
1858 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1859 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1860 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# No collector_port was passed to set_ipfix_exporter above; 4739 is the
# destination port this test expects in that case.
1861 self.assertEqual(p[UDP].dport, 4739)
1862 self.assertEqual(p[IPFIX].observationDomainID,
1863 self.ipfix_domain_id)
1864 if p.haslayer(Template):
1865 ipfix.add_template(p.getlayer(Template))
1866 # verify events in data set
1868 if p.haslayer(Data):
1869 data = ipfix.decode_data_set(p.getlayer(Set))
1870 self.verify_ipfix_addr_exhausted(data)
# Test: ARP behaviour for NAT addresses - ARP requests for the pool address
# and for the static-mapping address sent on pg1 must be answered, an ARP
# request sent on pg2 must produce nothing on pg1, and after both addresses
# are removed no requests are answered any more.
1872 def test_pool_addr_fib(self):
1873 """ NAT44 add pool addresses to FIB """
1874 static_addr = '10.0.0.10'
1875 self.nat44_add_address(self.nat_addr)
1876 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1877 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1879 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# Broadcast ARP who-has for the pool address, sent on pg1.
1882 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1883 ARP(op=ARP.who_has, pdst=self.nat_addr,
1884 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1885 self.pg1.add_stream(p)
1886 self.pg_enable_capture(self.pg_interfaces)
1888 capture = self.pg1.get_capture(1)
1889 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): assertTrue takes (expr, msg), so ARP.is_at is only used as
# the failure message here and the op field is never compared with it;
# assertEqual was probably intended.
1890 self.assertTrue(capture[0][ARP].op, ARP.is_at)
# Broadcast ARP who-has for the static-mapping address, sent on pg1.
1893 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1894 ARP(op=ARP.who_has, pdst=static_addr,
1895 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1896 self.pg1.add_stream(p)
1897 self.pg_enable_capture(self.pg_interfaces)
1899 capture = self.pg1.get_capture(1)
1900 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): same issue as above - the second argument is treated as a
# message, not as an expected value.
1901 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1903 # send ARP to non-NAT44 interface
1904 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1905 ARP(op=ARP.who_has, pdst=self.nat_addr,
1906 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1907 self.pg2.add_stream(p)
1908 self.pg_enable_capture(self.pg_interfaces)
# Expect zero packets on pg1 for the request sent on pg2.
1910 capture = self.pg1.get_capture(0)
1912 # remove addresses and verify
1913 self.nat44_add_address(self.nat_addr, is_add=0)
# NOTE(review): this call is cut off in the listing; given the comment
# above, the missing argument presumably removes the mapping.
1914 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
# Repeat both ARP requests; neither may be answered now.
1917 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1918 ARP(op=ARP.who_has, pdst=self.nat_addr,
1919 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1920 self.pg1.add_stream(p)
1921 self.pg_enable_capture(self.pg_interfaces)
1923 capture = self.pg1.get_capture(0)
1925 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1926 ARP(op=ARP.who_has, pdst=static_addr,
1927 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1928 self.pg1.add_stream(p)
1929 self.pg_enable_capture(self.pg_interfaces)
1931 capture = self.pg1.get_capture(0)
# Test: pg0 and pg1 are moved into two separate IP tables, each table gets
# its own NAT pool address, and traffic from each interface towards pg2
# must be translated to the address that belongs to its table.
# NOTE(review): vrf_id1 and vrf_id2 are used below but assigned on lines
# that are not visible in this listing.
1933 def test_vrf_mode(self):
1934 """ NAT44 tenant VRF aware address pool mode """
1938 nat_ip1 = "10.0.0.10"
1939 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and configured again
# afterwards.
1941 self.pg0.unconfig_ip4()
1942 self.pg1.unconfig_ip4()
1943 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1944 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1945 self.pg0.set_table_ip4(vrf_id1)
1946 self.pg1.set_table_ip4(vrf_id2)
1947 self.pg0.config_ip4()
1948 self.pg1.config_ip4()
# One pool address per table.
1950 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1951 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1952 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1953 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1954 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 (first table) -> pg2: translated to nat_ip1.
1958 pkts = self.create_stream_in(self.pg0, self.pg2)
1959 self.pg0.add_stream(pkts)
1960 self.pg_enable_capture(self.pg_interfaces)
1962 capture = self.pg2.get_capture(len(pkts))
1963 self.verify_capture_out(capture, nat_ip1)
# pg1 (second table) -> pg2: translated to nat_ip2.
1966 pkts = self.create_stream_in(self.pg1, self.pg2)
1967 self.pg1.add_stream(pkts)
1968 self.pg_enable_capture(self.pg_interfaces)
1970 capture = self.pg2.get_capture(len(pkts))
1971 self.verify_capture_out(capture, nat_ip2)
# Clean-up: put both interfaces back into table 0 and delete the tables.
# NOTE(review): config_ip4() is not called again in the visible lines, so
# pg0/pg1 appear to be left without addresses - confirm whether tearDown or
# a missing line restores them.
1973 self.pg0.unconfig_ip4()
1974 self.pg1.unconfig_ip4()
1975 self.pg0.set_table_ip4(0)
1976 self.pg1.set_table_ip4(0)
1977 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1978 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
# Test: two pool addresses added without a vrf_id; traffic from both pg0
# and pg1 towards pg2 is expected to be translated to the same address,
# nat_ip1.
1980 def test_vrf_feature_independent(self):
1981 """ NAT44 tenant VRF independent address pool mode """
1983 nat_ip1 = "10.0.0.10"
1984 nat_ip2 = "10.0.0.11"
1986 self.nat44_add_address(nat_ip1)
1987 self.nat44_add_address(nat_ip2)
1988 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1989 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
# NOTE(review): this call is cut off in the listing; the missing argument
# presumably marks pg2 as the outside interface.
1990 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2: translated to nat_ip1.
1994 pkts = self.create_stream_in(self.pg0, self.pg2)
1995 self.pg0.add_stream(pkts)
1996 self.pg_enable_capture(self.pg_interfaces)
1998 capture = self.pg2.get_capture(len(pkts))
1999 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: also translated to nat_ip1 (nat_ip2 is never expected).
2002 pkts = self.create_stream_in(self.pg1, self.pg2)
2003 self.pg1.add_stream(pkts)
2004 self.pg_enable_capture(self.pg_interfaces)
2006 capture = self.pg2.get_capture(len(pkts))
2007 self.verify_capture_out(capture, nat_ip1)
# Test: dynamic NAT44 between pg7 and pg8, which get neighbor entries and
# /32 host routes for their remote hosts instead of a config_ip4() call.
2009 def test_dynamic_ipless_interfaces(self):
2010 """ NAT44 interfaces without configured IP address """
# Neighbor (MAC <-> IP) entries for the remote hosts of pg7 and pg8.
# NOTE(review): both calls are cut off in this listing; their last
# argument is not visible.
2012 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2013 mactobinary(self.pg7.remote_mac),
2014 self.pg7.remote_ip4n,
2016 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2017 mactobinary(self.pg8.remote_mac),
2018 self.pg8.remote_ip4n,
# /32 routes to each remote host via its own interface.
2021 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2022 dst_address_length=32,
2023 next_hop_address=self.pg7.remote_ip4n,
2024 next_hop_sw_if_index=self.pg7.sw_if_index)
2025 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2026 dst_address_length=32,
2027 next_hop_address=self.pg8.remote_ip4n,
2028 next_hop_sw_if_index=self.pg8.sw_if_index)
2030 self.nat44_add_address(self.nat_addr)
2031 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2032 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8.
2036 pkts = self.create_stream_in(self.pg7, self.pg8)
2037 self.pg7.add_stream(pkts)
2038 self.pg_enable_capture(self.pg_interfaces)
2040 capture = self.pg8.get_capture(len(pkts))
2041 self.verify_capture_out(capture)
# pg8 -> pg7; nat_addr is passed explicitly to create_stream_out.
2044 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2045 self.pg8.add_stream(pkts)
2046 self.pg_enable_capture(self.pg_interfaces)
2048 capture = self.pg7.get_capture(len(pkts))
2049 self.verify_capture_in(capture, self.pg7)
# Test: address-only static mapping (pg7 remote host <-> nat_addr) between
# pg7 and pg8, which get neighbor entries and /32 host routes instead of a
# config_ip4() call. The pg8 -> pg7 direction is exercised first.
2051 def test_static_ipless_interfaces(self):
2052 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Neighbor (MAC <-> IP) entries for the remote hosts of pg7 and pg8.
# NOTE(review): both calls are cut off in this listing; their last
# argument is not visible.
2054 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2055 mactobinary(self.pg7.remote_mac),
2056 self.pg7.remote_ip4n,
2058 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2059 mactobinary(self.pg8.remote_mac),
2060 self.pg8.remote_ip4n,
# /32 routes to each remote host via its own interface.
2063 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2064 dst_address_length=32,
2065 next_hop_address=self.pg7.remote_ip4n,
2066 next_hop_sw_if_index=self.pg7.sw_if_index)
2067 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2068 dst_address_length=32,
2069 next_hop_address=self.pg8.remote_ip4n,
2070 next_hop_sw_if_index=self.pg8.sw_if_index)
# No pool address is added here; only the static mapping provides nat_addr.
2072 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2073 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2074 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7.
2078 pkts = self.create_stream_out(self.pg8)
2079 self.pg8.add_stream(pkts)
2080 self.pg_enable_capture(self.pg_interfaces)
2082 capture = self.pg7.get_capture(len(pkts))
2083 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; captured packets are checked against nat_addr (third
# argument True).
2086 pkts = self.create_stream_in(self.pg7, self.pg8)
2087 self.pg7.add_stream(pkts)
2088 self.pg_enable_capture(self.pg_interfaces)
2090 capture = self.pg8.get_capture(len(pkts))
2091 self.verify_capture_out(capture, self.nat_addr, True)
# Test: per-protocol static port mappings (pg7 remote host <-> nat_addr)
# between pg7 and pg8, which get neighbor entries and /32 host routes
# instead of a config_ip4() call. The pg8 -> pg7 direction is exercised
# first.
2093 def test_static_with_port_ipless_interfaces(self):
2094 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# External TCP/UDP ports and ICMP id used by the static mappings below.
2096 self.tcp_port_out = 30606
2097 self.udp_port_out = 30607
2098 self.icmp_id_out = 30608
# Neighbor (MAC <-> IP) entries for the remote hosts of pg7 and pg8.
# NOTE(review): both calls are cut off in this listing; their last
# argument is not visible.
2100 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2101 mactobinary(self.pg7.remote_mac),
2102 self.pg7.remote_ip4n,
2104 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2105 mactobinary(self.pg8.remote_mac),
2106 self.pg8.remote_ip4n,
# /32 routes to each remote host via its own interface.
2109 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2110 dst_address_length=32,
2111 next_hop_address=self.pg7.remote_ip4n,
2112 next_hop_sw_if_index=self.pg7.sw_if_index)
2113 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2114 dst_address_length=32,
2115 next_hop_address=self.pg8.remote_ip4n,
2116 next_hop_sw_if_index=self.pg8.sw_if_index)
# Pool address plus one static mapping per protocol.
2118 self.nat44_add_address(self.nat_addr)
2119 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2120 self.tcp_port_in, self.tcp_port_out,
2121 proto=IP_PROTOS.tcp)
2122 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2123 self.udp_port_in, self.udp_port_out,
2124 proto=IP_PROTOS.udp)
2125 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2126 self.icmp_id_in, self.icmp_id_out,
2127 proto=IP_PROTOS.icmp)
2128 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2129 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7.
2133 pkts = self.create_stream_out(self.pg8)
2134 self.pg8.add_stream(pkts)
2135 self.pg_enable_capture(self.pg_interfaces)
2137 capture = self.pg7.get_capture(len(pkts))
2138 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8.
2141 pkts = self.create_stream_in(self.pg7, self.pg8)
2142 self.pg7.add_stream(pkts)
2143 self.pg_enable_capture(self.pg_interfaces)
2145 capture = self.pg8.get_capture(len(pkts))
2146 self.verify_capture_out(capture)
# Checks that a 1:1 static mapping (pg0 peer <-> 10.0.0.10) also translates a
# packet whose L4 protocol NAT does not track: only the outer IP addresses are
# asserted, together with the presence of a GRE layer and a valid IP checksum.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering). In particular the layer between the outer and inner IP
# headers (2159/2179), the assignment of `packet` from the capture, and the
# try/except around the assertions are not visible here.
2148 def test_static_unknown_proto(self):
2149 """ 1:1 NAT translate packet with unknown protocol """
2150 nat_ip = "10.0.0.10"
2151 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2152 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2153 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Direction pg0 -> pg1: the source address must become nat_ip.
2157 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2158 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2160 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2161 TCP(sport=1234, dport=1234))
2162 self.pg0.add_stream(p)
2163 self.pg_enable_capture(self.pg_interfaces)
2165 p = self.pg1.get_capture(1)
2168 self.assertEqual(packet[IP].src, nat_ip)
2169 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2170 self.assertTrue(packet.haslayer(GRE))
2171 self.check_ip_checksum(packet)
2173 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Direction pg1 -> pg0: a packet addressed to nat_ip must reach the pg0 peer.
2177 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2178 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2180 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2181 TCP(sport=1234, dport=1234))
2182 self.pg1.add_stream(p)
2183 self.pg_enable_capture(self.pg_interfaces)
2185 p = self.pg0.get_capture(1)
2188 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2189 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2190 self.assertTrue(packet.haslayer(GRE))
2191 self.check_ip_checksum(packet)
2193 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol static NAT test: host and server
# are both remote hosts on pg0, each with its own 1:1 mapping, and the packets
# are sent and captured on pg0 itself.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering): the layer between the two IP headers, the assignment of
# `packet`, the try/except framing and the stream-start step are not visible.
2196 def test_hairpinning_static_unknown_proto(self):
2197 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2199 host = self.pg0.remote_hosts[0]
2200 server = self.pg0.remote_hosts[1]
2202 host_nat_ip = "10.0.0.10"
2203 server_nat_ip = "10.0.0.11"
2205 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2206 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2207 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2208 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server via the server's NAT address: expect the source rewritten to
# host_nat_ip and the destination rewritten to the server's real address.
2212 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2213 IP(src=host.ip4, dst=server_nat_ip) /
2215 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2216 TCP(sport=1234, dport=1234))
2217 self.pg0.add_stream(p)
2218 self.pg_enable_capture(self.pg_interfaces)
2220 p = self.pg0.get_capture(1)
2223 self.assertEqual(packet[IP].src, host_nat_ip)
2224 self.assertEqual(packet[IP].dst, server.ip4)
2225 self.assertTrue(packet.haslayer(GRE))
2226 self.check_ip_checksum(packet)
2228 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host via the host's NAT address: the mirror-image expectations.
2232 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2233 IP(src=server.ip4, dst=host_nat_ip) /
2235 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2236 TCP(sport=1234, dport=1234))
2237 self.pg0.add_stream(p)
2238 self.pg_enable_capture(self.pg_interfaces)
2240 p = self.pg0.get_capture(1)
2243 self.assertEqual(packet[IP].src, server_nat_ip)
2244 self.assertEqual(packet[IP].dst, host.ip4)
2245 self.assertTrue(packet.haslayer(GRE))
2246 self.check_ip_checksum(packet)
2248 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Dynamic NAT44 with an untracked L4 protocol: a TCP packet is sent first from
# pg0 to pg1, then an IP-in-(elided layer)-IP packet is sent in each direction
# and only the outer addresses, the GRE layer and the IP checksum are checked.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering): the layer between the two IP headers, the assignment of
# `packet`, the try/except framing and the stream-start step are not visible.
2251 def test_unknown_proto(self):
2252 """ NAT44 translate packet with unknown protocol """
2253 self.nat44_add_address(self.nat_addr)
2254 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2255 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Plain TCP packet pg0 -> pg1 sent before the unknown-protocol packets
# (presumably to create the user's first session - confirm).
2259 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2260 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2261 TCP(sport=self.tcp_port_in, dport=20))
2262 self.pg0.add_stream(p)
2263 self.pg_enable_capture(self.pg_interfaces)
2265 p = self.pg1.get_capture(1)
# Direction pg0 -> pg1: the source address must become nat_addr.
2267 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2268 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2270 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2271 TCP(sport=1234, dport=1234))
2272 self.pg0.add_stream(p)
2273 self.pg_enable_capture(self.pg_interfaces)
2275 p = self.pg1.get_capture(1)
2278 self.assertEqual(packet[IP].src, self.nat_addr)
2279 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2280 self.assertTrue(packet.haslayer(GRE))
2281 self.check_ip_checksum(packet)
2283 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Direction pg1 -> pg0: a packet addressed to nat_addr must reach the pg0 peer.
2287 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2288 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2290 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2291 TCP(sport=1234, dport=1234))
2292 self.pg1.add_stream(p)
2293 self.pg_enable_capture(self.pg_interfaces)
2295 p = self.pg0.get_capture(1)
2298 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2299 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2300 self.assertTrue(packet.haslayer(GRE))
2301 self.check_ip_checksum(packet)
2303 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning with an untracked L4 protocol: host uses the dynamic pool
# (nat_addr), server has a 1:1 static mapping to 10.0.0.11, and both are
# remote hosts on pg0, so traffic is sent and captured on pg0.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering): `host_in_port` is used below but its definition is not
# visible, and the layer between the two IP headers, the assignment of
# `packet`, the try/except framing and the stream-start step are missing too.
2306 def test_hairpinning_unknown_proto(self):
2307 """ NAT44 translate packet with unknown protocol - hairpinning """
2308 host = self.pg0.remote_hosts[0]
2309 server = self.pg0.remote_hosts[1]
2312 server_in_port = 5678
2313 server_out_port = 8765
2314 server_nat_ip = "10.0.0.11"
2316 self.nat44_add_address(self.nat_addr)
2317 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2318 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2321 # add static mapping for server
2322 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Plain TCP packet host -> server sent before the unknown-protocol packets;
# its capture is fetched but not inspected in the visible lines.
2325 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2326 IP(src=host.ip4, dst=server_nat_ip) /
2327 TCP(sport=host_in_port, dport=server_out_port))
2328 self.pg0.add_stream(p)
2329 self.pg_enable_capture(self.pg_interfaces)
2331 capture = self.pg0.get_capture(1)
# host -> server: expect source nat_addr and destination the server's real IP.
2333 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2334 IP(src=host.ip4, dst=server_nat_ip) /
2336 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2337 TCP(sport=1234, dport=1234))
2338 self.pg0.add_stream(p)
2339 self.pg_enable_capture(self.pg_interfaces)
2341 p = self.pg0.get_capture(1)
2344 self.assertEqual(packet[IP].src, self.nat_addr)
2345 self.assertEqual(packet[IP].dst, server.ip4)
2346 self.assertTrue(packet.haslayer(GRE))
2347 self.check_ip_checksum(packet)
2349 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host via nat_addr: expect source server_nat_ip, destination host.
2353 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2354 IP(src=server.ip4, dst=self.nat_addr) /
2356 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2357 TCP(sport=1234, dport=1234))
2358 self.pg0.add_stream(p)
2359 self.pg_enable_capture(self.pg_interfaces)
2361 p = self.pg0.get_capture(1)
2364 self.assertEqual(packet[IP].src, server_nat_ip)
2365 self.assertEqual(packet[IP].dst, host.ip4)
2366 self.assertTrue(packet.haslayer(GRE))
2367 self.check_ip_checksum(packet)
2369 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT44 configured as an interface output feature on pg0, pg1 and pg3, then
# three flows are checked: pg0 -> pg3 (translated), pg3 -> pg0 (translated
# back) and pg2 -> pg0 (no translation expected).
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2377 -> 2381); the pg3 call below has lost its
# continuation line and the stream-start step is not visible.
2372 def test_output_feature(self):
2373 """ NAT44 interface output feature (in2out postrouting) """
2374 self.nat44_add_address(self.nat_addr)
2375 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2376 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2377 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3, verified as outside-side capture.
2381 pkts = self.create_stream_in(self.pg0, self.pg3)
2382 self.pg0.add_stream(pkts)
2383 self.pg_enable_capture(self.pg_interfaces)
2385 capture = self.pg3.get_capture(len(pkts))
2386 self.verify_capture_out(capture)
# pg3 -> pg0, verified as inside-side capture.
2389 pkts = self.create_stream_out(self.pg3)
2390 self.pg3.add_stream(pkts)
2391 self.pg_enable_capture(self.pg_interfaces)
2393 capture = self.pg0.get_capture(len(pkts))
2394 self.verify_capture_in(capture, self.pg0)
2396 # from non-NAT interface to NAT inside interface
2397 pkts = self.create_stream_in(self.pg2, self.pg0)
2398 self.pg2.add_stream(pkts)
2399 self.pg_enable_capture(self.pg_interfaces)
2401 capture = self.pg0.get_capture(len(pkts))
2402 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
# VRF-aware output-feature NAT44: one pool address per VRF (10.0.0.10 for
# vrf_id 10, 10.0.0.20 for vrf_id 20). Traffic from pg4 must be translated to
# the VRF 10 address and traffic from pg6 to the VRF 20 address, with return
# traffic from pg3 delivered back to the originating interface.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2412 -> 2414 and 2417 -> 2420). Both route calls
# below have lost their last argument (presumably the table id - confirm), and
# the pg3 feature call has lost its continuation line.
2404 def test_output_feature_vrf_aware(self):
2405 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2406 nat_ip_vrf10 = "10.0.0.10"
2407 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to the pg3 peer; identical as far as the visible lines go.
2409 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2410 dst_address_length=32,
2411 next_hop_address=self.pg3.remote_ip4n,
2412 next_hop_sw_if_index=self.pg3.sw_if_index,
2414 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2415 dst_address_length=32,
2416 next_hop_address=self.pg3.remote_ip4n,
2417 next_hop_sw_if_index=self.pg3.sw_if_index,
2420 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2421 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2422 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2423 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2424 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3 must use the VRF 10 pool address.
2428 pkts = self.create_stream_in(self.pg4, self.pg3)
2429 self.pg4.add_stream(pkts)
2430 self.pg_enable_capture(self.pg_interfaces)
2432 capture = self.pg3.get_capture(len(pkts))
2433 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# Return traffic addressed to the VRF 10 address must arrive on pg4.
2436 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2437 self.pg3.add_stream(pkts)
2438 self.pg_enable_capture(self.pg_interfaces)
2440 capture = self.pg4.get_capture(len(pkts))
2441 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3 must use the VRF 20 pool address.
2444 pkts = self.create_stream_in(self.pg6, self.pg3)
2445 self.pg6.add_stream(pkts)
2446 self.pg_enable_capture(self.pg_interfaces)
2448 capture = self.pg3.get_capture(len(pkts))
2449 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# Return traffic addressed to the VRF 20 address must arrive on pg6.
2452 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2453 self.pg3.add_stream(pkts)
2454 self.pg_enable_capture(self.pg_interfaces)
2456 capture = self.pg6.get_capture(len(pkts))
2457 self.verify_capture_in(capture, self.pg6)
2459 def test_output_feature_hairpinning(self):
2460 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2461 host = self.pg0.remote_hosts[0]
2462 server = self.pg0.remote_hosts[1]
2465 server_in_port = 5678
2466 server_out_port = 8765
2468 self.nat44_add_address(self.nat_addr)
2469 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2470 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2473 # add static mapping for server
2474 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2475 server_in_port, server_out_port,
2476 proto=IP_PROTOS.tcp)
2478 # send packet from host to server
2479 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2480 IP(src=host.ip4, dst=self.nat_addr) /
2481 TCP(sport=host_in_port, dport=server_out_port))
2482 self.pg0.add_stream(p)
2483 self.pg_enable_capture(self.pg_interfaces)
2485 capture = self.pg0.get_capture(1)
2490 self.assertEqual(ip.src, self.nat_addr)
2491 self.assertEqual(ip.dst, server.ip4)
2492 self.assertNotEqual(tcp.sport, host_in_port)
2493 self.assertEqual(tcp.dport, server_in_port)
2494 self.check_tcp_checksum(p)
2495 host_out_port = tcp.sport
2497 self.logger.error(ppp("Unexpected or invalid packet:", p))
2500 # send reply from server to host
2501 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2502 IP(src=server.ip4, dst=self.nat_addr) /
2503 TCP(sport=server_in_port, dport=host_out_port))
2504 self.pg0.add_stream(p)
2505 self.pg_enable_capture(self.pg_interfaces)
2507 capture = self.pg0.get_capture(1)
2512 self.assertEqual(ip.src, self.nat_addr)
2513 self.assertEqual(ip.dst, host.ip4)
2514 self.assertEqual(tcp.sport, server_out_port)
2515 self.assertEqual(tcp.dport, host_in_port)
2516 self.check_tcp_checksum(p)
2518 self.logger.error(ppp("Unexpected or invalid packet:"), p)
# One-armed NAT44: pg9 is configured as a NAT interface twice (the second call
# has lost its continuation line; presumably once as inside and once as
# outside - confirm), so traffic enters and leaves through the same interface.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering): the extraction of `ip`/`tcp` from the capture, the
# try/except framing and the stream-start step are not visible.
2521 def test_one_armed_nat44(self):
2522 """ One armed NAT44 """
2523 remote_host = self.pg9.remote_hosts[0]
2524 local_host = self.pg9.remote_hosts[1]
2527 self.nat44_add_address(self.nat_addr)
2528 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2529 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host: expect source nat_addr with a translated port.
2533 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2534 IP(src=local_host.ip4, dst=remote_host.ip4) /
2535 TCP(sport=12345, dport=80))
2536 self.pg9.add_stream(p)
2537 self.pg_enable_capture(self.pg_interfaces)
2539 capture = self.pg9.get_capture(1)
2544 self.assertEqual(ip.src, self.nat_addr)
2545 self.assertEqual(ip.dst, remote_host.ip4)
2546 self.assertNotEqual(tcp.sport, 12345)
# Remember the translated port; the reply below is addressed to it.
2547 external_port = tcp.sport
2548 self.assertEqual(tcp.dport, 80)
2549 self.check_tcp_checksum(p)
2550 self.check_ip_checksum(p)
2552 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port: expect delivery to local_host:12345.
2556 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2557 IP(src=remote_host.ip4, dst=self.nat_addr) /
2558 TCP(sport=80, dport=external_port))
2559 self.pg9.add_stream(p)
2560 self.pg_enable_capture(self.pg_interfaces)
2562 capture = self.pg9.get_capture(1)
2567 self.assertEqual(ip.src, remote_host.ip4)
2568 self.assertEqual(ip.dst, local_host.ip4)
2569 self.assertEqual(tcp.sport, 80)
2570 self.assertEqual(tcp.dport, 12345)
2571 self.check_tcp_checksum(p)
2572 self.check_ip_checksum(p)
2574 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Creates sessions by sending the standard inside stream pg0 -> pg1, deletes
# two of them through the API (one addressed by its inside address/port, one
# by its outside address/port) and checks that the user's session count
# dropped by exactly two.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2598 -> 2601); the second delete call has lost its
# trailing argument(s) and the stream-start step is not visible.
2577 def test_del_session(self):
2578 """ Delete NAT44 session """
2579 self.nat44_add_address(self.nat_addr)
2580 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2581 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2584 pkts = self.create_stream_in(self.pg0, self.pg1)
2585 self.pg0.add_stream(pkts)
2586 self.pg_enable_capture(self.pg_interfaces)
2588 capture = self.pg1.get_capture(len(pkts))
# Session count for the pg0 peer before deletion.
2590 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2591 nsessions = len(sessions)
2593 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2594 sessions[0].inside_port,
2595 sessions[0].protocol)
2596 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2597 sessions[1].outside_port,
2598 sessions[1].protocol,
2601 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2602 self.assertEqual(nsessions - len(sessions), 2)
# Round-trips the virtual fragmentation reassembly settings through the API:
# reads the current configuration, writes modified values (timeout + 5,
# max_reass * 2, max_frag * 2), reads them back and compares, then enables
# drop_frag and checks that the flag is reported as set.
# NOTE(review): the visible lines do not restore the original values here.
2604 def test_set_get_reass(self):
2605 """ NAT44 set/get virtual fragmentation reassembly """
2606 reas_cfg1 = self.vapi.nat_get_reass()
2608 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2609 max_reass=reas_cfg1.ip4_max_reass * 2,
2610 max_frag=reas_cfg1.ip4_max_frag * 2)
2612 reas_cfg2 = self.vapi.nat_get_reass()
2614 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2615 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2616 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2618 self.vapi.nat_set_reass(drop_frag=1)
2619 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
# Sends a fragmented TCP packet pg0 -> pg1 and a fragmented reply pg1 -> pg0,
# reassembles each capture and checks ports and payload; finally checks that
# the reassembly dump grew by two entries over the course of the test.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2636 -> 2640 and 2653 -> 2658); most arguments of
# the create_stream_frag / reass_frags_and_verify calls and the stream-start
# step are not visible.
2621 def test_frag_in_order(self):
2622 """ NAT44 translate fragments arriving in order """
2623 self.nat44_add_address(self.nat_addr)
2624 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2625 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload and a random inside source port for this run.
2628 data = "A" * 4 + "B" * 16 + "C" * 3
2629 self.tcp_port_in = random.randint(1025, 65535)
# Baseline number of reassembly entries.
2631 reass = self.vapi.nat_reass_dump()
2632 reass_n_start = len(reass)
2635 pkts = self.create_stream_frag(self.pg0,
2636 self.pg1.remote_ip4,
2640 self.pg0.add_stream(pkts)
2641 self.pg_enable_capture(self.pg_interfaces)
2643 frags = self.pg1.get_capture(len(pkts))
2644 p = self.reass_frags_and_verify(frags,
2646 self.pg1.remote_ip4)
# Outbound packet: destination port 20, translated source port, same payload.
2647 self.assertEqual(p[TCP].dport, 20)
2648 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2649 self.tcp_port_out = p[TCP].sport
2650 self.assertEqual(data, p[Raw].load)
2653 pkts = self.create_stream_frag(self.pg1,
2658 self.pg1.add_stream(pkts)
2659 self.pg_enable_capture(self.pg_interfaces)
2661 frags = self.pg0.get_capture(len(pkts))
2662 p = self.reass_frags_and_verify(frags,
2663 self.pg1.remote_ip4,
2664 self.pg0.remote_ip4)
# Reply: source port 20, destination restored to the original inside port.
2665 self.assertEqual(p[TCP].sport, 20)
2666 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2667 self.assertEqual(data, p[Raw].load)
2669 reass = self.vapi.nat_reass_dump()
2670 reass_n_end = len(reass)
2672 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented hairpinning: host and server are both remote hosts on pg0, the
# server is published through a TCP static mapping with random ports, and a
# fragmented packet from the host is captured on pg0, reassembled and checked.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2694 -> 2699 and 2703 -> 2706); most arguments of
# the create_stream_frag / reass_frags_and_verify calls, the continuation of
# the pg1 feature call and the stream-start step are not visible.
2674 def test_reass_hairpinning(self):
2675 """ NAT44 fragments hairpinning """
2676 host = self.pg0.remote_hosts[0]
2677 server = self.pg0.remote_hosts[1]
2678 host_in_port = random.randint(1025, 65535)
2680 server_in_port = random.randint(1025, 65535)
2681 server_out_port = random.randint(1025, 65535)
2682 data = "A" * 4 + "B" * 16 + "C" * 3
2684 self.nat44_add_address(self.nat_addr)
2685 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2686 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2688 # add static mapping for server
2689 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2690 server_in_port, server_out_port,
2691 proto=IP_PROTOS.tcp)
2693 # send packet from host to server
2694 pkts = self.create_stream_frag(self.pg0,
2699 self.pg0.add_stream(pkts)
2700 self.pg_enable_capture(self.pg_interfaces)
2702 frags = self.pg0.get_capture(len(pkts))
2703 p = self.reass_frags_and_verify(frags,
# Reassembled packet: translated source port, server's inside port, payload
# unchanged.
2706 self.assertNotEqual(p[TCP].sport, host_in_port)
2707 self.assertEqual(p[TCP].dport, server_in_port)
2708 self.assertEqual(data, p[Raw].load)
2710 def test_frag_out_of_order(self):
2711 """ NAT44 translate fragments arriving out of order """
2712 self.nat44_add_address(self.nat_addr)
2713 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2714 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2717 data = "A" * 4 + "B" * 16 + "C" * 3
2718 random.randint(1025, 65535)
2721 pkts = self.create_stream_frag(self.pg0,
2722 self.pg1.remote_ip4,
2727 self.pg0.add_stream(pkts)
2728 self.pg_enable_capture(self.pg_interfaces)
2730 frags = self.pg1.get_capture(len(pkts))
2731 p = self.reass_frags_and_verify(frags,
2733 self.pg1.remote_ip4)
2734 self.assertEqual(p[TCP].dport, 20)
2735 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2736 self.tcp_port_out = p[TCP].sport
2737 self.assertEqual(data, p[Raw].load)
2740 pkts = self.create_stream_frag(self.pg1,
2746 self.pg1.add_stream(pkts)
2747 self.pg_enable_capture(self.pg_interfaces)
2749 frags = self.pg0.get_capture(len(pkts))
2750 p = self.reass_frags_and_verify(frags,
2751 self.pg1.remote_ip4,
2752 self.pg0.remote_ip4)
2753 self.assertEqual(p[TCP].sport, 20)
2754 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2755 self.assertEqual(data, p[Raw].load)
# Port-restricted NAT44: switches the address/port assignment algorithm to
# map-e via the CLI (psid 10, psid-offset 6, psid-len 6) and checks that the
# translated TCP source port carries that PSID.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2761 -> 2763 and 2772 -> 2777); the extraction of
# `ip`/`tcp` from the capture, the try/except framing and the stream-start
# step are not visible.
2757 def test_port_restricted(self):
2758 """ Port restricted NAT44 (MAP-E CE) """
2759 self.nat44_add_address(self.nat_addr)
2760 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2761 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2763 self.vapi.cli("nat44 addr-port-assignment-alg map-e psid 10 "
2764 "psid-offset 6 psid-len 6")
2766 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2767 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2768 TCP(sport=4567, dport=22))
2769 self.pg0.add_stream(p)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg1.get_capture(1)
2777 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2778 self.assertEqual(ip.src, self.nat_addr)
2779 self.assertEqual(tcp.dport, 22)
2780 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the translated port (shift by 6, mask 6 bits) must equal 10,
# the PSID configured above.
2781 self.assertEqual((tcp.sport >> 6) & 63, 10)
2782 self.check_tcp_checksum(p)
2783 self.check_ip_checksum(p)
2785 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Body of what is presumably TestNAT44.tearDown (the `def` line is missing
# from this listing): runs the parent tearDown, then, if VPP is still alive,
# logs the NAT44 and virtual-reassembly state and resets the address/port
# assignment algorithm to the default (a test above switches it to map-e).
2789 super(TestNAT44, self).tearDown()
2790 if not self.vpp_dead:
2791 self.logger.info(self.vapi.cli("show nat44 verbose"))
2792 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
2793 self.vapi.cli("nat44 addr-port-assignment-alg default")
2797 class TestDeterministicNAT(MethodHolder):
2798 """ Deterministic NAT Test Cases """
# Extends the VPP command line built by the parent class with
# `nat { deterministic }` so the plugin starts in deterministic mode.
# NOTE(review): takes `cls`; the decorator line is not part of this listing.
2801 def setUpConstants(cls):
2802 super(TestDeterministicNAT, cls).setUpConstants()
2803 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-level fixture: fixed inside/external ports and ICMP id, the NAT
# address, three pg interfaces (pg0..pg2), and two remote hosts with IPv4
# neighbor entries on pg0.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2820 -> 2825); the body of the `for` loop over the
# interfaces and any try/except framing are not visible.
2806 def setUpClass(cls):
2807 super(TestDeterministicNAT, cls).setUpClass()
2810 cls.tcp_port_in = 6303
2811 cls.tcp_external_port = 6303
2812 cls.udp_port_in = 6304
2813 cls.udp_external_port = 6304
2814 cls.icmp_id_in = 6305
2815 cls.nat_addr = '10.0.0.3'
2817 cls.create_pg_interfaces(range(3))
2818 cls.interfaces = list(cls.pg_interfaces)
2820 for i in cls.interfaces:
2825 cls.pg0.generate_remote_hosts(2)
2826 cls.pg0.configure_ipv4_neighbors()
# Delegates to the parent tearDownClass. NOTE(review): the line(s) that
# introduce this call are missing from this listing (2826 -> 2829), so its
# enclosing statement cannot be confirmed from here.
2829 super(TestDeterministicNAT, cls).tearDownClass()
# Builds one TCP, one UDP and one ICMP echo-request packet from in_if's remote
# peer to out_if's remote peer, using the class's fixed inside ports / ICMP id
# and external ports, with the requested TTL.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering): the docstring delimiters, the list the packets are
# collected into and the return statement are not visible.
2832 def create_stream_in(self, in_if, out_if, ttl=64):
2834 Create packet stream for inside network
2836 :param in_if: Inside interface
2837 :param out_if: Outside interface
2838 :param ttl: TTL of generated packets
2842 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2843 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2844 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2848 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2849 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2850 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2854 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2855 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2856 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds one TCP, one UDP and one ICMP echo-reply packet from out_if's remote
# peer towards dst_ip. The destination ports / ICMP id come from
# self.tcp_port_out, self.udp_port_out and self.icmp_external_id, which
# verify_capture_out records, so an outbound capture has to be verified first.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering): the docstring delimiters, the condition guarding the
# `dst_ip = self.nat_addr` default, the packet list and the return statement
# are not visible.
2861 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2863 Create packet stream for outside network
2865 :param out_if: Outside interface
2866 :param dst_ip: Destination IP address (Default use global NAT address)
2867 :param ttl: TTL of generated packets
2870 dst_ip = self.nat_addr
2873 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2874 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2875 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2879 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2880 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2881 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2885 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2886 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2887 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks an outside-network capture: the packet count must equal packet_num
# and every packet's source address must be nat_ip. As a side effect it
# records the translated TCP/UDP source ports and ICMP id on self
# (tcp_port_out, udp_port_out, icmp_external_id) for later reply streams.
# NOTE(review): the docstring below documents a `same_port` parameter
# (spelled "Sorce port ...") that is not in this method's signature; the
# entry looks stale and should be dropped from the docstring.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering): the docstring delimiters, the condition guarding the
# `nat_ip = self.nat_addr` default, the try/else/except framing and the
# re-raise are not visible.
2892 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2894 Verify captured packets on outside network
2896 :param capture: Captured packets
2897 :param nat_ip: Translated IP address (Default use global NAT address)
2898 :param same_port: Sorce port number is not translated (Default False)
2899 :param packet_num: Expected number of packets (Default 3)
2902 nat_ip = self.nat_addr
2903 self.assertEqual(packet_num, len(capture))
2904 for packet in capture:
2906 self.assertEqual(packet[IP].src, nat_ip)
2907 if packet.haslayer(TCP):
2908 self.tcp_port_out = packet[TCP].sport
2909 elif packet.haslayer(UDP):
2910 self.udp_port_out = packet[UDP].sport
2912 self.icmp_external_id = packet[ICMP].id
2914 self.logger.error(ppp("Unexpected or invalid packet "
2915 "(outside network):", packet))
# Drives a TCP three-way handshake through the NAT: SYN in -> out, SYN+ACK
# out -> in (addressed to nat_addr and the translated port learned from the
# first capture), then ACK in -> out. Logs "TCP 3 way handshake failed" on
# the error path.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2929 -> 2932 and 2934 -> 2936): the TCP flags
# arguments, two add_stream calls, the reassignment of `p` from the capture,
# the try/except framing and the stream-start step are not visible.
2918 def initiate_tcp_session(self, in_if, out_if):
2920 Initiates TCP session
2922 :param in_if: Inside interface
2923 :param out_if: Outside interface
2926 # SYN packet in->out
2927 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2928 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2929 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2932 self.pg_enable_capture(self.pg_interfaces)
2934 capture = out_if.get_capture(1)
# Translated source port, reused as destination port of the SYN+ACK.
2936 self.tcp_port_out = p[TCP].sport
2938 # SYN + ACK packet out->in
2939 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2940 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2941 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2943 out_if.add_stream(p)
2944 self.pg_enable_capture(self.pg_interfaces)
2946 in_if.get_capture(1)
2948 # ACK packet in->out
2949 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2950 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2951 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2954 self.pg_enable_capture(self.pg_interfaces)
2956 out_if.get_capture(1)
2959 self.logger.error("TCP 3 way handshake failed")
# Checks a decoded IPFIX export for the "maximum entries per user exceeded"
# event: exactly one record, field 230 equal to 13, field 466 equal to the
# 4-byte value 3 (little-endian as written), and field 8 equal to the pg0
# peer's packed IPv4 address.
# NOTE(review): fields 230 and 8 are presumably the natEvent and
# sourceIPv4Address information elements - confirm against the IPFIX registry.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 2968 -> 2971); the assignment of `record` and the
# docstring delimiters are not visible.
2962 def verify_ipfix_max_entries_per_user(self, data):
2964 Verify IPFIX maximum entries per user exceeded event
2966 :param data: Decoded IPFIX data records
2968 self.assertEqual(1, len(data))
2971 self.assertEqual(ord(record[230]), 13)
2972 # natQuotaExceededEvent
2973 self.assertEqual('\x03\x00\x00\x00', record[466])
2975 self.assertEqual(self.pg0.remote_ip4n, record[8])
# API-only test of deterministic mode: checks that the plugin reports
# deterministic=1, adds a mapping 172.16.255.0 -> 172.17.255.50, checks the
# forward lookup for 172.16.255.20 and the reverse lookup for the returned
# port, checks the mapping dump, then clears the mappings and checks the dump
# is empty.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, 2984 -> 2988); `in_plen` and `out_plen` are used below
# but their definitions are not visible.
2977 def test_deterministic_mode(self):
2978 """ NAT plugin run deterministic mode """
2979 in_addr = '172.16.255.0'
2980 out_addr = '172.17.255.50'
2981 in_addr_t = '172.16.255.20'
# Packed 4-byte forms of the addresses, as the API calls below expect.
2982 in_addr_n = socket.inet_aton(in_addr)
2983 out_addr_n = socket.inet_aton(out_addr)
2984 in_addr_t_n = socket.inet_aton(in_addr_t)
2988 nat_config = self.vapi.nat_show_config()
2989 self.assertEqual(1, nat_config.deterministic)
2991 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward then reverse lookup must round-trip to the test inside address;
# only the first 4 bytes of the returned address fields are compared.
2993 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2994 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2995 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2996 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2998 deterministic_mappings = self.vapi.nat_det_map_dump()
2999 self.assertEqual(len(deterministic_mappings), 1)
3000 dsm = deterministic_mappings[0]
3001 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3002 self.assertEqual(in_plen, dsm.in_plen)
3003 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3004 self.assertEqual(out_plen, dsm.out_plen)
3006 self.clear_nat_det()
3007 deterministic_mappings = self.vapi.nat_det_map_dump()
3008 self.assertEqual(len(deterministic_mappings), 0)
# Reads the deterministic NAT timeouts, sets each of them (udp,
# tcp_established, tcp_transitory, icmp) to its previous value + 10, reads
# them back and asserts that every value changed.
# NOTE(review): the assertions only check "different from before", not
# "equal to previous + 10", and the visible lines do not restore the
# original timeouts.
3010 def test_set_timeouts(self):
3011 """ Set deterministic NAT timeouts """
3012 timeouts_before = self.vapi.nat_det_get_timeouts()
3014 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3015 timeouts_before.tcp_established + 10,
3016 timeouts_before.tcp_transitory + 10,
3017 timeouts_before.icmp + 10)
3019 timeouts_after = self.vapi.nat_det_get_timeouts()
3021 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3022 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3023 self.assertNotEqual(timeouts_before.tcp_established,
3024 timeouts_after.tcp_established)
3025 self.assertNotEqual(timeouts_before.tcp_transitory,
3026 timeouts_after.tcp_transitory)
# Deterministic NAT data-path test: maps the pg0 peer to 10.0.0.10, sends the
# standard TCP/UDP/ICMP stream pg0 -> pg1 and the replies pg1 -> pg0, then
# dumps the peer's sessions and checks the three expected entries (TCP, UDP,
# ICMP) field by field.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 3033 -> 3035 -> 3037 and 3059 -> 3063): the prefix
# length arguments of the mapping call, the selection of each session `s` from
# the dump and the stream-start step are not visible.
3028 def test_det_in(self):
3029 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3031 nat_ip = "10.0.0.10"
3033 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3035 socket.inet_aton(nat_ip),
3037 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3038 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside; verify_capture_out also records the translated ports.
3042 pkts = self.create_stream_in(self.pg0, self.pg1)
3043 self.pg0.add_stream(pkts)
3044 self.pg_enable_capture(self.pg_interfaces)
3046 capture = self.pg1.get_capture(len(pkts))
3047 self.verify_capture_out(capture, nat_ip)
# Outside -> inside replies addressed to nat_ip.
3050 pkts = self.create_stream_out(self.pg1, nat_ip)
3051 self.pg1.add_stream(pkts)
3052 self.pg_enable_capture(self.pg_interfaces)
3054 capture = self.pg0.get_capture(len(pkts))
3055 self.verify_capture_in(capture, self.pg0)
3058 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3059 self.assertEqual(len(sessions), 3)
# TCP session.
3063 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3064 self.assertEqual(s.in_port, self.tcp_port_in)
3065 self.assertEqual(s.out_port, self.tcp_port_out)
3066 self.assertEqual(s.ext_port, self.tcp_external_port)
# UDP session.
3070 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3071 self.assertEqual(s.in_port, self.udp_port_in)
3072 self.assertEqual(s.out_port, self.udp_port_out)
3073 self.assertEqual(s.ext_port, self.udp_external_port)
# ICMP session: the port fields carry the ICMP identifiers.
3077 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3078 self.assertEqual(s.in_port, self.icmp_id_in)
3079 self.assertEqual(s.out_port, self.icmp_external_id)
# Deterministic NAT with two inside hosts (both on pg0) sharing one outside
# address: each host opens a TCP session from the same inside port, the
# translated ports are recorded, the mapping must report two sessions, replies
# to each translated port must reach the right host, and finally both
# sessions are closed through the API (one by outside tuple, one by inside
# tuple) with the session counter checked after each close.
# NOTE(review): some original lines are missing from this listing (gaps in the
# embedded numbering, e.g. 3084 -> 3086 and 3091 -> 3093 -> 3095): `port_in`
# is used below but its definition is not visible, and the prefix-length and
# port arguments of several API calls, the extraction of `ip`/`tcp` from each
# capture, the try/except framing and the stream-start step are missing too.
3081 def test_multiple_users(self):
3082 """ Deterministic NAT multiple users """
3084 nat_ip = "10.0.0.10"
3086 external_port = 6303
3088 host0 = self.pg0.remote_hosts[0]
3089 host1 = self.pg0.remote_hosts[1]
3091 self.vapi.nat_det_add_del_map(host0.ip4n,
3093 socket.inet_aton(nat_ip),
3095 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3096 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> pg1 peer; remember the translated source port as port_out0.
3100 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3101 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3102 TCP(sport=port_in, dport=external_port))
3103 self.pg0.add_stream(p)
3104 self.pg_enable_capture(self.pg_interfaces)
3106 capture = self.pg1.get_capture(1)
3111 self.assertEqual(ip.src, nat_ip)
3112 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3113 self.assertEqual(tcp.dport, external_port)
3114 port_out0 = tcp.sport
3116 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> pg1 peer; remember the translated source port as port_out1.
3120 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3121 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3122 TCP(sport=port_in, dport=external_port))
3123 self.pg0.add_stream(p)
3124 self.pg_enable_capture(self.pg_interfaces)
3126 capture = self.pg1.get_capture(1)
3131 self.assertEqual(ip.src, nat_ip)
3132 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3133 self.assertEqual(tcp.dport, external_port)
3134 port_out1 = tcp.sport
3136 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One mapping, two active sessions.
3139 dms = self.vapi.nat_det_map_dump()
3140 self.assertEqual(1, len(dms))
3141 self.assertEqual(2, dms[0].ses_num)
# Reply to port_out0 must be delivered to host0.
3144 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3145 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3146 TCP(sport=external_port, dport=port_out0))
3147 self.pg1.add_stream(p)
3148 self.pg_enable_capture(self.pg_interfaces)
3150 capture = self.pg0.get_capture(1)
3155 self.assertEqual(ip.src, self.pg1.remote_ip4)
3156 self.assertEqual(ip.dst, host0.ip4)
3157 self.assertEqual(tcp.dport, port_in)
3158 self.assertEqual(tcp.sport, external_port)
3160 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply to port_out1 must be delivered to host1.
3164 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3165 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3166 TCP(sport=external_port, dport=port_out1))
3167 self.pg1.add_stream(p)
3168 self.pg_enable_capture(self.pg_interfaces)
3170 capture = self.pg0.get_capture(1)
3175 self.assertEqual(ip.src, self.pg1.remote_ip4)
3176 self.assertEqual(ip.dst, host1.ip4)
3177 self.assertEqual(tcp.dport, port_in)
3178 self.assertEqual(tcp.sport, external_port)
3180 self.logger.error(ppp("Unexpected or invalid packet", p))
3183 # session close api test
3184 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3186 self.pg1.remote_ip4n,
3188 dms = self.vapi.nat_det_map_dump()
3189 self.assertEqual(dms[0].ses_num, 1)
3191 self.vapi.nat_det_close_session_in(host0.ip4n,
3193 self.pg1.remote_ip4n,
3195 dms = self.vapi.nat_det_map_dump()
3196 self.assertEqual(dms[0].ses_num, 0)
# Scenario: a TCP session through deterministic NAT is torn down starting from
# the inside interface (pg0). Afterwards the first dumped deterministic mapping
# must report zero sessions.
3198 def test_tcp_session_close_detection_in(self):
3199 """ Deterministic NAT TCP session close from inside network """
# Register a deterministic mapping built from pg0's remote address and
# self.nat_addr, then enable the NAT44 feature on pg0 and pg1.
3200 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3202 socket.inet_aton(self.nat_addr),
3204 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3205 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Helper defined elsewhere; presumably opens the session that is closed below.
3208 self.initiate_tcp_session(self.pg0, self.pg1)
3210 # close the session from inside
3212 # FIN packet in -> out
3213 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3214 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3215 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3217 self.pg0.add_stream(p)
3218 self.pg_enable_capture(self.pg_interfaces)
# The inside packet must be forwarded: exactly one packet is expected on pg1.
3220 self.pg1.get_capture(1)
3224 # ACK packet out -> in
3225 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3226 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3227 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3231 # FIN packet out -> in
3232 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3233 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3234 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# Both outside packets are sent as one stream and two packets must reach pg0.
3238 self.pg1.add_stream(pkts)
3239 self.pg_enable_capture(self.pg_interfaces)
3241 self.pg0.get_capture(2)
3243 # ACK packet in -> out
3244 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3245 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3246 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3248 self.pg0.add_stream(p)
3249 self.pg_enable_capture(self.pg_interfaces)
3251 self.pg1.get_capture(1)
3253 # Check if deterministic NAT44 closed the session
3254 dms = self.vapi.nat_det_map_dump()
# Entry 0 of the dump must show no remaining sessions.
3255 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably the failure path of surrounding error handling;
# the guarding clause is not visible in this view - confirm.
3257 self.logger.error("TCP session termination failed")
# Scenario: mirror image of the inside-initiated close test. The teardown
# starts on the outside interface (pg1) and the first dumped deterministic
# mapping must end up with zero sessions.
3260 def test_tcp_session_close_detection_out(self):
3261 """ Deterministic NAT TCP session close from outside network """
# Register a deterministic mapping built from pg0's remote address and
# self.nat_addr, then enable the NAT44 feature on pg0 and pg1.
3262 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3264 socket.inet_aton(self.nat_addr),
3266 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3267 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Helper defined elsewhere; presumably opens the session that is closed below.
3270 self.initiate_tcp_session(self.pg0, self.pg1)
3272 # close the session from outside
3274 # FIN packet out -> in
3275 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3276 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3277 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3279 self.pg1.add_stream(p)
3280 self.pg_enable_capture(self.pg_interfaces)
# The outside packet must be translated and delivered: one packet on pg0.
3282 self.pg0.get_capture(1)
3286 # ACK packet in -> out
3287 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3288 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3289 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the label below repeats the previous one. By analogy with the
# ACK/FIN pair in test_tcp_session_close_detection_in this packet is presumably
# the FIN - confirm against its TCP flags, which are not visible in this view.
3293 # ACK packet in -> out
3294 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3295 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3296 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# Both inside packets are sent as one stream and two packets must reach pg1.
3300 self.pg0.add_stream(pkts)
3301 self.pg_enable_capture(self.pg_interfaces)
3303 self.pg1.get_capture(2)
3305 # ACK packet out -> in
3306 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3307 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3308 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3310 self.pg1.add_stream(p)
3311 self.pg_enable_capture(self.pg_interfaces)
3313 self.pg0.get_capture(1)
3315 # Check if deterministic NAT44 closed the session
3316 dms = self.vapi.nat_det_map_dump()
# Entry 0 of the dump must show no remaining sessions.
3317 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably the failure path of surrounding error handling;
# the guarding clause is not visible in this view - confirm.
3319 self.logger.error("TCP session termination failed")
# Extended-suite test (skipped unless running_extended_tests() is true):
# after the deterministic NAT timeouts are lowered, the first dumped mapping is
# expected to report zero sessions.
3322 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3323 def test_session_timeout(self):
3324 """ Deterministic NAT session timeouts """
3325 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3327 socket.inet_aton(self.nat_addr),
3329 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3330 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3333 self.initiate_tcp_session(self.pg0, self.pg1)
# All four timeout arguments are set to 5. Units are not visible here -
# presumably seconds; confirm against nat_det_set_timeouts.
3334 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
# Send the standard inside stream and require every packet to appear on pg1.
3335 pkts = self.create_stream_in(self.pg0, self.pg1)
3336 self.pg0.add_stream(pkts)
3337 self.pg_enable_capture(self.pg_interfaces)
3339 capture = self.pg1.get_capture(len(pkts))
# NOTE(review): whatever lets the timeouts elapse is not visible in this view.
# Afterwards entry 0 of the mapping dump must hold no sessions.
3342 dms = self.vapi.nat_det_map_dump()
3343 self.assertEqual(0, dms[0].ses_num)
# Extended-suite test (skipped unless running_extended_tests() is true):
# opens 1000 UDP flows from one inside host, then checks that one further flow
# is not forwarded, is answered with an ICMP error on the inside interface, and
# that the event shows up in the IPFIX export captured on pg2.
3345 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3346 def test_session_limit_per_user(self):
3347 """ Deterministic NAT maximum sessions per user limit """
3348 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3350 socket.inet_aton(self.nat_addr),
3352 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3353 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# IPFIX exporter: collector is pg2's remote address, source is pg2's local
# address; NAT IPFIX logging is then switched on.
3355 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3356 src_address=self.pg2.local_ip4n,
3358 template_interval=10)
3359 self.vapi.nat_ipfix()
# One UDP packet per port in 1025..2024, i.e. 1000 distinct flows.
3362 for port in range(1025, 2025):
3363 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3364 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3365 UDP(sport=port, dport=port))
3368 self.pg0.add_stream(pkts)
3369 self.pg_enable_capture(self.pg_interfaces)
3371 capture = self.pg1.get_capture(len(pkts))
# One additional flow (ports 3001 -> 3002) on top of the 1000 already sent.
3373 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3374 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3375 UDP(sport=3001, dport=3002))
3376 self.pg0.add_stream(p)
3377 self.pg_enable_capture(self.pg_interfaces)
# It must not be forwarded to the outside interface.
3379 capture = self.pg1.assert_nothing_captured()
3381 # verify ICMP error packet
3382 capture = self.pg0.get_capture(1)
3384 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 / code 1 is "destination unreachable / host unreachable"
# (RFC 792); the quoted inner header must be the rejected UDP flow.
3386 self.assertEqual(icmp.type, 3)
3387 self.assertEqual(icmp.code, 1)
3388 self.assertTrue(icmp.haslayer(IPerror))
3389 inner_ip = icmp[IPerror]
3390 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3391 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3393 dms = self.vapi.nat_det_map_dump()
# The rejected flow must not have been counted: still exactly 1000 sessions.
3395 self.assertEqual(1000, dms[0].ses_num)
3397 # verify IPFIX logging
3398 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Two export packets are expected on pg2. Template records are registered with
# the decoder before any data set is decoded and verified.
3400 capture = self.pg2.get_capture(2)
3401 ipfix = IPFIXDecoder()
3402 # first load template
3404 self.assertTrue(p.haslayer(IPFIX))
3405 if p.haslayer(Template):
3406 ipfix.add_template(p.getlayer(Template))
3407 # verify events in data set
3409 if p.haslayer(Data):
3410 data = ipfix.decode_data_set(p.getlayer(Set))
3411 self.verify_ipfix_max_entries_per_user(data)
# Resets deterministic NAT state between tests: turns NAT IPFIX logging off,
# calls nat_det_set_timeouts() without arguments (presumably restoring the
# defaults - confirm), then issues one add/del call per dumped deterministic
# mapping and one per dumped NAT44 interface.
# NOTE(review): the remaining arguments of both add/del calls are not visible
# in this view; presumably they request deletion - confirm.
3413 def clear_nat_det(self):
3415 Clear deterministic NAT configuration.
3417 self.vapi.nat_ipfix(enable=0)
3418 self.vapi.nat_det_set_timeouts()
3419 deterministic_mappings = self.vapi.nat_det_map_dump()
3420 for dsm in deterministic_mappings:
3421 self.vapi.nat_det_add_del_map(dsm.in_addr,
3427 interfaces = self.vapi.nat44_interface_dump()
3428 for intf in interfaces:
3429 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3434 super(TestDeterministicNAT, self).tearDown()
3435 if not self.vpp_dead:
3436 self.logger.info(self.vapi.cli("show nat44 detail"))
3437 self.clear_nat_det()
3440 class TestNAT64(MethodHolder):
3441 """ NAT64 Test Cases """
# Class-level fixture for the NAT64 tests: fixes the default ports/ids and NAT
# pool addresses, creates five pg interfaces and splits them into an IPv6 group
# (indexes 0 and 2) and an IPv4 group (index 1). The interface at index 2 is
# bound to the IPv6 table cls.vrf1_id, and pg3 gets both IPv4 and IPv6
# configuration.
3444 def setUpClass(cls):
3445 super(TestNAT64, cls).setUpClass()
# Defaults shared by the tests; "in" and "out" values start out identical.
3448 cls.tcp_port_in = 6303
3449 cls.tcp_port_out = 6303
3450 cls.udp_port_in = 6304
3451 cls.udp_port_out = 6304
3452 cls.icmp_id_in = 6305
3453 cls.icmp_id_out = 6305
# NAT pool addresses, kept both as text and in packed (inet_pton) form.
3454 cls.nat_addr = '10.0.0.3'
3455 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3457 cls.vrf1_nat_addr = '10.0.10.3'
3458 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3461 cls.create_pg_interfaces(range(5))
3462 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3463 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3464 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# NOTE(review): the assignment of cls.vrf1_id is not visible in this view.
3466 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3468 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# Two remote hosts on pg0; the hairpinning tests use them as client and server.
3470 cls.pg0.generate_remote_hosts(2)
3472 for i in cls.ip6_interfaces:
3475 i.configure_ipv6_neighbors()
3477 for i in cls.ip4_interfaces:
# pg3 is configured for both address families.
3483 cls.pg3.config_ip4()
3484 cls.pg3.resolve_arp()
3485 cls.pg3.config_ip6()
3486 cls.pg3.configure_ipv6_neighbors()
# NOTE(review): presumably the cleanup path taken when setup fails; the
# enclosing error-handling clause is not visible in this view - confirm.
3489 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # Packed form of the single pool address; it is used as both ends of the
    # range, so the range covers exactly one address.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # Add the address and confirm the dump shows it, and only it.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Remove the same range again; the pool must be empty afterwards.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
# Enables NAT64 on pg0 (default direction) and on pg1 with is_inside=0, checks
# the interface dump and the CLI feature listing, then disables both and
# expects an empty dump.
3507 def test_interface(self):
3508 """ Enable/disable NAT64 feature on the interface """
3509 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3510 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3512 interfaces = self.vapi.nat64_interface_dump()
3513 self.assertEqual(len(interfaces), 2)
# pg0 must be reported as inside (1) and pg1 as outside (0).
# NOTE(review): pg0_found / pg1_found are asserted below, but their assignments
# are not visible in this view.
3516 for intf in interfaces:
3517 if intf.sw_if_index == self.pg0.sw_if_index:
3518 self.assertEqual(intf.is_inside, 1)
3520 elif intf.sw_if_index == self.pg1.sw_if_index:
3521 self.assertEqual(intf.is_inside, 0)
3523 self.assertTrue(pg0_found)
3524 self.assertTrue(pg1_found)
# The CLI output must contain 'nat64-in2out' for pg0 and 'nat64-out2in' for
# pg1 (str.find returns -1 when the text is absent).
3526 features = self.vapi.cli("show interface features pg0")
3527 self.assertNotEqual(features.find('nat64-in2out'), -1)
3528 features = self.vapi.cli("show interface features pg1")
3529 self.assertNotEqual(features.find('nat64-out2in'), -1)
# Remove the feature from both interfaces and expect nothing left in the dump.
3531 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3532 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3534 interfaces = self.vapi.nat64_interface_dump()
3535 self.assertEqual(len(interfaces), 0)
# Creates one static TCP BIB entry pairing an IPv6 inside address with an IPv4
# outside address, checks the entry through nat64_bib_dump, then issues a
# second add/del call and expects the static-entry count to be back at 0.
# NOTE(review): the port values, the remaining call arguments and the loop that
# derives bibe / static_bib_num are not visible in this view.
3537 def test_static_bib(self):
3538 """ Add/delete static BIB entry """
3539 in_addr = socket.inet_pton(socket.AF_INET6,
3540 '2001:db8:85a3::8a2e:370:7334')
3541 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3544 proto = IP_PROTOS.tcp
3546 self.vapi.nat64_add_del_static_bib(in_addr,
3551 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
# The dumped entry must echo the addresses and ports that were configured.
3556 self.assertEqual(bibe.i_addr, in_addr)
3557 self.assertEqual(bibe.o_addr, out_addr)
3558 self.assertEqual(bibe.i_port, in_port)
3559 self.assertEqual(bibe.o_port, out_port)
3560 self.assertEqual(static_bib_num, 1)
# Presumably the delete call (its is_add argument is not visible - confirm).
3562 self.vapi.nat64_add_del_static_bib(in_addr,
3568 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3573 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    def expect_timeouts(expected):
        # Read the timeouts once and compare field by field, in the same
        # order as the (name, value) pairs are listed.
        reported = self.vapi.nat64_get_timeouts()
        for field, value in expected:
            self.assertEqual(getattr(reported, field), value)

    # verify default values
    expect_timeouts((('udp', 300),
                     ('icmp', 60),
                     ('tcp_trans', 240),
                     ('tcp_est', 7440),
                     ('tcp_incoming_syn', 6)))

    # set and verify custom values
    custom = (('udp', 200),
              ('icmp', 30),
              ('tcp_trans', 250),
              ('tcp_est', 7450),
              ('tcp_incoming_syn', 10))
    self.vapi.nat64_set_timeouts(**dict(custom))
    expect_timeouts(custom)
# Dynamic NAT64: traffic from the IPv6 side (pg0) is translated to the pool
# address towards pg1 and replies are translated back. The exchange is run
# twice, the session count must grow by exactly 3, and the same round trip is
# then repeated for a tenant on pg2 using the VRF-specific pool address.
3595 def test_dynamic(self):
3596 """ NAT64 dynamic translation test """
3597 self.tcp_port_in = 6303
3598 self.udp_port_in = 6304
3599 self.icmp_id_in = 6305
# Baseline, so that only sessions created by this test are counted.
3601 ses_num_start = self.nat64_get_ses_num()
3603 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3605 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3606 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# First pass, IPv6 -> IPv4: packets must leave pg1 sourced from the pool
# address.
3609 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3610 self.pg0.add_stream(pkts)
3611 self.pg_enable_capture(self.pg_interfaces)
3613 capture = self.pg1.get_capture(len(pkts))
3614 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3615 dst_ip=self.pg1.remote_ip4)
# First pass, IPv4 -> IPv6: replies addressed to the pool address must arrive
# on pg0.
3618 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3619 self.pg1.add_stream(pkts)
3620 self.pg_enable_capture(self.pg_interfaces)
3622 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source of the replies: pg1's IPv4 address appended to the
# 64:ff9b:: prefix, passed through a scapy IPv6 layer and read back.
3623 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3624 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Second pass in both directions with the same streams.
3627 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3628 self.pg0.add_stream(pkts)
3629 self.pg_enable_capture(self.pg_interfaces)
3631 capture = self.pg1.get_capture(len(pkts))
3632 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3633 dst_ip=self.pg1.remote_ip4)
3636 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3637 self.pg1.add_stream(pkts)
3638 self.pg_enable_capture(self.pg_interfaces)
3640 capture = self.pg0.get_capture(len(pkts))
3641 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3643 ses_num_end = self.nat64_get_ses_num()
# Exactly three sessions were added despite two passes (presumably one each
# for TCP, UDP and ICMP - confirm against create_stream_in_ip6).
3645 self.assertEqual(ses_num_end - ses_num_start, 3)
3647 # tenant with specific VRF
3648 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3649 self.vrf1_nat_addr_n,
3650 vrf_id=self.vrf1_id)
3651 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3653 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3654 self.pg2.add_stream(pkts)
3655 self.pg_enable_capture(self.pg_interfaces)
3657 capture = self.pg1.get_capture(len(pkts))
# Tenant traffic must be sourced from the VRF-specific pool address.
3658 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3659 dst_ip=self.pg1.remote_ip4)
3661 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3662 self.pg1.add_stream(pkts)
3663 self.pg_enable_capture(self.pg_interfaces)
3665 capture = self.pg2.get_capture(len(pkts))
3666 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# Static NAT64: three static BIB entries are installed for pg0's remote IPv6
# address, traffic is sent in both directions, and exactly three sessions must
# be created. Outbound packets are verified with same_port=True.
3668 def test_static(self):
3669 """ NAT64 static translation test """
# Inside and outside ports/ids are set to the same values for this test.
3670 self.tcp_port_in = 60303
3671 self.udp_port_in = 60304
3672 self.icmp_id_in = 60305
3673 self.tcp_port_out = 60303
3674 self.udp_port_out = 60304
3675 self.icmp_id_out = 60305
3677 ses_num_start = self.nat64_get_ses_num()
3679 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3681 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3682 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB entries for the same inside host.
# NOTE(review): the remaining arguments are not visible in this view;
# presumably one entry each for TCP, UDP and ICMP - confirm.
3684 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3689 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3694 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# IPv6 -> IPv4 through the static entries.
3701 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3702 self.pg0.add_stream(pkts)
3703 self.pg_enable_capture(self.pg_interfaces)
3705 capture = self.pg1.get_capture(len(pkts))
3706 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3707 dst_ip=self.pg1.remote_ip4, same_port=True)
# IPv4 -> IPv6 replies addressed to the pool address.
3710 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3711 self.pg1.add_stream(pkts)
3712 self.pg_enable_capture(self.pg_interfaces)
3714 capture = self.pg0.get_capture(len(pkts))
3715 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3716 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3718 ses_num_end = self.nat64_get_ses_num()
# Exactly three sessions must have been added by this test.
3720 self.assertEqual(ses_num_end - ses_num_start, 3)
# Extended-suite test (skipped unless running_extended_tests() is true):
# lowers the NAT64 ICMP timeout to 5, creates sessions, and requires the
# session count read later to differ from the count read right after the
# traffic.
3722 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3723 def test_session_timeout(self):
3724 """ NAT64 session timeout """
3725 self.icmp_id_in = 1234
3726 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3728 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3729 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Only the ICMP timeout is overridden. Units are not visible here -
# presumably seconds; confirm against nat64_set_timeouts.
3730 self.vapi.nat64_set_timeouts(icmp=5)
3732 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3733 self.pg0.add_stream(pkts)
3734 self.pg_enable_capture(self.pg_interfaces)
3736 capture = self.pg1.get_capture(len(pkts))
3738 ses_num_before_timeout = self.nat64_get_ses_num()
# NOTE(review): whatever lets the timeout elapse is not visible in this view.
3742 # ICMP session after timeout
3743 ses_num_after_timeout = self.nat64_get_ses_num()
# The check is only "changed", not a specific count.
3744 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# ICMP error translation in both directions. Sessions are created with normal
# traffic first; the packets captured on each side are then wrapped in an ICMP
# error and sent back, and the translated error is checked on the other side.
#   IPv6 side sends ICMPv6 Destination Unreachable code 1
#     -> IPv4 side must see ICMP type 3 code 13.
#   IPv4 side sends ICMP type 3 code 13
#     -> IPv6 side must see ICMPv6 Destination Unreachable code 1.
3746 def test_icmp_error(self):
3747 """ NAT64 ICMP Error message translation """
3748 self.tcp_port_in = 6303
3749 self.udp_port_in = 6304
3750 self.icmp_id_in = 6305
3752 ses_num_start = self.nat64_get_ses_num()
3754 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3756 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3757 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3759 # send some packets to create sessions
3760 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3761 self.pg0.add_stream(pkts)
3762 self.pg_enable_capture(self.pg_interfaces)
# Kept: these translated IPv4 packets become the payload of the outside ICMP
# errors further down.
3764 capture_ip4 = self.pg1.get_capture(len(pkts))
3765 self.verify_capture_out(capture_ip4,
3766 nat_ip=self.nat_addr,
3767 dst_ip=self.pg1.remote_ip4)
3769 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3770 self.pg1.add_stream(pkts)
3771 self.pg_enable_capture(self.pg_interfaces)
# Kept: these translated IPv6 packets become the payload of the inside ICMPv6
# errors built next.
3773 capture_ip6 = self.pg0.get_capture(len(pkts))
3774 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3775 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3776 self.pg0.remote_ip6)
# Inside host reports each received packet as unreachable (ICMPv6 code 1),
# quoting the packet's IPv6 layer.
3779 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3780 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3781 ICMPv6DestUnreach(code=1) /
3782 packet[IPv6] for packet in capture_ip6]
3783 self.pg0.add_stream(pkts)
3784 self.pg_enable_capture(self.pg_interfaces)
3786 capture = self.pg1.get_capture(len(pkts))
3787 for packet in capture:
# Outer header: from the pool address to the outside host, ICMP type 3 code 13.
3789 self.assertEqual(packet[IP].src, self.nat_addr)
3790 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3791 self.assertEqual(packet[ICMP].type, 3)
3792 self.assertEqual(packet[ICMP].code, 13)
# Quoted header: the original outside -> pool packet, with the translated port
# or id.
3793 inner = packet[IPerror]
3794 self.assertEqual(inner.src, self.pg1.remote_ip4)
3795 self.assertEqual(inner.dst, self.nat_addr)
3796 self.check_icmp_checksum(packet)
3797 if inner.haslayer(TCPerror):
3798 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3799 elif inner.haslayer(UDPerror):
3800 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3802 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3804 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside host reports each received packet with ICMP type 3 code 13, quoting
# the packet's IPv4 layer.
3808 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3809 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3810 ICMP(type=3, code=13) /
3811 packet[IP] for packet in capture_ip4]
3812 self.pg1.add_stream(pkts)
3813 self.pg_enable_capture(self.pg_interfaces)
3815 capture = self.pg0.get_capture(len(pkts))
3816 for packet in capture:
# Outer header: from the synthesised IPv6 address of the outside host to the
# inside host, ICMPv6 Destination Unreachable code 1.
3818 self.assertEqual(packet[IPv6].src, ip.src)
3819 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3820 icmp = packet[ICMPv6DestUnreach]
3821 self.assertEqual(icmp.code, 1)
# Quoted header: the original inside -> outside packet, with the inside port
# or id.
3822 inner = icmp[IPerror6]
3823 self.assertEqual(inner.src, self.pg0.remote_ip6)
3824 self.assertEqual(inner.dst, ip.src)
3825 self.check_icmpv6_checksum(packet)
3826 if inner.haslayer(TCPerror):
3827 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3828 elif inner.haslayer(UDPerror):
3829 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3831 self.assertEqual(inner[ICMPv6EchoRequest].id,
3834 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning: client and server are both remote hosts on pg0. The client
# reaches the server through the NAT pool address (as an IPv6 address), so
# traffic enters and leaves on pg0. Checked in three steps: client -> server,
# server -> client, and an ICMPv6 error sent by the client about the replies.
3837 def test_hairpinning(self):
3838 """ NAT64 hairpinning """
3840 client = self.pg0.remote_hosts[0]
3841 server = self.pg0.remote_hosts[1]
3842 server_tcp_in_port = 22
3843 server_tcp_out_port = 4022
3844 server_udp_in_port = 23
3845 server_udp_out_port = 4023
3846 client_tcp_in_port = 1234
3847 client_udp_in_port = 1235
# Placeholders; overwritten with the source ports observed in the first capture.
3848 client_tcp_out_port = 0
3849 client_udp_out_port = 0
# IPv6 form of the pool address: text appended to 64:ff9b::, read back from a
# scapy IPv6 layer.
3850 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3851 nat_addr_ip6 = ip.src
3853 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3855 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3856 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries for the server, using its TCP and UDP outside ports
# (other arguments are not visible in this view).
3858 self.vapi.nat64_add_del_static_bib(server.ip6n,
3861 server_tcp_out_port,
3863 self.vapi.nat64_add_del_static_bib(server.ip6n,
3866 server_udp_out_port,
# Step 1, client -> server: addressed to the pool address and the server's
# outside ports.
3871 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3872 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3873 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3875 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3876 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3877 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3879 self.pg0.add_stream(pkts)
3880 self.pg_enable_capture(self.pg_interfaces)
# Hairpinned packets come back out of pg0 itself.
3882 capture = self.pg0.get_capture(len(pkts))
3883 for packet in capture:
# Source is rewritten to the pool address and the source port must change;
# destination is the server's real address and inside port.
3885 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3886 self.assertEqual(packet[IPv6].dst, server.ip6)
3887 if packet.haslayer(TCP):
3888 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3889 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3890 self.check_tcp_checksum(packet)
3891 client_tcp_out_port = packet[TCP].sport
3893 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3894 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3895 self.check_udp_checksum(packet)
3896 client_udp_out_port = packet[UDP].sport
3898 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Step 2, server -> client: replies go to the pool address and the client's
# learned outside ports.
3903 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3904 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3905 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3907 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3908 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3909 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3911 self.pg0.add_stream(pkts)
3912 self.pg_enable_capture(self.pg_interfaces)
3914 capture = self.pg0.get_capture(len(pkts))
3915 for packet in capture:
# The client must see the server's outside port and its own original port.
3917 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3918 self.assertEqual(packet[IPv6].dst, client.ip6)
3919 if packet.haslayer(TCP):
3920 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3921 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3922 self.check_tcp_checksum(packet)
3924 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3925 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3926 self.check_udp_checksum(packet)
3928 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Step 3: the client reports the replies just captured as unreachable
# (ICMPv6 code 1), quoting each packet's IPv6 layer.
3933 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3934 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3935 ICMPv6DestUnreach(code=1) /
3936 packet[IPv6] for packet in capture]
3937 self.pg0.add_stream(pkts)
3938 self.pg_enable_capture(self.pg_interfaces)
3940 capture = self.pg0.get_capture(len(pkts))
3941 for packet in capture:
# The error must reach the server, and the quoted header must be in the
# server's own terms: its address and inside port towards the client's outside
# port.
3943 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3944 self.assertEqual(packet[IPv6].dst, server.ip6)
3945 icmp = packet[ICMPv6DestUnreach]
3946 self.assertEqual(icmp.code, 1)
3947 inner = icmp[IPerror6]
3948 self.assertEqual(inner.src, server.ip6)
3949 self.assertEqual(inner.dst, nat_addr_ip6)
3950 self.check_icmpv6_checksum(packet)
3951 if inner.haslayer(TCPerror):
3952 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3953 self.assertEqual(inner[TCPerror].dport,
3954 client_tcp_out_port)
3956 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3957 self.assertEqual(inner[UDPerror].dport,
3958 client_udp_out_port)
3960 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Network-specific NAT64 prefixes: registers a global prefix (2001:db8::/32)
# and a tenant prefix (2001:db8:122:300::/56, tied to vrf1_id), then runs a
# round trip per prefix. The expected IPv6 address of the outside host is built
# with compose_ip6.
3963 def test_prefix(self):
3964 """ NAT64 Network-Specific Prefix """
3966 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3968 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3969 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3970 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3971 self.vrf1_nat_addr_n,
3972 vrf_id=self.vrf1_id)
3973 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Global prefix: added without a vrf_id and expected to be dumped with
# vrf_id 0.
3976 global_pref64 = "2001:db8::"
3977 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3978 global_pref64_len = 32
3979 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3981 prefix = self.vapi.nat64_prefix_dump()
3982 self.assertEqual(len(prefix), 1)
3983 self.assertEqual(prefix[0].prefix, global_pref64_n)
3984 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3985 self.assertEqual(prefix[0].vrf_id, 0)
3987 # Add tenant specific prefix
3988 vrf1_pref64 = "2001:db8:122:300::"
3989 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3990 vrf1_pref64_len = 56
3991 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3993 vrf_id=self.vrf1_id)
3994 prefix = self.vapi.nat64_prefix_dump()
# Both prefixes must now be listed.
3995 self.assertEqual(len(prefix), 2)
# Round trip using the global prefix: pg0 -> pg1 and back.
3998 pkts = self.create_stream_in_ip6(self.pg0,
4001 plen=global_pref64_len)
4002 self.pg0.add_stream(pkts)
4003 self.pg_enable_capture(self.pg_interfaces)
4005 capture = self.pg1.get_capture(len(pkts))
4006 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4007 dst_ip=self.pg1.remote_ip4)
4009 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4010 self.pg1.add_stream(pkts)
4011 self.pg_enable_capture(self.pg_interfaces)
4013 capture = self.pg0.get_capture(len(pkts))
# NOTE(review): despite its name, dst_ip is passed to verify_capture_in_ip6 in
# the position that other tests use for the expected source address.
4014 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4017 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4019 # Tenant specific prefix
4020 pkts = self.create_stream_in_ip6(self.pg2,
4023 plen=vrf1_pref64_len)
4024 self.pg2.add_stream(pkts)
4025 self.pg_enable_capture(self.pg_interfaces)
4027 capture = self.pg1.get_capture(len(pkts))
4028 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4029 dst_ip=self.pg1.remote_ip4)
4031 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4032 self.pg1.add_stream(pkts)
4033 self.pg_enable_capture(self.pg_interfaces)
4035 capture = self.pg2.get_capture(len(pkts))
4036 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4039 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Translation of a protocol other than TCP/UDP/ICMP, using GRE (IP protocol
# number 47). A TCP packet is sent first, then a GRE packet in each direction
# is checked for address translation with the protocol number preserved.
4041 def test_unknown_proto(self):
4042 """ NAT64 translate packet with unknown protocol """
4044 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4046 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4047 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 representation of the outside host under the 64:ff9b::/96 prefix.
4048 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# A plain TCP packet goes first (presumably to create state for this host
# pair - confirm).
4051 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4052 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4053 TCP(sport=self.tcp_port_in, dport=20))
4054 self.pg0.add_stream(p)
4055 self.pg_enable_capture(self.pg_interfaces)
4057 p = self.pg1.get_capture(1)
# Inside -> outside packet with next header 47 (GRE); the layer between IPv6
# and the inner IP is not visible in this view.
4059 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4060 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4062 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4063 TCP(sport=1234, dport=1234))
4064 self.pg0.add_stream(p)
4065 self.pg_enable_capture(self.pg_interfaces)
4067 p = self.pg1.get_capture(1)
# Must leave pg1 from the pool address, still carrying GRE, with a valid IP
# checksum.
4070 self.assertEqual(packet[IP].src, self.nat_addr)
4071 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4072 self.assertTrue(packet.haslayer(GRE))
4073 self.check_ip_checksum(packet)
4075 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside packet addressed to the pool address.
4079 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4080 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4082 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4083 TCP(sport=1234, dport=1234))
4084 self.pg1.add_stream(p)
4085 self.pg_enable_capture(self.pg_interfaces)
4087 p = self.pg0.get_capture(1)
# Must arrive on pg0 from the outside host's IPv6 form with next header 47.
4090 self.assertEqual(packet[IPv6].src, remote_ip6)
4091 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4092 self.assertEqual(packet[IPv6].nh, 47)
4094 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning of a protocol other than TCP/UDP/ICMP (GRE): client and server
# are both on pg0 and each is associated with its own NAT address (10.0.0.110
# and 10.0.0.100). GRE packets sent to the peer's NAT address must come back
# out of pg0 sourced from the sender's NAT address.
4097 def test_hairpinning_unknown_proto(self):
4098 """ NAT64 translate packet with unknown protocol - hairpinning """
4100 client = self.pg0.remote_hosts[0]
4101 server = self.pg0.remote_hosts[1]
4102 server_tcp_in_port = 22
4103 server_tcp_out_port = 4022
4104 client_tcp_in_port = 1234
4105 client_tcp_out_port = 1235
4106 server_nat_ip = "10.0.0.100"
4107 client_nat_ip = "10.0.0.110"
# Each NAT address is kept in packed IPv4 form and as an IPv6 address under
# 64:ff9b::/96.
4108 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4109 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4110 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4111 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4113 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4115 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4116 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries: two for the server and one for the client.
# NOTE(review): most arguments are not visible in this view.
4118 self.vapi.nat64_add_del_static_bib(server.ip6n,
4121 server_tcp_out_port,
4124 self.vapi.nat64_add_del_static_bib(server.ip6n,
4130 self.vapi.nat64_add_del_static_bib(client.ip6n,
4133 client_tcp_out_port,
# A plain TCP packet from client to server goes first; one packet must come
# back out of pg0.
4137 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4138 IPv6(src=client.ip6, dst=server_nat_ip6) /
4139 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4140 self.pg0.add_stream(p)
4141 self.pg_enable_capture(self.pg_interfaces)
4143 p = self.pg0.get_capture(1)
# GRE, client -> server.
4145 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4146 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4148 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4149 TCP(sport=1234, dport=1234))
4150 self.pg0.add_stream(p)
4151 self.pg_enable_capture(self.pg_interfaces)
4153 p = self.pg0.get_capture(1)
# The server must see the client's NAT address as source, with GRE preserved.
4156 self.assertEqual(packet[IPv6].src, client_nat_ip6)
4157 self.assertEqual(packet[IPv6].dst, server.ip6)
4158 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4160 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# GRE, server -> client.
4164 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4165 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4167 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4168 TCP(sport=1234, dport=1234))
4169 self.pg0.add_stream(p)
4170 self.pg_enable_capture(self.pg_interfaces)
4172 p = self.pg0.get_capture(1)
# The client must see the server's NAT address as source, with GRE preserved.
4175 self.assertEqual(packet[IPv6].src, server_nat_ip6)
4176 self.assertEqual(packet[IPv6].dst, client.ip6)
4177 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4179 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One-armed NAT64: pg3 is enabled both as inside and as outside interface, so
# IPv6 packets entering pg3 leave pg3 as IPv4 and the IPv4 reply entering pg3
# leaves pg3 as IPv6.
4182 def test_one_armed_nat64(self):
4183 """ One armed NAT64 """
# IPv6 representation of pg3's remote IPv4 host (prefix arguments are not
# visible in this view).
4185 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4189 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
# The same interface is registered twice: default direction and is_inside=0.
4191 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4192 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 -> IPv4 direction.
4195 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4196 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4197 TCP(sport=12345, dport=80))
4198 self.pg3.add_stream(p)
4199 self.pg_enable_capture(self.pg_interfaces)
4201 capture = self.pg3.get_capture(1)
# Source becomes the pool address with a different source port, which is
# remembered for the reply; destination port is unchanged.
4206 self.assertEqual(ip.src, self.nat_addr)
4207 self.assertEqual(ip.dst, self.pg3.remote_ip4)
4208 self.assertNotEqual(tcp.sport, 12345)
4209 external_port = tcp.sport
4210 self.assertEqual(tcp.dport, 80)
4211 self.check_tcp_checksum(p)
4212 self.check_ip_checksum(p)
4214 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 -> IPv6 direction: reply to the pool address and the learned port.
4218 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4219 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4220 TCP(sport=80, dport=external_port))
4221 self.pg3.add_stream(p)
4222 self.pg_enable_capture(self.pg_interfaces)
4224 capture = self.pg3.get_capture(1)
# Must be delivered to the original IPv6 host and original port.
4229 self.assertEqual(ip.src, remote_host_ip6)
4230 self.assertEqual(ip.dst, self.pg3.remote_ip6)
4231 self.assertEqual(tcp.sport, 80)
4232 self.assertEqual(tcp.dport, 12345)
4233 self.check_tcp_checksum(p)
4235 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Fragmented traffic, fragments sent in order: an IPv6 fragment stream from pg0
# must be translated and reassemble correctly on pg1, and an IPv4 fragment
# stream from pg1 must do the same on pg0. The number of entries returned by
# nat_reass_dump must grow by exactly 2.
4238 def test_frag_in_order(self):
4239 """ NAT64 translate fragments arriving in order """
# Random inside port in the non-privileged range for this run.
4240 self.tcp_port_in = random.randint(1025, 65535)
4242 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4244 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4245 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Baseline of reassembly entries before any fragments are sent.
4247 reass = self.vapi.nat_reass_dump()
4248 reass_n_start = len(reass)
# IPv6 -> IPv4 direction.
# NOTE(review): the first assignment of data is not visible in this view.
4252 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4253 self.tcp_port_in, 20, data)
4254 self.pg0.add_stream(pkts)
4255 self.pg_enable_capture(self.pg_interfaces)
4257 frags = self.pg1.get_capture(len(pkts))
4258 p = self.reass_frags_and_verify(frags,
4260 self.pg1.remote_ip4)
# Destination port is kept, the source port must be translated, and the
# payload must survive; the translated port is stored for the return direction.
4261 self.assertEqual(p[TCP].dport, 20)
4262 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4263 self.tcp_port_out = p[TCP].sport
4264 self.assertEqual(data, p[Raw].load)
# IPv4 -> IPv6 direction with a different payload.
4267 data = "A" * 4 + "b" * 16 + "C" * 3
4268 pkts = self.create_stream_frag(self.pg1,
4273 self.pg1.add_stream(pkts)
4274 self.pg_enable_capture(self.pg_interfaces)
4276 frags = self.pg0.get_capture(len(pkts))
4277 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4278 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4279 self.assertEqual(p[TCP].sport, 20)
4280 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4281 self.assertEqual(data, p[Raw].load)
4283 reass = self.vapi.nat_reass_dump()
4284 reass_n_end = len(reass)
# Two new reassembly entries (presumably one per direction - confirm).
4286 self.assertEqual(reass_n_end - reass_n_start, 2)
4288 def test_reass_hairpinning(self):
4289 """ NAT64 fragments hairpinning """
4291 client = self.pg0.remote_hosts[0]
4292 server = self.pg0.remote_hosts[1]
4293 server_in_port = random.randint(1025, 65535)
4294 server_out_port = random.randint(1025, 65535)
4295 client_in_port = random.randint(1025, 65535)
4296 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4297 nat_addr_ip6 = ip.src
4299 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4301 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4302 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4304 # add static BIB entry for server
4305 self.vapi.nat64_add_del_static_bib(server.ip6n,
4311 # send packet from host to server
4312 pkts = self.create_stream_frag_ip6(self.pg0,
4317 self.pg0.add_stream(pkts)
4318 self.pg_enable_capture(self.pg_interfaces)
4320 frags = self.pg0.get_capture(len(pkts))
4321 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4322 self.assertNotEqual(p[TCP].sport, client_in_port)
4323 self.assertEqual(p[TCP].dport, server_in_port)
4324 self.assertEqual(data, p[Raw].load)
4326 def test_frag_out_of_order(self):
4327 """ NAT64 translate fragments arriving out of order """
4328 self.tcp_port_in = random.randint(1025, 65535)
4330 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4332 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4333 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4337 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4338 self.tcp_port_in, 20, data)
4340 self.pg0.add_stream(pkts)
4341 self.pg_enable_capture(self.pg_interfaces)
4343 frags = self.pg1.get_capture(len(pkts))
4344 p = self.reass_frags_and_verify(frags,
4346 self.pg1.remote_ip4)
4347 self.assertEqual(p[TCP].dport, 20)
4348 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4349 self.tcp_port_out = p[TCP].sport
4350 self.assertEqual(data, p[Raw].load)
4353 data = "A" * 4 + "B" * 16 + "C" * 3
4354 pkts = self.create_stream_frag(self.pg1,
4360 self.pg1.add_stream(pkts)
4361 self.pg_enable_capture(self.pg_interfaces)
4363 frags = self.pg0.get_capture(len(pkts))
4364 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4365 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4366 self.assertEqual(p[TCP].sport, 20)
4367 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4368 self.assertEqual(data, p[Raw].load)
4370 def test_interface_addr(self):
4371 """ Acquire NAT64 pool addresses from interface """
4372 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4374 # no address in NAT64 pool
4375 adresses = self.vapi.nat44_address_dump()
4376 self.assertEqual(0, len(adresses))
4378 # configure interface address and check NAT64 address pool
4379 self.pg4.config_ip4()
4380 addresses = self.vapi.nat64_pool_addr_dump()
4381 self.assertEqual(len(addresses), 1)
4382 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4384 # remove interface address and check NAT64 address pool
4385 self.pg4.unconfig_ip4()
4386 addresses = self.vapi.nat64_pool_addr_dump()
4387 self.assertEqual(0, len(adresses))
4389 def nat64_get_ses_num(self):
4391 Return number of active NAT64 sessions.
4393 st = self.vapi.nat64_st_dump()
4396 def clear_nat64(self):
4398 Clear NAT64 configuration.
4400 self.vapi.nat64_set_timeouts()
4402 interfaces = self.vapi.nat64_interface_dump()
4403 for intf in interfaces:
4404 if intf.is_inside > 1:
4405 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4408 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4412 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4415 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4423 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4426 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4434 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4437 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4445 adresses = self.vapi.nat64_pool_addr_dump()
4446 for addr in adresses:
4447 self.vapi.nat64_add_del_pool_addr_range(addr.address,
4452 prefixes = self.vapi.nat64_prefix_dump()
4453 for prefix in prefixes:
4454 self.vapi.nat64_add_del_prefix(prefix.prefix,
4456 vrf_id=prefix.vrf_id,
4460 super(TestNAT64, self).tearDown()
4461 if not self.vpp_dead:
4462 self.logger.info(self.vapi.cli("show nat64 pool"))
4463 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4464 self.logger.info(self.vapi.cli("show nat64 prefix"))
4465 self.logger.info(self.vapi.cli("show nat64 bib all"))
4466 self.logger.info(self.vapi.cli("show nat64 session table all"))
4467 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4471 class TestDSlite(MethodHolder):
4472 """ DS-Lite Test Cases """
4475 def setUpClass(cls):
4476 super(TestDSlite, cls).setUpClass()
4479 cls.nat_addr = '10.0.0.3'
4480 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4482 cls.create_pg_interfaces(range(2))
4484 cls.pg0.config_ip4()
4485 cls.pg0.resolve_arp()
4487 cls.pg1.config_ip6()
4488 cls.pg1.generate_remote_hosts(2)
4489 cls.pg1.configure_ipv6_neighbors()
4492 super(TestDSlite, cls).tearDownClass()
4495 def test_dslite(self):
4496 """ Test DS-Lite """
4497 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4499 aftr_ip4 = '192.0.0.1'
4500 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4501 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4502 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4503 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4506 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4507 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4508 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4509 UDP(sport=20000, dport=10000))
4510 self.pg1.add_stream(p)
4511 self.pg_enable_capture(self.pg_interfaces)
4513 capture = self.pg0.get_capture(1)
4514 capture = capture[0]
4515 self.assertFalse(capture.haslayer(IPv6))
4516 self.assertEqual(capture[IP].src, self.nat_addr)
4517 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4518 self.assertNotEqual(capture[UDP].sport, 20000)
4519 self.assertEqual(capture[UDP].dport, 10000)
4520 self.check_ip_checksum(capture)
4521 out_port = capture[UDP].sport
4523 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4524 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4525 UDP(sport=10000, dport=out_port))
4526 self.pg0.add_stream(p)
4527 self.pg_enable_capture(self.pg_interfaces)
4529 capture = self.pg1.get_capture(1)
4530 capture = capture[0]
4531 self.assertEqual(capture[IPv6].src, aftr_ip6)
4532 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4533 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4534 self.assertEqual(capture[IP].dst, '192.168.1.1')
4535 self.assertEqual(capture[UDP].sport, 10000)
4536 self.assertEqual(capture[UDP].dport, 20000)
4537 self.check_ip_checksum(capture)
4540 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4541 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4542 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4543 TCP(sport=20001, dport=10001))
4544 self.pg1.add_stream(p)
4545 self.pg_enable_capture(self.pg_interfaces)
4547 capture = self.pg0.get_capture(1)
4548 capture = capture[0]
4549 self.assertFalse(capture.haslayer(IPv6))
4550 self.assertEqual(capture[IP].src, self.nat_addr)
4551 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4552 self.assertNotEqual(capture[TCP].sport, 20001)
4553 self.assertEqual(capture[TCP].dport, 10001)
4554 self.check_ip_checksum(capture)
4555 self.check_tcp_checksum(capture)
4556 out_port = capture[TCP].sport
4558 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4559 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4560 TCP(sport=10001, dport=out_port))
4561 self.pg0.add_stream(p)
4562 self.pg_enable_capture(self.pg_interfaces)
4564 capture = self.pg1.get_capture(1)
4565 capture = capture[0]
4566 self.assertEqual(capture[IPv6].src, aftr_ip6)
4567 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4568 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4569 self.assertEqual(capture[IP].dst, '192.168.1.1')
4570 self.assertEqual(capture[TCP].sport, 10001)
4571 self.assertEqual(capture[TCP].dport, 20001)
4572 self.check_ip_checksum(capture)
4573 self.check_tcp_checksum(capture)
4576 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4577 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4578 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4579 ICMP(id=4000, type='echo-request'))
4580 self.pg1.add_stream(p)
4581 self.pg_enable_capture(self.pg_interfaces)
4583 capture = self.pg0.get_capture(1)
4584 capture = capture[0]
4585 self.assertFalse(capture.haslayer(IPv6))
4586 self.assertEqual(capture[IP].src, self.nat_addr)
4587 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4588 self.assertNotEqual(capture[ICMP].id, 4000)
4589 self.check_ip_checksum(capture)
4590 self.check_icmp_checksum(capture)
4591 out_id = capture[ICMP].id
4593 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4594 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4595 ICMP(id=out_id, type='echo-reply'))
4596 self.pg0.add_stream(p)
4597 self.pg_enable_capture(self.pg_interfaces)
4599 capture = self.pg1.get_capture(1)
4600 capture = capture[0]
4601 self.assertEqual(capture[IPv6].src, aftr_ip6)
4602 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4603 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4604 self.assertEqual(capture[IP].dst, '192.168.1.1')
4605 self.assertEqual(capture[ICMP].id, 4000)
4606 self.check_ip_checksum(capture)
4607 self.check_icmp_checksum(capture)
4610 super(TestDSlite, self).tearDown()
4611 if not self.vpp_dead:
4612 self.logger.info(self.vapi.cli("show dslite pool"))
4614 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4615 self.logger.info(self.vapi.cli("show dslite sessions"))
# Script entry point: when this file is executed directly, run the test
# cases through unittest using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)