9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
69 def check_icmp_errror_embedded(self, pkt):
71 Check ICMP error embeded packet checksum
73 :param pkt: Packet to check ICMP error embeded packet checksum
75 if pkt.haslayer(IPerror):
76 new = pkt.__class__(str(pkt))
77 del new['IPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
81 if pkt.haslayer(TCPerror):
82 new = pkt.__class__(str(pkt))
83 del new['TCPerror'].chksum
84 new = new.__class__(str(new))
85 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
87 if pkt.haslayer(UDPerror):
88 if pkt['UDPerror'].chksum != 0:
89 new = pkt.__class__(str(pkt))
90 del new['UDPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['UDPerror'].chksum,
93 pkt['UDPerror'].chksum)
95 if pkt.haslayer(ICMPerror):
96 del new['ICMPerror'].chksum
97 new = new.__class__(str(new))
98 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
100 def check_icmp_checksum(self, pkt):
102 Check ICMP checksum in IPv4 packet
104 :param pkt: Packet to check ICMP checksum
106 new = pkt.__class__(str(pkt))
107 del new['ICMP'].chksum
108 new = new.__class__(str(new))
109 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110 if pkt.haslayer(IPerror):
111 self.check_icmp_errror_embedded(pkt)
113 def check_icmpv6_checksum(self, pkt):
115 Check ICMPv6 checksum in IPv4 packet
117 :param pkt: Packet to check ICMPv6 checksum
119 new = pkt.__class__(str(pkt))
120 if pkt.haslayer(ICMPv6DestUnreach):
121 del new['ICMPv6DestUnreach'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124 pkt['ICMPv6DestUnreach'].cksum)
125 self.check_icmp_errror_embedded(pkt)
126 if pkt.haslayer(ICMPv6EchoRequest):
127 del new['ICMPv6EchoRequest'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130 pkt['ICMPv6EchoRequest'].cksum)
131 if pkt.haslayer(ICMPv6EchoReply):
132 del new['ICMPv6EchoReply'].cksum
133 new = new.__class__(str(new))
134 self.assertEqual(new['ICMPv6EchoReply'].cksum,
135 pkt['ICMPv6EchoReply'].cksum)
137 def create_stream_in(self, in_if, out_if, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param ttl: TTL of generated packets
147 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149 TCP(sport=self.tcp_port_in, dport=20))
153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155 UDP(sport=self.udp_port_in, dport=20))
159 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161 ICMP(id=self.icmp_id_in, type='echo-request'))
166 def compose_ip6(self, ip4, pref, plen):
168 Compose IPv4-embedded IPv6 addresses
170 :param ip4: IPv4 address
171 :param pref: IPv6 prefix
172 :param plen: IPv6 prefix length
173 :returns: IPv4-embedded IPv6 addresses
175 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
191 pref_n[10] = ip4_n[3]
195 pref_n[10] = ip4_n[2]
196 pref_n[11] = ip4_n[3]
199 pref_n[10] = ip4_n[1]
200 pref_n[11] = ip4_n[2]
201 pref_n[12] = ip4_n[3]
203 pref_n[12] = ip4_n[0]
204 pref_n[13] = ip4_n[1]
205 pref_n[14] = ip4_n[2]
206 pref_n[15] = ip4_n[3]
207 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
209 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
211 Create IPv6 packet stream for inside network
213 :param in_if: Inside interface
214 :param out_if: Outside interface
215 :param ttl: Hop Limit of generated packets
216 :param pref: NAT64 prefix
217 :param plen: NAT64 prefix length
221 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
223 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228 TCP(sport=self.tcp_port_in, dport=20))
232 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234 UDP(sport=self.udp_port_in, dport=20))
238 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240 ICMPv6EchoRequest(id=self.icmp_id_in))
245 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
247 Create packet stream for outside network
249 :param out_if: Outside interface
250 :param dst_ip: Destination IP address (Default use global NAT address)
251 :param ttl: TTL of generated packets
254 dst_ip = self.nat_addr
257 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259 TCP(dport=self.tcp_port_out, sport=20))
263 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265 UDP(dport=self.udp_port_out, sport=20))
269 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
270 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
271 ICMP(id=self.icmp_id_out, type='echo-reply'))
276 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
277 packet_num=3, dst_ip=None):
279 Verify captured packets on outside network
281 :param capture: Captured packets
282 :param nat_ip: Translated IP address (Default use global NAT address)
283 :param same_port: Sorce port number is not translated (Default False)
284 :param packet_num: Expected number of packets (Default 3)
285 :param dst_ip: Destination IP address (Default do not verify)
288 nat_ip = self.nat_addr
289 self.assertEqual(packet_num, len(capture))
290 for packet in capture:
292 self.check_ip_checksum(packet)
293 self.assertEqual(packet[IP].src, nat_ip)
294 if dst_ip is not None:
295 self.assertEqual(packet[IP].dst, dst_ip)
296 if packet.haslayer(TCP):
298 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
301 packet[TCP].sport, self.tcp_port_in)
302 self.tcp_port_out = packet[TCP].sport
303 self.check_tcp_checksum(packet)
304 elif packet.haslayer(UDP):
306 self.assertEqual(packet[UDP].sport, self.udp_port_in)
309 packet[UDP].sport, self.udp_port_in)
310 self.udp_port_out = packet[UDP].sport
313 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
315 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
316 self.icmp_id_out = packet[ICMP].id
317 self.check_icmp_checksum(packet)
319 self.logger.error(ppp("Unexpected or invalid packet "
320 "(outside network):", packet))
323 def verify_capture_in(self, capture, in_if, packet_num=3):
325 Verify captured packets on inside network
327 :param capture: Captured packets
328 :param in_if: Inside interface
329 :param packet_num: Expected number of packets (Default 3)
331 self.assertEqual(packet_num, len(capture))
332 for packet in capture:
334 self.check_ip_checksum(packet)
335 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
336 if packet.haslayer(TCP):
337 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
338 self.check_tcp_checksum(packet)
339 elif packet.haslayer(UDP):
340 self.assertEqual(packet[UDP].dport, self.udp_port_in)
342 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
343 self.check_icmp_checksum(packet)
345 self.logger.error(ppp("Unexpected or invalid packet "
346 "(inside network):", packet))
349 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
351 Verify captured IPv6 packets on inside network
353 :param capture: Captured packets
354 :param src_ip: Source IP
355 :param dst_ip: Destination IP address
356 :param packet_num: Expected number of packets (Default 3)
358 self.assertEqual(packet_num, len(capture))
359 for packet in capture:
361 self.assertEqual(packet[IPv6].src, src_ip)
362 self.assertEqual(packet[IPv6].dst, dst_ip)
363 if packet.haslayer(TCP):
364 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
365 self.check_tcp_checksum(packet)
366 elif packet.haslayer(UDP):
367 self.assertEqual(packet[UDP].dport, self.udp_port_in)
368 self.check_udp_checksum(packet)
370 self.assertEqual(packet[ICMPv6EchoReply].id,
372 self.check_icmpv6_checksum(packet)
374 self.logger.error(ppp("Unexpected or invalid packet "
375 "(inside network):", packet))
378 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
380 Verify captured packet that don't have to be translated
382 :param capture: Captured packets
383 :param ingress_if: Ingress interface
384 :param egress_if: Egress interface
386 for packet in capture:
388 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
389 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
390 if packet.haslayer(TCP):
391 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
392 elif packet.haslayer(UDP):
393 self.assertEqual(packet[UDP].sport, self.udp_port_in)
395 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
397 self.logger.error(ppp("Unexpected or invalid packet "
398 "(inside network):", packet))
401 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
402 packet_num=3, icmp_type=11):
404 Verify captured packets with ICMP errors on outside network
406 :param capture: Captured packets
407 :param src_ip: Translated IP address or IP address of VPP
408 (Default use global NAT address)
409 :param packet_num: Expected number of packets (Default 3)
410 :param icmp_type: Type of error ICMP packet
411 we are expecting (Default 11)
414 src_ip = self.nat_addr
415 self.assertEqual(packet_num, len(capture))
416 for packet in capture:
418 self.assertEqual(packet[IP].src, src_ip)
419 self.assertTrue(packet.haslayer(ICMP))
421 self.assertEqual(icmp.type, icmp_type)
422 self.assertTrue(icmp.haslayer(IPerror))
423 inner_ip = icmp[IPerror]
424 if inner_ip.haslayer(TCPerror):
425 self.assertEqual(inner_ip[TCPerror].dport,
427 elif inner_ip.haslayer(UDPerror):
428 self.assertEqual(inner_ip[UDPerror].dport,
431 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
433 self.logger.error(ppp("Unexpected or invalid packet "
434 "(outside network):", packet))
437 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
440 Verify captured packets with ICMP errors on inside network
442 :param capture: Captured packets
443 :param in_if: Inside interface
444 :param packet_num: Expected number of packets (Default 3)
445 :param icmp_type: Type of error ICMP packet
446 we are expecting (Default 11)
448 self.assertEqual(packet_num, len(capture))
449 for packet in capture:
451 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
452 self.assertTrue(packet.haslayer(ICMP))
454 self.assertEqual(icmp.type, icmp_type)
455 self.assertTrue(icmp.haslayer(IPerror))
456 inner_ip = icmp[IPerror]
457 if inner_ip.haslayer(TCPerror):
458 self.assertEqual(inner_ip[TCPerror].sport,
460 elif inner_ip.haslayer(UDPerror):
461 self.assertEqual(inner_ip[UDPerror].sport,
464 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
466 self.logger.error(ppp("Unexpected or invalid packet "
467 "(inside network):", packet))
470 def create_stream_frag(self, src_if, dst, sport, dport, data):
472 Create fragmented packet stream
474 :param src_if: Source interface
475 :param dst: Destination IPv4 address
476 :param sport: Source TCP port
477 :param dport: Destination TCP port
478 :param data: Payload data
481 id = random.randint(0, 65535)
482 p = (IP(src=src_if.remote_ip4, dst=dst) /
483 TCP(sport=sport, dport=dport) /
485 p = p.__class__(str(p))
486 chksum = p['TCP'].chksum
488 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
490 TCP(sport=sport, dport=dport, chksum=chksum) /
493 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
495 proto=IP_PROTOS.tcp) /
498 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
499 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
505 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
506 pref=None, plen=0, frag_size=128):
508 Create fragmented packet stream
510 :param src_if: Source interface
511 :param dst: Destination IPv4 address
512 :param sport: Source TCP port
513 :param dport: Destination TCP port
514 :param data: Payload data
515 :param pref: NAT64 prefix
516 :param plen: NAT64 prefix length
517 :param fragsize: size of fragments
521 dst_ip6 = ''.join(['64:ff9b::', dst])
523 dst_ip6 = self.compose_ip6(dst, pref, plen)
525 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
526 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
527 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
528 TCP(sport=sport, dport=dport) /
531 return fragment6(p, frag_size)
533 def reass_frags_and_verify(self, frags, src, dst):
535 Reassemble and verify fragmented packet
537 :param frags: Captured fragments
538 :param src: Source IPv4 address to verify
539 :param dst: Destination IPv4 address to verify
541 :returns: Reassembled IPv4 packet
543 buffer = StringIO.StringIO()
545 self.assertEqual(p[IP].src, src)
546 self.assertEqual(p[IP].dst, dst)
547 self.check_ip_checksum(p)
548 buffer.seek(p[IP].frag * 8)
549 buffer.write(p[IP].payload)
550 ip = frags[0].getlayer(IP)
551 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
552 proto=frags[0][IP].proto)
553 if ip.proto == IP_PROTOS.tcp:
554 p = (ip / TCP(buffer.getvalue()))
555 self.check_tcp_checksum(p)
556 elif ip.proto == IP_PROTOS.udp:
557 p = (ip / UDP(buffer.getvalue()))
560 def reass_frags_and_verify_ip6(self, frags, src, dst):
562 Reassemble and verify fragmented packet
564 :param frags: Captured fragments
565 :param src: Source IPv6 address to verify
566 :param dst: Destination IPv6 address to verify
568 :returns: Reassembled IPv6 packet
570 buffer = StringIO.StringIO()
572 self.assertEqual(p[IPv6].src, src)
573 self.assertEqual(p[IPv6].dst, dst)
574 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
575 buffer.write(p[IPv6ExtHdrFragment].payload)
576 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
577 nh=frags[0][IPv6ExtHdrFragment].nh)
578 if ip.nh == IP_PROTOS.tcp:
579 p = (ip / TCP(buffer.getvalue()))
580 self.check_tcp_checksum(p)
581 elif ip.nh == IP_PROTOS.udp:
582 p = (ip / UDP(buffer.getvalue()))
585 def verify_ipfix_nat44_ses(self, data):
587 Verify IPFIX NAT44 session create/delete event
589 :param data: Decoded IPFIX data records
591 nat44_ses_create_num = 0
592 nat44_ses_delete_num = 0
593 self.assertEqual(6, len(data))
596 self.assertIn(ord(record[230]), [4, 5])
597 if ord(record[230]) == 4:
598 nat44_ses_create_num += 1
600 nat44_ses_delete_num += 1
602 self.assertEqual(self.pg0.remote_ip4n, record[8])
603 # postNATSourceIPv4Address
604 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
607 self.assertEqual(struct.pack("!I", 0), record[234])
608 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
609 if IP_PROTOS.icmp == ord(record[4]):
610 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611 self.assertEqual(struct.pack("!H", self.icmp_id_out),
613 elif IP_PROTOS.tcp == ord(record[4]):
614 self.assertEqual(struct.pack("!H", self.tcp_port_in),
616 self.assertEqual(struct.pack("!H", self.tcp_port_out),
618 elif IP_PROTOS.udp == ord(record[4]):
619 self.assertEqual(struct.pack("!H", self.udp_port_in),
621 self.assertEqual(struct.pack("!H", self.udp_port_out),
624 self.fail("Invalid protocol")
625 self.assertEqual(3, nat44_ses_create_num)
626 self.assertEqual(3, nat44_ses_delete_num)
628 def verify_ipfix_addr_exhausted(self, data):
630 Verify IPFIX NAT addresses event
632 :param data: Decoded IPFIX data records
634 self.assertEqual(1, len(data))
637 self.assertEqual(ord(record[230]), 3)
639 self.assertEqual(struct.pack("!I", 0), record[283])
642 class TestNAT44(MethodHolder):
643 """ NAT44 Test Cases """
647 super(TestNAT44, cls).setUpClass()
650 cls.tcp_port_in = 6303
651 cls.tcp_port_out = 6303
652 cls.udp_port_in = 6304
653 cls.udp_port_out = 6304
654 cls.icmp_id_in = 6305
655 cls.icmp_id_out = 6305
656 cls.nat_addr = '10.0.0.3'
657 cls.ipfix_src_port = 4739
658 cls.ipfix_domain_id = 1
660 cls.create_pg_interfaces(range(10))
661 cls.interfaces = list(cls.pg_interfaces[0:4])
663 for i in cls.interfaces:
668 cls.pg0.generate_remote_hosts(3)
669 cls.pg0.configure_ipv4_neighbors()
671 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672 cls.vapi.ip_table_add_del(10, is_add=1)
673 cls.vapi.ip_table_add_del(20, is_add=1)
675 cls.pg4._local_ip4 = "172.16.255.1"
676 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678 cls.pg4.set_table_ip4(10)
679 cls.pg5._local_ip4 = "172.17.255.3"
680 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682 cls.pg5.set_table_ip4(10)
683 cls.pg6._local_ip4 = "172.16.255.1"
684 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686 cls.pg6.set_table_ip4(20)
687 for i in cls.overlapping_interfaces:
695 cls.pg9.generate_remote_hosts(2)
697 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
702 cls.pg9.resolve_arp()
703 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705 cls.pg9.resolve_arp()
710 super(TestNAT44, cls).tearDownClass()
713 def clear_nat44(self):
715 Clear NAT44 configuration.
717 # I found no elegant way to do this
718 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
719 dst_address_length=32,
720 next_hop_address=self.pg7.remote_ip4n,
721 next_hop_sw_if_index=self.pg7.sw_if_index,
723 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
724 dst_address_length=32,
725 next_hop_address=self.pg8.remote_ip4n,
726 next_hop_sw_if_index=self.pg8.sw_if_index,
729 for intf in [self.pg7, self.pg8]:
730 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
732 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
737 if self.pg7.has_ip4_config:
738 self.pg7.unconfig_ip4()
740 interfaces = self.vapi.nat44_interface_addr_dump()
741 for intf in interfaces:
742 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
744 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
745 domain_id=self.ipfix_domain_id)
746 self.ipfix_src_port = 4739
747 self.ipfix_domain_id = 1
749 interfaces = self.vapi.nat44_interface_dump()
750 for intf in interfaces:
751 if intf.is_inside > 1:
752 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
755 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
759 interfaces = self.vapi.nat44_interface_output_feature_dump()
760 for intf in interfaces:
761 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
765 static_mappings = self.vapi.nat44_static_mapping_dump()
766 for sm in static_mappings:
767 self.vapi.nat44_add_del_static_mapping(
769 sm.external_ip_address,
770 local_port=sm.local_port,
771 external_port=sm.external_port,
772 addr_only=sm.addr_only,
774 protocol=sm.protocol,
777 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
778 for lb_sm in lb_static_mappings:
779 self.vapi.nat44_add_del_lb_static_mapping(
788 adresses = self.vapi.nat44_address_dump()
789 for addr in adresses:
790 self.vapi.nat44_add_del_address_range(addr.ip_address,
794 self.vapi.nat_set_reass()
795 self.vapi.nat_set_reass(is_ip6=1)
797 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
798 local_port=0, external_port=0, vrf_id=0,
799 is_add=1, external_sw_if_index=0xFFFFFFFF,
802 Add/delete NAT44 static mapping
804 :param local_ip: Local IP address
805 :param external_ip: External IP address
806 :param local_port: Local port number (Optional)
807 :param external_port: External port number (Optional)
808 :param vrf_id: VRF ID (Default 0)
809 :param is_add: 1 if add, 0 if delete (Default add)
810 :param external_sw_if_index: External interface instead of IP address
811 :param proto: IP protocol (Mandatory if port specified)
814 if local_port and external_port:
816 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
817 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
818 self.vapi.nat44_add_del_static_mapping(
821 external_sw_if_index,
829 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
831 Add/delete NAT44 address
833 :param ip: IP address
834 :param is_add: 1 if add, 0 if delete (Default add)
836 nat_addr = socket.inet_pton(socket.AF_INET, ip)
837 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
840 def test_dynamic(self):
841 """ NAT44 dynamic translation test """
843 self.nat44_add_address(self.nat_addr)
844 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
845 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
849 pkts = self.create_stream_in(self.pg0, self.pg1)
850 self.pg0.add_stream(pkts)
851 self.pg_enable_capture(self.pg_interfaces)
853 capture = self.pg1.get_capture(len(pkts))
854 self.verify_capture_out(capture)
857 pkts = self.create_stream_out(self.pg1)
858 self.pg1.add_stream(pkts)
859 self.pg_enable_capture(self.pg_interfaces)
861 capture = self.pg0.get_capture(len(pkts))
862 self.verify_capture_in(capture, self.pg0)
864 def test_dynamic_icmp_errors_in2out_ttl_1(self):
865 """ NAT44 handling of client packets with TTL=1 """
867 self.nat44_add_address(self.nat_addr)
868 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
869 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
872 # Client side - generate traffic
873 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
874 self.pg0.add_stream(pkts)
875 self.pg_enable_capture(self.pg_interfaces)
878 # Client side - verify ICMP type 11 packets
879 capture = self.pg0.get_capture(len(pkts))
880 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
882 def test_dynamic_icmp_errors_out2in_ttl_1(self):
883 """ NAT44 handling of server packets with TTL=1 """
885 self.nat44_add_address(self.nat_addr)
886 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
887 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
890 # Client side - create sessions
891 pkts = self.create_stream_in(self.pg0, self.pg1)
892 self.pg0.add_stream(pkts)
893 self.pg_enable_capture(self.pg_interfaces)
896 # Server side - generate traffic
897 capture = self.pg1.get_capture(len(pkts))
898 self.verify_capture_out(capture)
899 pkts = self.create_stream_out(self.pg1, ttl=1)
900 self.pg1.add_stream(pkts)
901 self.pg_enable_capture(self.pg_interfaces)
904 # Server side - verify ICMP type 11 packets
905 capture = self.pg1.get_capture(len(pkts))
906 self.verify_capture_out_with_icmp_errors(capture,
907 src_ip=self.pg1.local_ip4)
909 def test_dynamic_icmp_errors_in2out_ttl_2(self):
910 """ NAT44 handling of error responses to client packets with TTL=2 """
912 self.nat44_add_address(self.nat_addr)
913 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
914 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
917 # Client side - generate traffic
918 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
919 self.pg0.add_stream(pkts)
920 self.pg_enable_capture(self.pg_interfaces)
923 # Server side - simulate ICMP type 11 response
924 capture = self.pg1.get_capture(len(pkts))
925 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
926 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
927 ICMP(type=11) / packet[IP] for packet in capture]
928 self.pg1.add_stream(pkts)
929 self.pg_enable_capture(self.pg_interfaces)
932 # Client side - verify ICMP type 11 packets
933 capture = self.pg0.get_capture(len(pkts))
934 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
936 def test_dynamic_icmp_errors_out2in_ttl_2(self):
937 """ NAT44 handling of error responses to server packets with TTL=2 """
939 self.nat44_add_address(self.nat_addr)
940 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
941 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
944 # Client side - create sessions
945 pkts = self.create_stream_in(self.pg0, self.pg1)
946 self.pg0.add_stream(pkts)
947 self.pg_enable_capture(self.pg_interfaces)
950 # Server side - generate traffic
951 capture = self.pg1.get_capture(len(pkts))
952 self.verify_capture_out(capture)
953 pkts = self.create_stream_out(self.pg1, ttl=2)
954 self.pg1.add_stream(pkts)
955 self.pg_enable_capture(self.pg_interfaces)
958 # Client side - simulate ICMP type 11 response
959 capture = self.pg0.get_capture(len(pkts))
960 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
961 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
962 ICMP(type=11) / packet[IP] for packet in capture]
963 self.pg0.add_stream(pkts)
964 self.pg_enable_capture(self.pg_interfaces)
967 # Server side - verify ICMP type 11 packets
968 capture = self.pg1.get_capture(len(pkts))
969 self.verify_capture_out_with_icmp_errors(capture)
971 def test_ping_out_interface_from_outside(self):
972 """ Ping NAT44 out interface from outside network """
974 self.nat44_add_address(self.nat_addr)
975 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
976 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
979 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
980 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
981 ICMP(id=self.icmp_id_out, type='echo-request'))
983 self.pg1.add_stream(pkts)
984 self.pg_enable_capture(self.pg_interfaces)
986 capture = self.pg1.get_capture(len(pkts))
987 self.assertEqual(1, len(capture))
990 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
991 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
992 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
993 self.assertEqual(packet[ICMP].type, 0) # echo reply
995 self.logger.error(ppp("Unexpected or invalid packet "
996 "(outside network):", packet))
999 def test_ping_internal_host_from_outside(self):
1000 """ Ping internal host from outside network """
1002 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1003 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1004 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1008 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1009 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1010 ICMP(id=self.icmp_id_out, type='echo-request'))
1011 self.pg1.add_stream(pkt)
1012 self.pg_enable_capture(self.pg_interfaces)
1014 capture = self.pg0.get_capture(1)
1015 self.verify_capture_in(capture, self.pg0, packet_num=1)
1016 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1019 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1020 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1021 ICMP(id=self.icmp_id_in, type='echo-reply'))
1022 self.pg0.add_stream(pkt)
1023 self.pg_enable_capture(self.pg_interfaces)
1025 capture = self.pg1.get_capture(1)
1026 self.verify_capture_out(capture, same_port=True, packet_num=1)
1027 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1029 def test_static_in(self):
1030 """ 1:1 NAT initialized from inside network """
1032 nat_ip = "10.0.0.10"
1033 self.tcp_port_out = 6303
1034 self.udp_port_out = 6304
1035 self.icmp_id_out = 6305
1037 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1038 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1039 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1043 pkts = self.create_stream_in(self.pg0, self.pg1)
1044 self.pg0.add_stream(pkts)
1045 self.pg_enable_capture(self.pg_interfaces)
1047 capture = self.pg1.get_capture(len(pkts))
1048 self.verify_capture_out(capture, nat_ip, True)
1051 pkts = self.create_stream_out(self.pg1, nat_ip)
1052 self.pg1.add_stream(pkts)
1053 self.pg_enable_capture(self.pg_interfaces)
1055 capture = self.pg0.get_capture(len(pkts))
1056 self.verify_capture_in(capture, self.pg0)
1058 def test_static_out(self):
1059 """ 1:1 NAT initialized from outside network """
1061 nat_ip = "10.0.0.20"
1062 self.tcp_port_out = 6303
1063 self.udp_port_out = 6304
1064 self.icmp_id_out = 6305
1066 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1067 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1068 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1072 pkts = self.create_stream_out(self.pg1, nat_ip)
1073 self.pg1.add_stream(pkts)
1074 self.pg_enable_capture(self.pg_interfaces)
1076 capture = self.pg0.get_capture(len(pkts))
1077 self.verify_capture_in(capture, self.pg0)
1080 pkts = self.create_stream_in(self.pg0, self.pg1)
1081 self.pg0.add_stream(pkts)
1082 self.pg_enable_capture(self.pg_interfaces)
1084 capture = self.pg1.get_capture(len(pkts))
1085 self.verify_capture_out(capture, nat_ip, True)
1087 def test_static_with_port_in(self):
1088 """ 1:1 NAPT initialized from inside network """
1090 self.tcp_port_out = 3606
1091 self.udp_port_out = 3607
1092 self.icmp_id_out = 3608
1094 self.nat44_add_address(self.nat_addr)
1095 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1096 self.tcp_port_in, self.tcp_port_out,
1097 proto=IP_PROTOS.tcp)
1098 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1099 self.udp_port_in, self.udp_port_out,
1100 proto=IP_PROTOS.udp)
1101 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1102 self.icmp_id_in, self.icmp_id_out,
1103 proto=IP_PROTOS.icmp)
1104 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1105 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1109 pkts = self.create_stream_in(self.pg0, self.pg1)
1110 self.pg0.add_stream(pkts)
1111 self.pg_enable_capture(self.pg_interfaces)
1113 capture = self.pg1.get_capture(len(pkts))
1114 self.verify_capture_out(capture)
1117 pkts = self.create_stream_out(self.pg1)
1118 self.pg1.add_stream(pkts)
1119 self.pg_enable_capture(self.pg_interfaces)
1121 capture = self.pg0.get_capture(len(pkts))
1122 self.verify_capture_in(capture, self.pg0)
1124 def test_static_with_port_out(self):
1125 """ 1:1 NAPT initialized from outside network """
1127 self.tcp_port_out = 30606
1128 self.udp_port_out = 30607
1129 self.icmp_id_out = 30608
1131 self.nat44_add_address(self.nat_addr)
1132 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1133 self.tcp_port_in, self.tcp_port_out,
1134 proto=IP_PROTOS.tcp)
1135 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1136 self.udp_port_in, self.udp_port_out,
1137 proto=IP_PROTOS.udp)
1138 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1139 self.icmp_id_in, self.icmp_id_out,
1140 proto=IP_PROTOS.icmp)
1141 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1142 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1146 pkts = self.create_stream_out(self.pg1)
1147 self.pg1.add_stream(pkts)
1148 self.pg_enable_capture(self.pg_interfaces)
1150 capture = self.pg0.get_capture(len(pkts))
1151 self.verify_capture_in(capture, self.pg0)
1154 pkts = self.create_stream_in(self.pg0, self.pg1)
1155 self.pg0.add_stream(pkts)
1156 self.pg_enable_capture(self.pg_interfaces)
1158 capture = self.pg1.get_capture(len(pkts))
1159 self.verify_capture_out(capture)
1161 def test_static_vrf_aware(self):
1162 """ 1:1 NAT VRF awareness """
1164 nat_ip1 = "10.0.0.30"
1165 nat_ip2 = "10.0.0.40"
1166 self.tcp_port_out = 6303
1167 self.udp_port_out = 6304
1168 self.icmp_id_out = 6305
1170 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1172 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1174 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1176 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1177 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1179 # inside interface VRF match NAT44 static mapping VRF
1180 pkts = self.create_stream_in(self.pg4, self.pg3)
1181 self.pg4.add_stream(pkts)
1182 self.pg_enable_capture(self.pg_interfaces)
1184 capture = self.pg3.get_capture(len(pkts))
1185 self.verify_capture_out(capture, nat_ip1, True)
1187 # inside interface VRF don't match NAT44 static mapping VRF (packets
1189 pkts = self.create_stream_in(self.pg0, self.pg3)
1190 self.pg0.add_stream(pkts)
1191 self.pg_enable_capture(self.pg_interfaces)
1193 self.pg3.assert_nothing_captured()
1195 def test_static_lb(self):
1196 """ NAT44 local service load balancing """
1197 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1200 server1 = self.pg0.remote_hosts[0]
1201 server2 = self.pg0.remote_hosts[1]
1203 locals = [{'addr': server1.ip4n,
1206 {'addr': server2.ip4n,
1210 self.nat44_add_address(self.nat_addr)
1211 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1214 local_num=len(locals),
1216 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1217 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1220 # from client to service
1221 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1222 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1223 TCP(sport=12345, dport=external_port))
1224 self.pg1.add_stream(p)
1225 self.pg_enable_capture(self.pg_interfaces)
1227 capture = self.pg0.get_capture(1)
1233 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1234 if ip.dst == server1.ip4:
1238 self.assertEqual(tcp.dport, local_port)
1239 self.check_tcp_checksum(p)
1240 self.check_ip_checksum(p)
1242 self.logger.error(ppp("Unexpected or invalid packet:", p))
1245 # from service back to client
1246 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1247 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1248 TCP(sport=local_port, dport=12345))
1249 self.pg0.add_stream(p)
1250 self.pg_enable_capture(self.pg_interfaces)
1252 capture = self.pg1.get_capture(1)
1257 self.assertEqual(ip.src, self.nat_addr)
1258 self.assertEqual(tcp.sport, external_port)
1259 self.check_tcp_checksum(p)
1260 self.check_ip_checksum(p)
1262 self.logger.error(ppp("Unexpected or invalid packet:", p))
1268 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1270 for client in clients:
1271 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1272 IP(src=client, dst=self.nat_addr) /
1273 TCP(sport=12345, dport=external_port))
1275 self.pg1.add_stream(pkts)
1276 self.pg_enable_capture(self.pg_interfaces)
1278 capture = self.pg0.get_capture(len(pkts))
1280 if p[IP].dst == server1.ip4:
1284 self.assertTrue(server1_n > server2_n)
1286 def test_multiple_inside_interfaces(self):
1287 """ NAT44 multiple non-overlapping address space inside interfaces """
1289 self.nat44_add_address(self.nat_addr)
1290 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1291 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1292 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1295 # between two NAT44 inside interfaces (no translation)
1296 pkts = self.create_stream_in(self.pg0, self.pg1)
1297 self.pg0.add_stream(pkts)
1298 self.pg_enable_capture(self.pg_interfaces)
1300 capture = self.pg1.get_capture(len(pkts))
1301 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1303 # from NAT44 inside to interface without NAT44 feature (no translation)
1304 pkts = self.create_stream_in(self.pg0, self.pg2)
1305 self.pg0.add_stream(pkts)
1306 self.pg_enable_capture(self.pg_interfaces)
1308 capture = self.pg2.get_capture(len(pkts))
1309 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1311 # in2out 1st interface
1312 pkts = self.create_stream_in(self.pg0, self.pg3)
1313 self.pg0.add_stream(pkts)
1314 self.pg_enable_capture(self.pg_interfaces)
1316 capture = self.pg3.get_capture(len(pkts))
1317 self.verify_capture_out(capture)
1319 # out2in 1st interface
1320 pkts = self.create_stream_out(self.pg3)
1321 self.pg3.add_stream(pkts)
1322 self.pg_enable_capture(self.pg_interfaces)
1324 capture = self.pg0.get_capture(len(pkts))
1325 self.verify_capture_in(capture, self.pg0)
1327 # in2out 2nd interface
1328 pkts = self.create_stream_in(self.pg1, self.pg3)
1329 self.pg1.add_stream(pkts)
1330 self.pg_enable_capture(self.pg_interfaces)
1332 capture = self.pg3.get_capture(len(pkts))
1333 self.verify_capture_out(capture)
1335 # out2in 2nd interface
1336 pkts = self.create_stream_out(self.pg3)
1337 self.pg3.add_stream(pkts)
1338 self.pg_enable_capture(self.pg_interfaces)
1340 capture = self.pg1.get_capture(len(pkts))
1341 self.verify_capture_in(capture, self.pg1)
1343 def test_inside_overlapping_interfaces(self):
1344 """ NAT44 multiple inside interfaces with overlapping address space """
1346 static_nat_ip = "10.0.0.10"
1347 self.nat44_add_address(self.nat_addr)
1348 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1350 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1351 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1352 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1353 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1356 # between NAT44 inside interfaces with same VRF (no translation)
1357 pkts = self.create_stream_in(self.pg4, self.pg5)
1358 self.pg4.add_stream(pkts)
1359 self.pg_enable_capture(self.pg_interfaces)
1361 capture = self.pg5.get_capture(len(pkts))
1362 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1364 # between NAT44 inside interfaces with different VRF (hairpinning)
1365 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1366 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1367 TCP(sport=1234, dport=5678))
1368 self.pg4.add_stream(p)
1369 self.pg_enable_capture(self.pg_interfaces)
1371 capture = self.pg6.get_capture(1)
1376 self.assertEqual(ip.src, self.nat_addr)
1377 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1378 self.assertNotEqual(tcp.sport, 1234)
1379 self.assertEqual(tcp.dport, 5678)
1381 self.logger.error(ppp("Unexpected or invalid packet:", p))
1384 # in2out 1st interface
1385 pkts = self.create_stream_in(self.pg4, self.pg3)
1386 self.pg4.add_stream(pkts)
1387 self.pg_enable_capture(self.pg_interfaces)
1389 capture = self.pg3.get_capture(len(pkts))
1390 self.verify_capture_out(capture)
1392 # out2in 1st interface
1393 pkts = self.create_stream_out(self.pg3)
1394 self.pg3.add_stream(pkts)
1395 self.pg_enable_capture(self.pg_interfaces)
1397 capture = self.pg4.get_capture(len(pkts))
1398 self.verify_capture_in(capture, self.pg4)
1400 # in2out 2nd interface
1401 pkts = self.create_stream_in(self.pg5, self.pg3)
1402 self.pg5.add_stream(pkts)
1403 self.pg_enable_capture(self.pg_interfaces)
1405 capture = self.pg3.get_capture(len(pkts))
1406 self.verify_capture_out(capture)
1408 # out2in 2nd interface
1409 pkts = self.create_stream_out(self.pg3)
1410 self.pg3.add_stream(pkts)
1411 self.pg_enable_capture(self.pg_interfaces)
1413 capture = self.pg5.get_capture(len(pkts))
1414 self.verify_capture_in(capture, self.pg5)
1417 addresses = self.vapi.nat44_address_dump()
1418 self.assertEqual(len(addresses), 1)
1419 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1420 self.assertEqual(len(sessions), 3)
1421 for session in sessions:
1422 self.assertFalse(session.is_static)
1423 self.assertEqual(session.inside_ip_address[0:4],
1424 self.pg5.remote_ip4n)
1425 self.assertEqual(session.outside_ip_address,
1426 addresses[0].ip_address)
1427 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1428 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1429 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1430 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1431 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1432 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1433 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1434 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1435 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1437 # in2out 3rd interface
1438 pkts = self.create_stream_in(self.pg6, self.pg3)
1439 self.pg6.add_stream(pkts)
1440 self.pg_enable_capture(self.pg_interfaces)
1442 capture = self.pg3.get_capture(len(pkts))
1443 self.verify_capture_out(capture, static_nat_ip, True)
1445 # out2in 3rd interface
1446 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1447 self.pg3.add_stream(pkts)
1448 self.pg_enable_capture(self.pg_interfaces)
1450 capture = self.pg6.get_capture(len(pkts))
1451 self.verify_capture_in(capture, self.pg6)
1453 # general user and session dump verifications
1454 users = self.vapi.nat44_user_dump()
1455 self.assertTrue(len(users) >= 3)
1456 addresses = self.vapi.nat44_address_dump()
1457 self.assertEqual(len(addresses), 1)
1459 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1461 for session in sessions:
1462 self.assertEqual(user.ip_address, session.inside_ip_address)
1463 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1464 self.assertTrue(session.protocol in
1465 [IP_PROTOS.tcp, IP_PROTOS.udp,
1469 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1470 self.assertTrue(len(sessions) >= 4)
1471 for session in sessions:
1472 self.assertFalse(session.is_static)
1473 self.assertEqual(session.inside_ip_address[0:4],
1474 self.pg4.remote_ip4n)
1475 self.assertEqual(session.outside_ip_address,
1476 addresses[0].ip_address)
1479 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1480 self.assertTrue(len(sessions) >= 3)
1481 for session in sessions:
1482 self.assertTrue(session.is_static)
1483 self.assertEqual(session.inside_ip_address[0:4],
1484 self.pg6.remote_ip4n)
1485 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1486 map(int, static_nat_ip.split('.')))
1487 self.assertTrue(session.inside_port in
1488 [self.tcp_port_in, self.udp_port_in,
1491 def test_hairpinning(self):
1492 """ NAT44 hairpinning - 1:1 NAPT """
1494 host = self.pg0.remote_hosts[0]
1495 server = self.pg0.remote_hosts[1]
1498 server_in_port = 5678
1499 server_out_port = 8765
1501 self.nat44_add_address(self.nat_addr)
1502 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1503 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1505 # add static mapping for server
1506 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1507 server_in_port, server_out_port,
1508 proto=IP_PROTOS.tcp)
1510 # send packet from host to server
1511 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1512 IP(src=host.ip4, dst=self.nat_addr) /
1513 TCP(sport=host_in_port, dport=server_out_port))
1514 self.pg0.add_stream(p)
1515 self.pg_enable_capture(self.pg_interfaces)
1517 capture = self.pg0.get_capture(1)
1522 self.assertEqual(ip.src, self.nat_addr)
1523 self.assertEqual(ip.dst, server.ip4)
1524 self.assertNotEqual(tcp.sport, host_in_port)
1525 self.assertEqual(tcp.dport, server_in_port)
1526 self.check_tcp_checksum(p)
1527 host_out_port = tcp.sport
1529 self.logger.error(ppp("Unexpected or invalid packet:", p))
1532 # send reply from server to host
1533 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1534 IP(src=server.ip4, dst=self.nat_addr) /
1535 TCP(sport=server_in_port, dport=host_out_port))
1536 self.pg0.add_stream(p)
1537 self.pg_enable_capture(self.pg_interfaces)
1539 capture = self.pg0.get_capture(1)
1544 self.assertEqual(ip.src, self.nat_addr)
1545 self.assertEqual(ip.dst, host.ip4)
1546 self.assertEqual(tcp.sport, server_out_port)
1547 self.assertEqual(tcp.dport, host_in_port)
1548 self.check_tcp_checksum(p)
1550 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1553 def test_hairpinning2(self):
1554 """ NAT44 hairpinning - 1:1 NAT"""
1556 server1_nat_ip = "10.0.0.10"
1557 server2_nat_ip = "10.0.0.11"
1558 host = self.pg0.remote_hosts[0]
1559 server1 = self.pg0.remote_hosts[1]
1560 server2 = self.pg0.remote_hosts[2]
1561 server_tcp_port = 22
1562 server_udp_port = 20
1564 self.nat44_add_address(self.nat_addr)
1565 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1566 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1569 # add static mapping for servers
1570 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1571 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1575 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1576 IP(src=host.ip4, dst=server1_nat_ip) /
1577 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1579 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1580 IP(src=host.ip4, dst=server1_nat_ip) /
1581 UDP(sport=self.udp_port_in, dport=server_udp_port))
1583 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1584 IP(src=host.ip4, dst=server1_nat_ip) /
1585 ICMP(id=self.icmp_id_in, type='echo-request'))
1587 self.pg0.add_stream(pkts)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 capture = self.pg0.get_capture(len(pkts))
1591 for packet in capture:
1593 self.assertEqual(packet[IP].src, self.nat_addr)
1594 self.assertEqual(packet[IP].dst, server1.ip4)
1595 if packet.haslayer(TCP):
1596 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1597 self.assertEqual(packet[TCP].dport, server_tcp_port)
1598 self.tcp_port_out = packet[TCP].sport
1599 self.check_tcp_checksum(packet)
1600 elif packet.haslayer(UDP):
1601 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1602 self.assertEqual(packet[UDP].dport, server_udp_port)
1603 self.udp_port_out = packet[UDP].sport
1605 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1606 self.icmp_id_out = packet[ICMP].id
1608 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1613 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1614 IP(src=server1.ip4, dst=self.nat_addr) /
1615 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1617 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1618 IP(src=server1.ip4, dst=self.nat_addr) /
1619 UDP(sport=server_udp_port, dport=self.udp_port_out))
1621 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1622 IP(src=server1.ip4, dst=self.nat_addr) /
1623 ICMP(id=self.icmp_id_out, type='echo-reply'))
1625 self.pg0.add_stream(pkts)
1626 self.pg_enable_capture(self.pg_interfaces)
1628 capture = self.pg0.get_capture(len(pkts))
1629 for packet in capture:
1631 self.assertEqual(packet[IP].src, server1_nat_ip)
1632 self.assertEqual(packet[IP].dst, host.ip4)
1633 if packet.haslayer(TCP):
1634 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1635 self.assertEqual(packet[TCP].sport, server_tcp_port)
1636 self.check_tcp_checksum(packet)
1637 elif packet.haslayer(UDP):
1638 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1639 self.assertEqual(packet[UDP].sport, server_udp_port)
1641 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1643 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1646 # server2 to server1
1648 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1649 IP(src=server2.ip4, dst=server1_nat_ip) /
1650 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1652 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1653 IP(src=server2.ip4, dst=server1_nat_ip) /
1654 UDP(sport=self.udp_port_in, dport=server_udp_port))
1656 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1657 IP(src=server2.ip4, dst=server1_nat_ip) /
1658 ICMP(id=self.icmp_id_in, type='echo-request'))
1660 self.pg0.add_stream(pkts)
1661 self.pg_enable_capture(self.pg_interfaces)
1663 capture = self.pg0.get_capture(len(pkts))
1664 for packet in capture:
1666 self.assertEqual(packet[IP].src, server2_nat_ip)
1667 self.assertEqual(packet[IP].dst, server1.ip4)
1668 if packet.haslayer(TCP):
1669 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1670 self.assertEqual(packet[TCP].dport, server_tcp_port)
1671 self.tcp_port_out = packet[TCP].sport
1672 self.check_tcp_checksum(packet)
1673 elif packet.haslayer(UDP):
1674 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1675 self.assertEqual(packet[UDP].dport, server_udp_port)
1676 self.udp_port_out = packet[UDP].sport
1678 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1679 self.icmp_id_out = packet[ICMP].id
1681 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1684 # server1 to server2
1686 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1687 IP(src=server1.ip4, dst=server2_nat_ip) /
1688 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1690 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1691 IP(src=server1.ip4, dst=server2_nat_ip) /
1692 UDP(sport=server_udp_port, dport=self.udp_port_out))
1694 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1695 IP(src=server1.ip4, dst=server2_nat_ip) /
1696 ICMP(id=self.icmp_id_out, type='echo-reply'))
1698 self.pg0.add_stream(pkts)
1699 self.pg_enable_capture(self.pg_interfaces)
1701 capture = self.pg0.get_capture(len(pkts))
1702 for packet in capture:
1704 self.assertEqual(packet[IP].src, server1_nat_ip)
1705 self.assertEqual(packet[IP].dst, server2.ip4)
1706 if packet.haslayer(TCP):
1707 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1708 self.assertEqual(packet[TCP].sport, server_tcp_port)
1709 self.check_tcp_checksum(packet)
1710 elif packet.haslayer(UDP):
1711 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1712 self.assertEqual(packet[UDP].sport, server_udp_port)
1714 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1716 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1719 def test_max_translations_per_user(self):
1720 """ MAX translations per user - recycle the least recently used """
1722 self.nat44_add_address(self.nat_addr)
1723 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1724 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1727 # get maximum number of translations per user
1728 nat44_config = self.vapi.nat_show_config()
1730 # send more than maximum number of translations per user packets
1731 pkts_num = nat44_config.max_translations_per_user + 5
1733 for port in range(0, pkts_num):
1734 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1735 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1736 TCP(sport=1025 + port))
1738 self.pg0.add_stream(pkts)
1739 self.pg_enable_capture(self.pg_interfaces)
1742 # verify number of translated packet
1743 self.pg1.get_capture(pkts_num)
1745 def test_interface_addr(self):
1746 """ Acquire NAT44 addresses from interface """
1747 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1749 # no address in NAT pool
1750 adresses = self.vapi.nat44_address_dump()
1751 self.assertEqual(0, len(adresses))
1753 # configure interface address and check NAT address pool
1754 self.pg7.config_ip4()
1755 adresses = self.vapi.nat44_address_dump()
1756 self.assertEqual(1, len(adresses))
1757 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1759 # remove interface address and check NAT address pool
1760 self.pg7.unconfig_ip4()
1761 adresses = self.vapi.nat44_address_dump()
1762 self.assertEqual(0, len(adresses))
1764 def test_interface_addr_static_mapping(self):
1765 """ Static mapping with addresses from interface """
1766 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1767 self.nat44_add_static_mapping(
1769 external_sw_if_index=self.pg7.sw_if_index)
1771 # static mappings with external interface
1772 static_mappings = self.vapi.nat44_static_mapping_dump()
1773 self.assertEqual(1, len(static_mappings))
1774 self.assertEqual(self.pg7.sw_if_index,
1775 static_mappings[0].external_sw_if_index)
1777 # configure interface address and check static mappings
1778 self.pg7.config_ip4()
1779 static_mappings = self.vapi.nat44_static_mapping_dump()
1780 self.assertEqual(1, len(static_mappings))
1781 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1782 self.pg7.local_ip4n)
1783 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1785 # remove interface address and check static mappings
1786 self.pg7.unconfig_ip4()
1787 static_mappings = self.vapi.nat44_static_mapping_dump()
1788 self.assertEqual(0, len(static_mappings))
1790 def test_ipfix_nat44_sess(self):
1791 """ IPFIX logging NAT44 session created/delted """
1792 self.ipfix_domain_id = 10
1793 self.ipfix_src_port = 20202
1794 colector_port = 30303
1795 bind_layers(UDP, IPFIX, dport=30303)
1796 self.nat44_add_address(self.nat_addr)
1797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1798 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1800 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1801 src_address=self.pg3.local_ip4n,
1803 template_interval=10,
1804 collector_port=colector_port)
1805 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1806 src_port=self.ipfix_src_port)
1808 pkts = self.create_stream_in(self.pg0, self.pg1)
1809 self.pg0.add_stream(pkts)
1810 self.pg_enable_capture(self.pg_interfaces)
1812 capture = self.pg1.get_capture(len(pkts))
1813 self.verify_capture_out(capture)
1814 self.nat44_add_address(self.nat_addr, is_add=0)
1815 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1816 capture = self.pg3.get_capture(3)
1817 ipfix = IPFIXDecoder()
1818 # first load template
1820 self.assertTrue(p.haslayer(IPFIX))
1821 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1822 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1823 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1824 self.assertEqual(p[UDP].dport, colector_port)
1825 self.assertEqual(p[IPFIX].observationDomainID,
1826 self.ipfix_domain_id)
1827 if p.haslayer(Template):
1828 ipfix.add_template(p.getlayer(Template))
1829 # verify events in data set
1831 if p.haslayer(Data):
1832 data = ipfix.decode_data_set(p.getlayer(Set))
1833 self.verify_ipfix_nat44_ses(data)
1835 def test_ipfix_addr_exhausted(self):
1836 """ IPFIX logging NAT addresses exhausted """
1837 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1838 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1840 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1841 src_address=self.pg3.local_ip4n,
1843 template_interval=10)
1844 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1845 src_port=self.ipfix_src_port)
1847 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1848 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1850 self.pg0.add_stream(p)
1851 self.pg_enable_capture(self.pg_interfaces)
1853 capture = self.pg1.get_capture(0)
1854 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1855 capture = self.pg3.get_capture(3)
1856 ipfix = IPFIXDecoder()
1857 # first load template
1859 self.assertTrue(p.haslayer(IPFIX))
1860 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1861 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1862 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1863 self.assertEqual(p[UDP].dport, 4739)
1864 self.assertEqual(p[IPFIX].observationDomainID,
1865 self.ipfix_domain_id)
1866 if p.haslayer(Template):
1867 ipfix.add_template(p.getlayer(Template))
1868 # verify events in data set
1870 if p.haslayer(Data):
1871 data = ipfix.decode_data_set(p.getlayer(Set))
1872 self.verify_ipfix_addr_exhausted(data)
1874 def test_pool_addr_fib(self):
1875 """ NAT44 add pool addresses to FIB """
1876 static_addr = '10.0.0.10'
1877 self.nat44_add_address(self.nat_addr)
1878 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1879 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1881 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1884 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1885 ARP(op=ARP.who_has, pdst=self.nat_addr,
1886 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1887 self.pg1.add_stream(p)
1888 self.pg_enable_capture(self.pg_interfaces)
1890 capture = self.pg1.get_capture(1)
1891 self.assertTrue(capture[0].haslayer(ARP))
1892 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1895 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1896 ARP(op=ARP.who_has, pdst=static_addr,
1897 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1898 self.pg1.add_stream(p)
1899 self.pg_enable_capture(self.pg_interfaces)
1901 capture = self.pg1.get_capture(1)
1902 self.assertTrue(capture[0].haslayer(ARP))
1903 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1905 # send ARP to non-NAT44 interface
1906 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1907 ARP(op=ARP.who_has, pdst=self.nat_addr,
1908 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1909 self.pg2.add_stream(p)
1910 self.pg_enable_capture(self.pg_interfaces)
1912 capture = self.pg1.get_capture(0)
1914 # remove addresses and verify
1915 self.nat44_add_address(self.nat_addr, is_add=0)
1916 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1919 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1920 ARP(op=ARP.who_has, pdst=self.nat_addr,
1921 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1922 self.pg1.add_stream(p)
1923 self.pg_enable_capture(self.pg_interfaces)
1925 capture = self.pg1.get_capture(0)
1927 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1928 ARP(op=ARP.who_has, pdst=static_addr,
1929 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1930 self.pg1.add_stream(p)
1931 self.pg_enable_capture(self.pg_interfaces)
1933 capture = self.pg1.get_capture(0)
1935 def test_vrf_mode(self):
1936 """ NAT44 tenant VRF aware address pool mode """
1940 nat_ip1 = "10.0.0.10"
1941 nat_ip2 = "10.0.0.11"
1943 self.pg0.unconfig_ip4()
1944 self.pg1.unconfig_ip4()
1945 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1946 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1947 self.pg0.set_table_ip4(vrf_id1)
1948 self.pg1.set_table_ip4(vrf_id2)
1949 self.pg0.config_ip4()
1950 self.pg1.config_ip4()
1952 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1953 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1954 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1955 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1956 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1960 pkts = self.create_stream_in(self.pg0, self.pg2)
1961 self.pg0.add_stream(pkts)
1962 self.pg_enable_capture(self.pg_interfaces)
1964 capture = self.pg2.get_capture(len(pkts))
1965 self.verify_capture_out(capture, nat_ip1)
1968 pkts = self.create_stream_in(self.pg1, self.pg2)
1969 self.pg1.add_stream(pkts)
1970 self.pg_enable_capture(self.pg_interfaces)
1972 capture = self.pg2.get_capture(len(pkts))
1973 self.verify_capture_out(capture, nat_ip2)
1975 self.pg0.unconfig_ip4()
1976 self.pg1.unconfig_ip4()
1977 self.pg0.set_table_ip4(0)
1978 self.pg1.set_table_ip4(0)
1979 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1980 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1982 def test_vrf_feature_independent(self):
1983 """ NAT44 tenant VRF independent address pool mode """
1985 nat_ip1 = "10.0.0.10"
1986 nat_ip2 = "10.0.0.11"
1988 self.nat44_add_address(nat_ip1)
1989 self.nat44_add_address(nat_ip2)
1990 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1991 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1992 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1996 pkts = self.create_stream_in(self.pg0, self.pg2)
1997 self.pg0.add_stream(pkts)
1998 self.pg_enable_capture(self.pg_interfaces)
2000 capture = self.pg2.get_capture(len(pkts))
2001 self.verify_capture_out(capture, nat_ip1)
2004 pkts = self.create_stream_in(self.pg1, self.pg2)
2005 self.pg1.add_stream(pkts)
2006 self.pg_enable_capture(self.pg_interfaces)
2008 capture = self.pg2.get_capture(len(pkts))
2009 self.verify_capture_out(capture, nat_ip1)
2011 def test_dynamic_ipless_interfaces(self):
2012 """ NAT44 interfaces without configured IP address """
2014 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2015 mactobinary(self.pg7.remote_mac),
2016 self.pg7.remote_ip4n,
2018 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2019 mactobinary(self.pg8.remote_mac),
2020 self.pg8.remote_ip4n,
2023 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2024 dst_address_length=32,
2025 next_hop_address=self.pg7.remote_ip4n,
2026 next_hop_sw_if_index=self.pg7.sw_if_index)
2027 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2028 dst_address_length=32,
2029 next_hop_address=self.pg8.remote_ip4n,
2030 next_hop_sw_if_index=self.pg8.sw_if_index)
2032 self.nat44_add_address(self.nat_addr)
2033 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2034 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2038 pkts = self.create_stream_in(self.pg7, self.pg8)
2039 self.pg7.add_stream(pkts)
2040 self.pg_enable_capture(self.pg_interfaces)
2042 capture = self.pg8.get_capture(len(pkts))
2043 self.verify_capture_out(capture)
2046 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2047 self.pg8.add_stream(pkts)
2048 self.pg_enable_capture(self.pg_interfaces)
2050 capture = self.pg7.get_capture(len(pkts))
2051 self.verify_capture_in(capture, self.pg7)
2053 def test_static_ipless_interfaces(self):
2054 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2056 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2057 mactobinary(self.pg7.remote_mac),
2058 self.pg7.remote_ip4n,
2060 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2061 mactobinary(self.pg8.remote_mac),
2062 self.pg8.remote_ip4n,
2065 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2066 dst_address_length=32,
2067 next_hop_address=self.pg7.remote_ip4n,
2068 next_hop_sw_if_index=self.pg7.sw_if_index)
2069 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2070 dst_address_length=32,
2071 next_hop_address=self.pg8.remote_ip4n,
2072 next_hop_sw_if_index=self.pg8.sw_if_index)
2074 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2075 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2076 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2080 pkts = self.create_stream_out(self.pg8)
2081 self.pg8.add_stream(pkts)
2082 self.pg_enable_capture(self.pg_interfaces)
2084 capture = self.pg7.get_capture(len(pkts))
2085 self.verify_capture_in(capture, self.pg7)
2088 pkts = self.create_stream_in(self.pg7, self.pg8)
2089 self.pg7.add_stream(pkts)
2090 self.pg_enable_capture(self.pg_interfaces)
2092 capture = self.pg8.get_capture(len(pkts))
2093 self.verify_capture_out(capture, self.nat_addr, True)
2095 def test_static_with_port_ipless_interfaces(self):
2096 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2098 self.tcp_port_out = 30606
2099 self.udp_port_out = 30607
2100 self.icmp_id_out = 30608
2102 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2103 mactobinary(self.pg7.remote_mac),
2104 self.pg7.remote_ip4n,
2106 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2107 mactobinary(self.pg8.remote_mac),
2108 self.pg8.remote_ip4n,
2111 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2112 dst_address_length=32,
2113 next_hop_address=self.pg7.remote_ip4n,
2114 next_hop_sw_if_index=self.pg7.sw_if_index)
2115 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2116 dst_address_length=32,
2117 next_hop_address=self.pg8.remote_ip4n,
2118 next_hop_sw_if_index=self.pg8.sw_if_index)
2120 self.nat44_add_address(self.nat_addr)
2121 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2122 self.tcp_port_in, self.tcp_port_out,
2123 proto=IP_PROTOS.tcp)
2124 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2125 self.udp_port_in, self.udp_port_out,
2126 proto=IP_PROTOS.udp)
2127 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2128 self.icmp_id_in, self.icmp_id_out,
2129 proto=IP_PROTOS.icmp)
2130 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2131 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2135 pkts = self.create_stream_out(self.pg8)
2136 self.pg8.add_stream(pkts)
2137 self.pg_enable_capture(self.pg_interfaces)
2139 capture = self.pg7.get_capture(len(pkts))
2140 self.verify_capture_in(capture, self.pg7)
2143 pkts = self.create_stream_in(self.pg7, self.pg8)
2144 self.pg7.add_stream(pkts)
2145 self.pg_enable_capture(self.pg_interfaces)
2147 capture = self.pg8.get_capture(len(pkts))
2148 self.verify_capture_out(capture)
2150 def test_static_unknown_proto(self):
2151 """ 1:1 NAT translate packet with unknown protocol """
2152 nat_ip = "10.0.0.10"
2153 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2154 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2155 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2159 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2160 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2162 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2163 TCP(sport=1234, dport=1234))
2164 self.pg0.add_stream(p)
2165 self.pg_enable_capture(self.pg_interfaces)
2167 p = self.pg1.get_capture(1)
2170 self.assertEqual(packet[IP].src, nat_ip)
2171 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2172 self.assertTrue(packet.haslayer(GRE))
2173 self.check_ip_checksum(packet)
2175 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2179 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2180 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2182 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2183 TCP(sport=1234, dport=1234))
2184 self.pg1.add_stream(p)
2185 self.pg_enable_capture(self.pg_interfaces)
2187 p = self.pg0.get_capture(1)
2190 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2191 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2192 self.assertTrue(packet.haslayer(GRE))
2193 self.check_ip_checksum(packet)
2195 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2198 def test_hairpinning_static_unknown_proto(self):
2199 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2201 host = self.pg0.remote_hosts[0]
2202 server = self.pg0.remote_hosts[1]
2204 host_nat_ip = "10.0.0.10"
2205 server_nat_ip = "10.0.0.11"
2207 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2208 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2209 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2210 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2214 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2215 IP(src=host.ip4, dst=server_nat_ip) /
2217 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2218 TCP(sport=1234, dport=1234))
2219 self.pg0.add_stream(p)
2220 self.pg_enable_capture(self.pg_interfaces)
2222 p = self.pg0.get_capture(1)
2225 self.assertEqual(packet[IP].src, host_nat_ip)
2226 self.assertEqual(packet[IP].dst, server.ip4)
2227 self.assertTrue(packet.haslayer(GRE))
2228 self.check_ip_checksum(packet)
2230 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2234 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2235 IP(src=server.ip4, dst=host_nat_ip) /
2237 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2238 TCP(sport=1234, dport=1234))
2239 self.pg0.add_stream(p)
2240 self.pg_enable_capture(self.pg_interfaces)
2242 p = self.pg0.get_capture(1)
2245 self.assertEqual(packet[IP].src, server_nat_ip)
2246 self.assertEqual(packet[IP].dst, host.ip4)
2247 self.assertTrue(packet.haslayer(GRE))
2248 self.check_ip_checksum(packet)
2250 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2253 def test_unknown_proto(self):
2254 """ NAT44 translate packet with unknown protocol """
2255 self.nat44_add_address(self.nat_addr)
2256 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2257 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2261 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2263 TCP(sport=self.tcp_port_in, dport=20))
2264 self.pg0.add_stream(p)
2265 self.pg_enable_capture(self.pg_interfaces)
2267 p = self.pg1.get_capture(1)
2269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2272 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2273 TCP(sport=1234, dport=1234))
2274 self.pg0.add_stream(p)
2275 self.pg_enable_capture(self.pg_interfaces)
2277 p = self.pg1.get_capture(1)
2280 self.assertEqual(packet[IP].src, self.nat_addr)
2281 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2282 self.assertTrue(packet.haslayer(GRE))
2283 self.check_ip_checksum(packet)
2285 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2289 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2290 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2292 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2293 TCP(sport=1234, dport=1234))
2294 self.pg1.add_stream(p)
2295 self.pg_enable_capture(self.pg_interfaces)
2297 p = self.pg0.get_capture(1)
2300 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2301 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2302 self.assertTrue(packet.haslayer(GRE))
2303 self.check_ip_checksum(packet)
2305 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2308 def test_hairpinning_unknown_proto(self):
2309 """ NAT44 translate packet with unknown protocol - hairpinning """
2310 host = self.pg0.remote_hosts[0]
2311 server = self.pg0.remote_hosts[1]
2314 server_in_port = 5678
2315 server_out_port = 8765
2316 server_nat_ip = "10.0.0.11"
2318 self.nat44_add_address(self.nat_addr)
2319 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2320 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2323 # add static mapping for server
2324 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2327 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2328 IP(src=host.ip4, dst=server_nat_ip) /
2329 TCP(sport=host_in_port, dport=server_out_port))
2330 self.pg0.add_stream(p)
2331 self.pg_enable_capture(self.pg_interfaces)
2333 capture = self.pg0.get_capture(1)
2335 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2336 IP(src=host.ip4, dst=server_nat_ip) /
2338 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2339 TCP(sport=1234, dport=1234))
2340 self.pg0.add_stream(p)
2341 self.pg_enable_capture(self.pg_interfaces)
2343 p = self.pg0.get_capture(1)
2346 self.assertEqual(packet[IP].src, self.nat_addr)
2347 self.assertEqual(packet[IP].dst, server.ip4)
2348 self.assertTrue(packet.haslayer(GRE))
2349 self.check_ip_checksum(packet)
2351 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2355 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2356 IP(src=server.ip4, dst=self.nat_addr) /
2358 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2359 TCP(sport=1234, dport=1234))
2360 self.pg0.add_stream(p)
2361 self.pg_enable_capture(self.pg_interfaces)
2363 p = self.pg0.get_capture(1)
2366 self.assertEqual(packet[IP].src, server_nat_ip)
2367 self.assertEqual(packet[IP].dst, host.ip4)
2368 self.assertTrue(packet.haslayer(GRE))
2369 self.check_ip_checksum(packet)
2371 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2374 def test_output_feature(self):
2375 """ NAT44 interface output feature (in2out postrouting) """
2376 self.nat44_add_address(self.nat_addr)
2377 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2378 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2379 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2383 pkts = self.create_stream_in(self.pg0, self.pg3)
2384 self.pg0.add_stream(pkts)
2385 self.pg_enable_capture(self.pg_interfaces)
2387 capture = self.pg3.get_capture(len(pkts))
2388 self.verify_capture_out(capture)
2391 pkts = self.create_stream_out(self.pg3)
2392 self.pg3.add_stream(pkts)
2393 self.pg_enable_capture(self.pg_interfaces)
2395 capture = self.pg0.get_capture(len(pkts))
2396 self.verify_capture_in(capture, self.pg0)
2398 # from non-NAT interface to NAT inside interface
2399 pkts = self.create_stream_in(self.pg2, self.pg0)
2400 self.pg2.add_stream(pkts)
2401 self.pg_enable_capture(self.pg_interfaces)
2403 capture = self.pg0.get_capture(len(pkts))
2404 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2406 def test_output_feature_vrf_aware(self):
2407 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2408 nat_ip_vrf10 = "10.0.0.10"
2409 nat_ip_vrf20 = "10.0.0.20"
2411 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2412 dst_address_length=32,
2413 next_hop_address=self.pg3.remote_ip4n,
2414 next_hop_sw_if_index=self.pg3.sw_if_index,
2416 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2417 dst_address_length=32,
2418 next_hop_address=self.pg3.remote_ip4n,
2419 next_hop_sw_if_index=self.pg3.sw_if_index,
2422 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2423 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2424 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2425 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2426 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2430 pkts = self.create_stream_in(self.pg4, self.pg3)
2431 self.pg4.add_stream(pkts)
2432 self.pg_enable_capture(self.pg_interfaces)
2434 capture = self.pg3.get_capture(len(pkts))
2435 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2438 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2439 self.pg3.add_stream(pkts)
2440 self.pg_enable_capture(self.pg_interfaces)
2442 capture = self.pg4.get_capture(len(pkts))
2443 self.verify_capture_in(capture, self.pg4)
2446 pkts = self.create_stream_in(self.pg6, self.pg3)
2447 self.pg6.add_stream(pkts)
2448 self.pg_enable_capture(self.pg_interfaces)
2450 capture = self.pg3.get_capture(len(pkts))
2451 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2454 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2455 self.pg3.add_stream(pkts)
2456 self.pg_enable_capture(self.pg_interfaces)
2458 capture = self.pg6.get_capture(len(pkts))
2459 self.verify_capture_in(capture, self.pg6)
2461 def test_output_feature_hairpinning(self):
2462 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2463 host = self.pg0.remote_hosts[0]
2464 server = self.pg0.remote_hosts[1]
2467 server_in_port = 5678
2468 server_out_port = 8765
2470 self.nat44_add_address(self.nat_addr)
2471 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2472 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2475 # add static mapping for server
2476 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2477 server_in_port, server_out_port,
2478 proto=IP_PROTOS.tcp)
2480 # send packet from host to server
2481 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2482 IP(src=host.ip4, dst=self.nat_addr) /
2483 TCP(sport=host_in_port, dport=server_out_port))
2484 self.pg0.add_stream(p)
2485 self.pg_enable_capture(self.pg_interfaces)
2487 capture = self.pg0.get_capture(1)
2492 self.assertEqual(ip.src, self.nat_addr)
2493 self.assertEqual(ip.dst, server.ip4)
2494 self.assertNotEqual(tcp.sport, host_in_port)
2495 self.assertEqual(tcp.dport, server_in_port)
2496 self.check_tcp_checksum(p)
2497 host_out_port = tcp.sport
2499 self.logger.error(ppp("Unexpected or invalid packet:", p))
2502 # send reply from server to host
2503 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2504 IP(src=server.ip4, dst=self.nat_addr) /
2505 TCP(sport=server_in_port, dport=host_out_port))
2506 self.pg0.add_stream(p)
2507 self.pg_enable_capture(self.pg_interfaces)
2509 capture = self.pg0.get_capture(1)
2514 self.assertEqual(ip.src, self.nat_addr)
2515 self.assertEqual(ip.dst, host.ip4)
2516 self.assertEqual(tcp.sport, server_out_port)
2517 self.assertEqual(tcp.dport, host_in_port)
2518 self.check_tcp_checksum(p)
2520 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2523 def test_one_armed_nat44(self):
2524 """ One armed NAT44 """
2525 remote_host = self.pg9.remote_hosts[0]
2526 local_host = self.pg9.remote_hosts[1]
2529 self.nat44_add_address(self.nat_addr)
2530 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2531 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2535 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2536 IP(src=local_host.ip4, dst=remote_host.ip4) /
2537 TCP(sport=12345, dport=80))
2538 self.pg9.add_stream(p)
2539 self.pg_enable_capture(self.pg_interfaces)
2541 capture = self.pg9.get_capture(1)
2546 self.assertEqual(ip.src, self.nat_addr)
2547 self.assertEqual(ip.dst, remote_host.ip4)
2548 self.assertNotEqual(tcp.sport, 12345)
2549 external_port = tcp.sport
2550 self.assertEqual(tcp.dport, 80)
2551 self.check_tcp_checksum(p)
2552 self.check_ip_checksum(p)
2554 self.logger.error(ppp("Unexpected or invalid packet:", p))
2558 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2559 IP(src=remote_host.ip4, dst=self.nat_addr) /
2560 TCP(sport=80, dport=external_port))
2561 self.pg9.add_stream(p)
2562 self.pg_enable_capture(self.pg_interfaces)
2564 capture = self.pg9.get_capture(1)
2569 self.assertEqual(ip.src, remote_host.ip4)
2570 self.assertEqual(ip.dst, local_host.ip4)
2571 self.assertEqual(tcp.sport, 80)
2572 self.assertEqual(tcp.dport, 12345)
2573 self.check_tcp_checksum(p)
2574 self.check_ip_checksum(p)
2576 self.logger.error(ppp("Unexpected or invalid packet:", p))
2579 def test_del_session(self):
2580 """ Delete NAT44 session """
2581 self.nat44_add_address(self.nat_addr)
2582 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2583 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2586 pkts = self.create_stream_in(self.pg0, self.pg1)
2587 self.pg0.add_stream(pkts)
2588 self.pg_enable_capture(self.pg_interfaces)
2590 capture = self.pg1.get_capture(len(pkts))
2592 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2593 nsessions = len(sessions)
2595 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2596 sessions[0].inside_port,
2597 sessions[0].protocol)
2598 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2599 sessions[1].outside_port,
2600 sessions[1].protocol,
2603 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2604 self.assertEqual(nsessions - len(sessions), 2)
2606 def test_set_get_reass(self):
2607 """ NAT44 set/get virtual fragmentation reassembly """
2608 reas_cfg1 = self.vapi.nat_get_reass()
2610 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2611 max_reass=reas_cfg1.ip4_max_reass * 2,
2612 max_frag=reas_cfg1.ip4_max_frag * 2)
2614 reas_cfg2 = self.vapi.nat_get_reass()
2616 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2617 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2618 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2620 self.vapi.nat_set_reass(drop_frag=1)
2621 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2623 def test_frag_in_order(self):
2624 """ NAT44 translate fragments arriving in order """
2625 self.nat44_add_address(self.nat_addr)
2626 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2627 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2630 data = "A" * 4 + "B" * 16 + "C" * 3
2631 self.tcp_port_in = random.randint(1025, 65535)
2633 reass = self.vapi.nat_reass_dump()
2634 reass_n_start = len(reass)
2637 pkts = self.create_stream_frag(self.pg0,
2638 self.pg1.remote_ip4,
2642 self.pg0.add_stream(pkts)
2643 self.pg_enable_capture(self.pg_interfaces)
2645 frags = self.pg1.get_capture(len(pkts))
2646 p = self.reass_frags_and_verify(frags,
2648 self.pg1.remote_ip4)
2649 self.assertEqual(p[TCP].dport, 20)
2650 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2651 self.tcp_port_out = p[TCP].sport
2652 self.assertEqual(data, p[Raw].load)
2655 pkts = self.create_stream_frag(self.pg1,
2660 self.pg1.add_stream(pkts)
2661 self.pg_enable_capture(self.pg_interfaces)
2663 frags = self.pg0.get_capture(len(pkts))
2664 p = self.reass_frags_and_verify(frags,
2665 self.pg1.remote_ip4,
2666 self.pg0.remote_ip4)
2667 self.assertEqual(p[TCP].sport, 20)
2668 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2669 self.assertEqual(data, p[Raw].load)
2671 reass = self.vapi.nat_reass_dump()
2672 reass_n_end = len(reass)
2674 self.assertEqual(reass_n_end - reass_n_start, 2)
2676 def test_reass_hairpinning(self):
2677 """ NAT44 fragments hairpinning """
2678 host = self.pg0.remote_hosts[0]
2679 server = self.pg0.remote_hosts[1]
2680 host_in_port = random.randint(1025, 65535)
2682 server_in_port = random.randint(1025, 65535)
2683 server_out_port = random.randint(1025, 65535)
2684 data = "A" * 4 + "B" * 16 + "C" * 3
2686 self.nat44_add_address(self.nat_addr)
2687 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2688 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2690 # add static mapping for server
2691 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2692 server_in_port, server_out_port,
2693 proto=IP_PROTOS.tcp)
2695 # send packet from host to server
2696 pkts = self.create_stream_frag(self.pg0,
2701 self.pg0.add_stream(pkts)
2702 self.pg_enable_capture(self.pg_interfaces)
2704 frags = self.pg0.get_capture(len(pkts))
2705 p = self.reass_frags_and_verify(frags,
2708 self.assertNotEqual(p[TCP].sport, host_in_port)
2709 self.assertEqual(p[TCP].dport, server_in_port)
2710 self.assertEqual(data, p[Raw].load)
2712 def test_frag_out_of_order(self):
2713 """ NAT44 translate fragments arriving out of order """
2714 self.nat44_add_address(self.nat_addr)
2715 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2716 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2719 data = "A" * 4 + "B" * 16 + "C" * 3
2720 random.randint(1025, 65535)
2723 pkts = self.create_stream_frag(self.pg0,
2724 self.pg1.remote_ip4,
2729 self.pg0.add_stream(pkts)
2730 self.pg_enable_capture(self.pg_interfaces)
2732 frags = self.pg1.get_capture(len(pkts))
2733 p = self.reass_frags_and_verify(frags,
2735 self.pg1.remote_ip4)
2736 self.assertEqual(p[TCP].dport, 20)
2737 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2738 self.tcp_port_out = p[TCP].sport
2739 self.assertEqual(data, p[Raw].load)
2742 pkts = self.create_stream_frag(self.pg1,
2748 self.pg1.add_stream(pkts)
2749 self.pg_enable_capture(self.pg_interfaces)
2751 frags = self.pg0.get_capture(len(pkts))
2752 p = self.reass_frags_and_verify(frags,
2753 self.pg1.remote_ip4,
2754 self.pg0.remote_ip4)
2755 self.assertEqual(p[TCP].sport, 20)
2756 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2757 self.assertEqual(data, p[Raw].load)
2760 super(TestNAT44, self).tearDown()
2761 if not self.vpp_dead:
2762 self.logger.info(self.vapi.cli("show nat44 verbose"))
2763 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
2767 class TestDeterministicNAT(MethodHolder):
2768 """ Deterministic NAT Test Cases """
2771 def setUpConstants(cls):
2772 super(TestDeterministicNAT, cls).setUpConstants()
2773 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2776 def setUpClass(cls):
2777 super(TestDeterministicNAT, cls).setUpClass()
2780 cls.tcp_port_in = 6303
2781 cls.tcp_external_port = 6303
2782 cls.udp_port_in = 6304
2783 cls.udp_external_port = 6304
2784 cls.icmp_id_in = 6305
2785 cls.nat_addr = '10.0.0.3'
2787 cls.create_pg_interfaces(range(3))
2788 cls.interfaces = list(cls.pg_interfaces)
2790 for i in cls.interfaces:
2795 cls.pg0.generate_remote_hosts(2)
2796 cls.pg0.configure_ipv4_neighbors()
2799 super(TestDeterministicNAT, cls).tearDownClass()
2802 def create_stream_in(self, in_if, out_if, ttl=64):
2804 Create packet stream for inside network
2806 :param in_if: Inside interface
2807 :param out_if: Outside interface
2808 :param ttl: TTL of generated packets
2812 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2813 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2814 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2818 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2819 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2820 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2824 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2825 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2826 ICMP(id=self.icmp_id_in, type='echo-request'))
2831 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2833 Create packet stream for outside network
2835 :param out_if: Outside interface
2836 :param dst_ip: Destination IP address (Default use global NAT address)
2837 :param ttl: TTL of generated packets
2840 dst_ip = self.nat_addr
2843 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2844 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2845 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2849 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2850 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2851 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2855 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2856 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2857 ICMP(id=self.icmp_external_id, type='echo-reply'))
2862 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2864 Verify captured packets on outside network
2866 :param capture: Captured packets
2867 :param nat_ip: Translated IP address (Default use global NAT address)
2868 :param same_port: Sorce port number is not translated (Default False)
2869 :param packet_num: Expected number of packets (Default 3)
2872 nat_ip = self.nat_addr
2873 self.assertEqual(packet_num, len(capture))
2874 for packet in capture:
2876 self.assertEqual(packet[IP].src, nat_ip)
2877 if packet.haslayer(TCP):
2878 self.tcp_port_out = packet[TCP].sport
2879 elif packet.haslayer(UDP):
2880 self.udp_port_out = packet[UDP].sport
2882 self.icmp_external_id = packet[ICMP].id
2884 self.logger.error(ppp("Unexpected or invalid packet "
2885 "(outside network):", packet))
2888 def initiate_tcp_session(self, in_if, out_if):
2890 Initiates TCP session
2892 :param in_if: Inside interface
2893 :param out_if: Outside interface
2896 # SYN packet in->out
2897 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2898 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2899 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2902 self.pg_enable_capture(self.pg_interfaces)
2904 capture = out_if.get_capture(1)
2906 self.tcp_port_out = p[TCP].sport
2908 # SYN + ACK packet out->in
2909 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2910 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2911 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2913 out_if.add_stream(p)
2914 self.pg_enable_capture(self.pg_interfaces)
2916 in_if.get_capture(1)
2918 # ACK packet in->out
2919 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2920 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2921 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2924 self.pg_enable_capture(self.pg_interfaces)
2926 out_if.get_capture(1)
2929 self.logger.error("TCP 3 way handshake failed")
2932 def verify_ipfix_max_entries_per_user(self, data):
2934 Verify IPFIX maximum entries per user exceeded event
2936 :param data: Decoded IPFIX data records
2938 self.assertEqual(1, len(data))
2941 self.assertEqual(ord(record[230]), 13)
2942 # natQuotaExceededEvent
2943 self.assertEqual('\x03\x00\x00\x00', record[466])
2945 self.assertEqual(self.pg0.remote_ip4n, record[8])
2947 def test_deterministic_mode(self):
2948 """ NAT plugin run deterministic mode """
2949 in_addr = '172.16.255.0'
2950 out_addr = '172.17.255.50'
2951 in_addr_t = '172.16.255.20'
2952 in_addr_n = socket.inet_aton(in_addr)
2953 out_addr_n = socket.inet_aton(out_addr)
2954 in_addr_t_n = socket.inet_aton(in_addr_t)
2958 nat_config = self.vapi.nat_show_config()
2959 self.assertEqual(1, nat_config.deterministic)
2961 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2963 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2964 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2965 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2966 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2968 deterministic_mappings = self.vapi.nat_det_map_dump()
2969 self.assertEqual(len(deterministic_mappings), 1)
2970 dsm = deterministic_mappings[0]
2971 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2972 self.assertEqual(in_plen, dsm.in_plen)
2973 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2974 self.assertEqual(out_plen, dsm.out_plen)
2976 self.clear_nat_det()
2977 deterministic_mappings = self.vapi.nat_det_map_dump()
2978 self.assertEqual(len(deterministic_mappings), 0)
2980 def test_set_timeouts(self):
2981 """ Set deterministic NAT timeouts """
2982 timeouts_before = self.vapi.nat_det_get_timeouts()
2984 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2985 timeouts_before.tcp_established + 10,
2986 timeouts_before.tcp_transitory + 10,
2987 timeouts_before.icmp + 10)
2989 timeouts_after = self.vapi.nat_det_get_timeouts()
2991 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2992 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2993 self.assertNotEqual(timeouts_before.tcp_established,
2994 timeouts_after.tcp_established)
2995 self.assertNotEqual(timeouts_before.tcp_transitory,
2996 timeouts_after.tcp_transitory)
2998 def test_det_in(self):
2999 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3001 nat_ip = "10.0.0.10"
3003 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3005 socket.inet_aton(nat_ip),
3007 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3008 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3012 pkts = self.create_stream_in(self.pg0, self.pg1)
3013 self.pg0.add_stream(pkts)
3014 self.pg_enable_capture(self.pg_interfaces)
3016 capture = self.pg1.get_capture(len(pkts))
3017 self.verify_capture_out(capture, nat_ip)
3020 pkts = self.create_stream_out(self.pg1, nat_ip)
3021 self.pg1.add_stream(pkts)
3022 self.pg_enable_capture(self.pg_interfaces)
3024 capture = self.pg0.get_capture(len(pkts))
3025 self.verify_capture_in(capture, self.pg0)
3028 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3029 self.assertEqual(len(sessions), 3)
3033 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3034 self.assertEqual(s.in_port, self.tcp_port_in)
3035 self.assertEqual(s.out_port, self.tcp_port_out)
3036 self.assertEqual(s.ext_port, self.tcp_external_port)
3040 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3041 self.assertEqual(s.in_port, self.udp_port_in)
3042 self.assertEqual(s.out_port, self.udp_port_out)
3043 self.assertEqual(s.ext_port, self.udp_external_port)
3047 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3048 self.assertEqual(s.in_port, self.icmp_id_in)
3049 self.assertEqual(s.out_port, self.icmp_external_id)
3051 def test_multiple_users(self):
3052 """ Deterministic NAT multiple users """
3054 nat_ip = "10.0.0.10"
3056 external_port = 6303
3058 host0 = self.pg0.remote_hosts[0]
3059 host1 = self.pg0.remote_hosts[1]
3061 self.vapi.nat_det_add_del_map(host0.ip4n,
3063 socket.inet_aton(nat_ip),
3065 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3066 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3070 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3071 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3072 TCP(sport=port_in, dport=external_port))
3073 self.pg0.add_stream(p)
3074 self.pg_enable_capture(self.pg_interfaces)
3076 capture = self.pg1.get_capture(1)
3081 self.assertEqual(ip.src, nat_ip)
3082 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3083 self.assertEqual(tcp.dport, external_port)
3084 port_out0 = tcp.sport
3086 self.logger.error(ppp("Unexpected or invalid packet:", p))
3090 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3091 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3092 TCP(sport=port_in, dport=external_port))
3093 self.pg0.add_stream(p)
3094 self.pg_enable_capture(self.pg_interfaces)
3096 capture = self.pg1.get_capture(1)
3101 self.assertEqual(ip.src, nat_ip)
3102 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3103 self.assertEqual(tcp.dport, external_port)
3104 port_out1 = tcp.sport
3106 self.logger.error(ppp("Unexpected or invalid packet:", p))
3109 dms = self.vapi.nat_det_map_dump()
3110 self.assertEqual(1, len(dms))
3111 self.assertEqual(2, dms[0].ses_num)
3114 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3115 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3116 TCP(sport=external_port, dport=port_out0))
3117 self.pg1.add_stream(p)
3118 self.pg_enable_capture(self.pg_interfaces)
3120 capture = self.pg0.get_capture(1)
3125 self.assertEqual(ip.src, self.pg1.remote_ip4)
3126 self.assertEqual(ip.dst, host0.ip4)
3127 self.assertEqual(tcp.dport, port_in)
3128 self.assertEqual(tcp.sport, external_port)
3130 self.logger.error(ppp("Unexpected or invalid packet:", p))
3134 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3135 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3136 TCP(sport=external_port, dport=port_out1))
3137 self.pg1.add_stream(p)
3138 self.pg_enable_capture(self.pg_interfaces)
3140 capture = self.pg0.get_capture(1)
3145 self.assertEqual(ip.src, self.pg1.remote_ip4)
3146 self.assertEqual(ip.dst, host1.ip4)
3147 self.assertEqual(tcp.dport, port_in)
3148 self.assertEqual(tcp.sport, external_port)
3150 self.logger.error(ppp("Unexpected or invalid packet", p))
3153 # session close api test
3154 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3156 self.pg1.remote_ip4n,
3158 dms = self.vapi.nat_det_map_dump()
3159 self.assertEqual(dms[0].ses_num, 1)
3161 self.vapi.nat_det_close_session_in(host0.ip4n,
3163 self.pg1.remote_ip4n,
3165 dms = self.vapi.nat_det_map_dump()
3166 self.assertEqual(dms[0].ses_num, 0)
3168 def test_tcp_session_close_detection_in(self):
3169 """ Deterministic NAT TCP session close from inside network """
3170 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3172 socket.inet_aton(self.nat_addr),
3174 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3175 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3178 self.initiate_tcp_session(self.pg0, self.pg1)
3180 # close the session from inside
3182 # FIN packet in -> out
3183 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3184 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3185 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3187 self.pg0.add_stream(p)
3188 self.pg_enable_capture(self.pg_interfaces)
3190 self.pg1.get_capture(1)
3194 # ACK packet out -> in
3195 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3196 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3197 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3201 # FIN packet out -> in
3202 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3203 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3204 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3208 self.pg1.add_stream(pkts)
3209 self.pg_enable_capture(self.pg_interfaces)
3211 self.pg0.get_capture(2)
3213 # ACK packet in -> out
3214 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3215 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3216 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3218 self.pg0.add_stream(p)
3219 self.pg_enable_capture(self.pg_interfaces)
3221 self.pg1.get_capture(1)
3223 # Check if deterministic NAT44 closed the session
3224 dms = self.vapi.nat_det_map_dump()
3225 self.assertEqual(0, dms[0].ses_num)
3227 self.logger.error("TCP session termination failed")
3230 def test_tcp_session_close_detection_out(self):
3231 """ Deterministic NAT TCP session close from outside network """
3232 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3234 socket.inet_aton(self.nat_addr),
3236 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3237 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3240 self.initiate_tcp_session(self.pg0, self.pg1)
3242 # close the session from outside
3244 # FIN packet out -> in
3245 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3246 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3247 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3249 self.pg1.add_stream(p)
3250 self.pg_enable_capture(self.pg_interfaces)
3252 self.pg0.get_capture(1)
3256 # ACK packet in -> out
3257 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3258 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3259 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3263 # ACK packet in -> out
3264 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3265 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3266 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3270 self.pg0.add_stream(pkts)
3271 self.pg_enable_capture(self.pg_interfaces)
3273 self.pg1.get_capture(2)
3275 # ACK packet out -> in
3276 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3277 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3278 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3280 self.pg1.add_stream(p)
3281 self.pg_enable_capture(self.pg_interfaces)
3283 self.pg0.get_capture(1)
3285 # Check if deterministic NAT44 closed the session
3286 dms = self.vapi.nat_det_map_dump()
3287 self.assertEqual(0, dms[0].ses_num)
3289 self.logger.error("TCP session termination failed")
3292 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3293 def test_session_timeout(self):
3294 """ Deterministic NAT session timeouts """
3295 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3297 socket.inet_aton(self.nat_addr),
3299 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3300 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3303 self.initiate_tcp_session(self.pg0, self.pg1)
3304 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3305 pkts = self.create_stream_in(self.pg0, self.pg1)
3306 self.pg0.add_stream(pkts)
3307 self.pg_enable_capture(self.pg_interfaces)
3309 capture = self.pg1.get_capture(len(pkts))
3312 dms = self.vapi.nat_det_map_dump()
3313 self.assertEqual(0, dms[0].ses_num)
3315 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3316 def test_session_limit_per_user(self):
3317 """ Deterministic NAT maximum sessions per user limit """
3318 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3320 socket.inet_aton(self.nat_addr),
3322 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3323 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3325 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3326 src_address=self.pg2.local_ip4n,
3328 template_interval=10)
3329 self.vapi.nat_ipfix()
3332 for port in range(1025, 2025):
3333 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3334 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3335 UDP(sport=port, dport=port))
3338 self.pg0.add_stream(pkts)
3339 self.pg_enable_capture(self.pg_interfaces)
3341 capture = self.pg1.get_capture(len(pkts))
3343 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3344 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3345 UDP(sport=3001, dport=3002))
3346 self.pg0.add_stream(p)
3347 self.pg_enable_capture(self.pg_interfaces)
3349 capture = self.pg1.assert_nothing_captured()
3351 # verify ICMP error packet
3352 capture = self.pg0.get_capture(1)
3354 self.assertTrue(p.haslayer(ICMP))
3356 self.assertEqual(icmp.type, 3)
3357 self.assertEqual(icmp.code, 1)
3358 self.assertTrue(icmp.haslayer(IPerror))
3359 inner_ip = icmp[IPerror]
3360 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3361 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3363 dms = self.vapi.nat_det_map_dump()
3365 self.assertEqual(1000, dms[0].ses_num)
3367 # verify IPFIX logging
3368 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3370 capture = self.pg2.get_capture(2)
3371 ipfix = IPFIXDecoder()
3372 # first load template
3374 self.assertTrue(p.haslayer(IPFIX))
3375 if p.haslayer(Template):
3376 ipfix.add_template(p.getlayer(Template))
3377 # verify events in data set
3379 if p.haslayer(Data):
3380 data = ipfix.decode_data_set(p.getlayer(Set))
3381 self.verify_ipfix_max_entries_per_user(data)
3383 def clear_nat_det(self):
3385 Clear deterministic NAT configuration.
3387 self.vapi.nat_ipfix(enable=0)
3388 self.vapi.nat_det_set_timeouts()
3389 deterministic_mappings = self.vapi.nat_det_map_dump()
3390 for dsm in deterministic_mappings:
3391 self.vapi.nat_det_add_del_map(dsm.in_addr,
3397 interfaces = self.vapi.nat44_interface_dump()
3398 for intf in interfaces:
3399 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3404 super(TestDeterministicNAT, self).tearDown()
3405 if not self.vpp_dead:
3406 self.logger.info(self.vapi.cli("show nat44 detail"))
3407 self.clear_nat_det()
3410 class TestNAT64(MethodHolder):
3411 """ NAT64 Test Cases """
3414 def setUpClass(cls):
3415 super(TestNAT64, cls).setUpClass()
3418 cls.tcp_port_in = 6303
3419 cls.tcp_port_out = 6303
3420 cls.udp_port_in = 6304
3421 cls.udp_port_out = 6304
3422 cls.icmp_id_in = 6305
3423 cls.icmp_id_out = 6305
3424 cls.nat_addr = '10.0.0.3'
3425 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3427 cls.vrf1_nat_addr = '10.0.10.3'
3428 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3431 cls.create_pg_interfaces(range(5))
3432 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3433 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3434 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3436 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3438 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3440 cls.pg0.generate_remote_hosts(2)
3442 for i in cls.ip6_interfaces:
3445 i.configure_ipv6_neighbors()
3447 for i in cls.ip4_interfaces:
3453 cls.pg3.config_ip4()
3454 cls.pg3.resolve_arp()
3455 cls.pg3.config_ip6()
3456 cls.pg3.configure_ipv6_neighbors()
3459 super(TestNAT64, cls).tearDownClass()
3462 def test_pool(self):
3463 """ Add/delete address to NAT64 pool """
3464 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3466 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3468 addresses = self.vapi.nat64_pool_addr_dump()
3469 self.assertEqual(len(addresses), 1)
3470 self.assertEqual(addresses[0].address, nat_addr)
3472 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3474 addresses = self.vapi.nat64_pool_addr_dump()
3475 self.assertEqual(len(addresses), 0)
3477 def test_interface(self):
3478 """ Enable/disable NAT64 feature on the interface """
3479 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3480 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3482 interfaces = self.vapi.nat64_interface_dump()
3483 self.assertEqual(len(interfaces), 2)
3486 for intf in interfaces:
3487 if intf.sw_if_index == self.pg0.sw_if_index:
3488 self.assertEqual(intf.is_inside, 1)
3490 elif intf.sw_if_index == self.pg1.sw_if_index:
3491 self.assertEqual(intf.is_inside, 0)
3493 self.assertTrue(pg0_found)
3494 self.assertTrue(pg1_found)
3496 features = self.vapi.cli("show interface features pg0")
3497 self.assertNotEqual(features.find('nat64-in2out'), -1)
3498 features = self.vapi.cli("show interface features pg1")
3499 self.assertNotEqual(features.find('nat64-out2in'), -1)
3501 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3502 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3504 interfaces = self.vapi.nat64_interface_dump()
3505 self.assertEqual(len(interfaces), 0)
3507 def test_static_bib(self):
3508 """ Add/delete static BIB entry """
3509 in_addr = socket.inet_pton(socket.AF_INET6,
3510 '2001:db8:85a3::8a2e:370:7334')
3511 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3514 proto = IP_PROTOS.tcp
3516 self.vapi.nat64_add_del_static_bib(in_addr,
3521 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3526 self.assertEqual(bibe.i_addr, in_addr)
3527 self.assertEqual(bibe.o_addr, out_addr)
3528 self.assertEqual(bibe.i_port, in_port)
3529 self.assertEqual(bibe.o_port, out_port)
3530 self.assertEqual(static_bib_num, 1)
3532 self.vapi.nat64_add_del_static_bib(in_addr,
3538 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3543 self.assertEqual(static_bib_num, 0)
3545 def test_set_timeouts(self):
3546 """ Set NAT64 timeouts """
3547 # verify default values
3548 timeouts = self.vapi.nat64_get_timeouts()
3549 self.assertEqual(timeouts.udp, 300)
3550 self.assertEqual(timeouts.icmp, 60)
3551 self.assertEqual(timeouts.tcp_trans, 240)
3552 self.assertEqual(timeouts.tcp_est, 7440)
3553 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3555 # set and verify custom values
3556 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3557 tcp_est=7450, tcp_incoming_syn=10)
3558 timeouts = self.vapi.nat64_get_timeouts()
3559 self.assertEqual(timeouts.udp, 200)
3560 self.assertEqual(timeouts.icmp, 30)
3561 self.assertEqual(timeouts.tcp_trans, 250)
3562 self.assertEqual(timeouts.tcp_est, 7450)
3563 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3565 def test_dynamic(self):
3566 """ NAT64 dynamic translation test """
3567 self.tcp_port_in = 6303
3568 self.udp_port_in = 6304
3569 self.icmp_id_in = 6305
3571 ses_num_start = self.nat64_get_ses_num()
3573 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3575 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3576 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3579 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3580 self.pg0.add_stream(pkts)
3581 self.pg_enable_capture(self.pg_interfaces)
3583 capture = self.pg1.get_capture(len(pkts))
3584 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3585 dst_ip=self.pg1.remote_ip4)
3588 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3589 self.pg1.add_stream(pkts)
3590 self.pg_enable_capture(self.pg_interfaces)
3592 capture = self.pg0.get_capture(len(pkts))
3593 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3594 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3597 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3598 self.pg0.add_stream(pkts)
3599 self.pg_enable_capture(self.pg_interfaces)
3601 capture = self.pg1.get_capture(len(pkts))
3602 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3603 dst_ip=self.pg1.remote_ip4)
3606 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3607 self.pg1.add_stream(pkts)
3608 self.pg_enable_capture(self.pg_interfaces)
3610 capture = self.pg0.get_capture(len(pkts))
3611 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3613 ses_num_end = self.nat64_get_ses_num()
3615 self.assertEqual(ses_num_end - ses_num_start, 3)
3617 # tenant with specific VRF
3618 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3619 self.vrf1_nat_addr_n,
3620 vrf_id=self.vrf1_id)
3621 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3623 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3624 self.pg2.add_stream(pkts)
3625 self.pg_enable_capture(self.pg_interfaces)
3627 capture = self.pg1.get_capture(len(pkts))
3628 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3629 dst_ip=self.pg1.remote_ip4)
3631 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3632 self.pg1.add_stream(pkts)
3633 self.pg_enable_capture(self.pg_interfaces)
3635 capture = self.pg2.get_capture(len(pkts))
3636 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3638 def test_static(self):
3639 """ NAT64 static translation test """
3640 self.tcp_port_in = 60303
3641 self.udp_port_in = 60304
3642 self.icmp_id_in = 60305
3643 self.tcp_port_out = 60303
3644 self.udp_port_out = 60304
3645 self.icmp_id_out = 60305
3647 ses_num_start = self.nat64_get_ses_num()
3649 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3651 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3652 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3654 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3659 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3664 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3671 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3672 self.pg0.add_stream(pkts)
3673 self.pg_enable_capture(self.pg_interfaces)
3675 capture = self.pg1.get_capture(len(pkts))
3676 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3677 dst_ip=self.pg1.remote_ip4, same_port=True)
3680 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3681 self.pg1.add_stream(pkts)
3682 self.pg_enable_capture(self.pg_interfaces)
3684 capture = self.pg0.get_capture(len(pkts))
3685 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3686 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3688 ses_num_end = self.nat64_get_ses_num()
3690 self.assertEqual(ses_num_end - ses_num_start, 3)
3692 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3693 def test_session_timeout(self):
3694 """ NAT64 session timeout """
3695 self.icmp_id_in = 1234
3696 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3698 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3699 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3700 self.vapi.nat64_set_timeouts(icmp=5)
3702 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3703 self.pg0.add_stream(pkts)
3704 self.pg_enable_capture(self.pg_interfaces)
3706 capture = self.pg1.get_capture(len(pkts))
3708 ses_num_before_timeout = self.nat64_get_ses_num()
3712 # ICMP session after timeout
3713 ses_num_after_timeout = self.nat64_get_ses_num()
3714 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3716 def test_icmp_error(self):
3717 """ NAT64 ICMP Error message translation """
3718 self.tcp_port_in = 6303
3719 self.udp_port_in = 6304
3720 self.icmp_id_in = 6305
3722 ses_num_start = self.nat64_get_ses_num()
3724 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3726 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3727 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3729 # send some packets to create sessions
3730 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3731 self.pg0.add_stream(pkts)
3732 self.pg_enable_capture(self.pg_interfaces)
3734 capture_ip4 = self.pg1.get_capture(len(pkts))
3735 self.verify_capture_out(capture_ip4,
3736 nat_ip=self.nat_addr,
3737 dst_ip=self.pg1.remote_ip4)
3739 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3740 self.pg1.add_stream(pkts)
3741 self.pg_enable_capture(self.pg_interfaces)
3743 capture_ip6 = self.pg0.get_capture(len(pkts))
3744 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3745 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3746 self.pg0.remote_ip6)
3749 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3750 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3751 ICMPv6DestUnreach(code=1) /
3752 packet[IPv6] for packet in capture_ip6]
3753 self.pg0.add_stream(pkts)
3754 self.pg_enable_capture(self.pg_interfaces)
3756 capture = self.pg1.get_capture(len(pkts))
3757 for packet in capture:
3759 self.assertEqual(packet[IP].src, self.nat_addr)
3760 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3761 self.assertEqual(packet[ICMP].type, 3)
3762 self.assertEqual(packet[ICMP].code, 13)
3763 inner = packet[IPerror]
3764 self.assertEqual(inner.src, self.pg1.remote_ip4)
3765 self.assertEqual(inner.dst, self.nat_addr)
3766 self.check_icmp_checksum(packet)
3767 if inner.haslayer(TCPerror):
3768 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3769 elif inner.haslayer(UDPerror):
3770 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3772 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3774 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3778 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3779 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3780 ICMP(type=3, code=13) /
3781 packet[IP] for packet in capture_ip4]
3782 self.pg1.add_stream(pkts)
3783 self.pg_enable_capture(self.pg_interfaces)
3785 capture = self.pg0.get_capture(len(pkts))
3786 for packet in capture:
3788 self.assertEqual(packet[IPv6].src, ip.src)
3789 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3790 icmp = packet[ICMPv6DestUnreach]
3791 self.assertEqual(icmp.code, 1)
3792 inner = icmp[IPerror6]
3793 self.assertEqual(inner.src, self.pg0.remote_ip6)
3794 self.assertEqual(inner.dst, ip.src)
3795 self.check_icmpv6_checksum(packet)
3796 if inner.haslayer(TCPerror):
3797 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3798 elif inner.haslayer(UDPerror):
3799 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3801 self.assertEqual(inner[ICMPv6EchoRequest].id,
3804 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3807 def test_hairpinning(self):
3808 """ NAT64 hairpinning """
3810 client = self.pg0.remote_hosts[0]
3811 server = self.pg0.remote_hosts[1]
3812 server_tcp_in_port = 22
3813 server_tcp_out_port = 4022
3814 server_udp_in_port = 23
3815 server_udp_out_port = 4023
3816 client_tcp_in_port = 1234
3817 client_udp_in_port = 1235
3818 client_tcp_out_port = 0
3819 client_udp_out_port = 0
3820 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3821 nat_addr_ip6 = ip.src
3823 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3825 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3826 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3828 self.vapi.nat64_add_del_static_bib(server.ip6n,
3831 server_tcp_out_port,
3833 self.vapi.nat64_add_del_static_bib(server.ip6n,
3836 server_udp_out_port,
3841 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3842 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3843 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3845 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3846 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3847 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3849 self.pg0.add_stream(pkts)
3850 self.pg_enable_capture(self.pg_interfaces)
3852 capture = self.pg0.get_capture(len(pkts))
3853 for packet in capture:
3855 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3856 self.assertEqual(packet[IPv6].dst, server.ip6)
3857 if packet.haslayer(TCP):
3858 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3859 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3860 self.check_tcp_checksum(packet)
3861 client_tcp_out_port = packet[TCP].sport
3863 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3864 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3865 self.check_udp_checksum(packet)
3866 client_udp_out_port = packet[UDP].sport
3868 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3873 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3874 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3875 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3877 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3878 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3879 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3881 self.pg0.add_stream(pkts)
3882 self.pg_enable_capture(self.pg_interfaces)
3884 capture = self.pg0.get_capture(len(pkts))
3885 for packet in capture:
3887 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3888 self.assertEqual(packet[IPv6].dst, client.ip6)
3889 if packet.haslayer(TCP):
3890 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3891 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3892 self.check_tcp_checksum(packet)
3894 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3895 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3896 self.check_udp_checksum(packet)
3898 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3903 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3904 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3905 ICMPv6DestUnreach(code=1) /
3906 packet[IPv6] for packet in capture]
3907 self.pg0.add_stream(pkts)
3908 self.pg_enable_capture(self.pg_interfaces)
3910 capture = self.pg0.get_capture(len(pkts))
3911 for packet in capture:
3913 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3914 self.assertEqual(packet[IPv6].dst, server.ip6)
3915 icmp = packet[ICMPv6DestUnreach]
3916 self.assertEqual(icmp.code, 1)
3917 inner = icmp[IPerror6]
3918 self.assertEqual(inner.src, server.ip6)
3919 self.assertEqual(inner.dst, nat_addr_ip6)
3920 self.check_icmpv6_checksum(packet)
3921 if inner.haslayer(TCPerror):
3922 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3923 self.assertEqual(inner[TCPerror].dport,
3924 client_tcp_out_port)
3926 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3927 self.assertEqual(inner[UDPerror].dport,
3928 client_udp_out_port)
3930 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3933 def test_prefix(self):
3934 """ NAT64 Network-Specific Prefix """
3936 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3938 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3939 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3940 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3941 self.vrf1_nat_addr_n,
3942 vrf_id=self.vrf1_id)
3943 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3946 global_pref64 = "2001:db8::"
3947 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3948 global_pref64_len = 32
3949 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3951 prefix = self.vapi.nat64_prefix_dump()
3952 self.assertEqual(len(prefix), 1)
3953 self.assertEqual(prefix[0].prefix, global_pref64_n)
3954 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3955 self.assertEqual(prefix[0].vrf_id, 0)
3957 # Add tenant specific prefix
3958 vrf1_pref64 = "2001:db8:122:300::"
3959 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3960 vrf1_pref64_len = 56
3961 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3963 vrf_id=self.vrf1_id)
3964 prefix = self.vapi.nat64_prefix_dump()
3965 self.assertEqual(len(prefix), 2)
3968 pkts = self.create_stream_in_ip6(self.pg0,
3971 plen=global_pref64_len)
3972 self.pg0.add_stream(pkts)
3973 self.pg_enable_capture(self.pg_interfaces)
3975 capture = self.pg1.get_capture(len(pkts))
3976 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3977 dst_ip=self.pg1.remote_ip4)
3979 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3980 self.pg1.add_stream(pkts)
3981 self.pg_enable_capture(self.pg_interfaces)
3983 capture = self.pg0.get_capture(len(pkts))
3984 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3987 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3989 # Tenant specific prefix
3990 pkts = self.create_stream_in_ip6(self.pg2,
3993 plen=vrf1_pref64_len)
3994 self.pg2.add_stream(pkts)
3995 self.pg_enable_capture(self.pg_interfaces)
3997 capture = self.pg1.get_capture(len(pkts))
3998 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3999 dst_ip=self.pg1.remote_ip4)
4001 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4002 self.pg1.add_stream(pkts)
4003 self.pg_enable_capture(self.pg_interfaces)
4005 capture = self.pg2.get_capture(len(pkts))
4006 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4009 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4011 def test_unknown_proto(self):
4012 """ NAT64 translate packet with unknown protocol """
4014 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4016 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4017 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4018 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4021 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4022 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4023 TCP(sport=self.tcp_port_in, dport=20))
4024 self.pg0.add_stream(p)
4025 self.pg_enable_capture(self.pg_interfaces)
4027 p = self.pg1.get_capture(1)
4029 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4030 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4032 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4033 TCP(sport=1234, dport=1234))
4034 self.pg0.add_stream(p)
4035 self.pg_enable_capture(self.pg_interfaces)
4037 p = self.pg1.get_capture(1)
4040 self.assertEqual(packet[IP].src, self.nat_addr)
4041 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4042 self.assertTrue(packet.haslayer(GRE))
4043 self.check_ip_checksum(packet)
4045 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4049 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4050 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4052 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4053 TCP(sport=1234, dport=1234))
4054 self.pg1.add_stream(p)
4055 self.pg_enable_capture(self.pg_interfaces)
4057 p = self.pg0.get_capture(1)
4060 self.assertEqual(packet[IPv6].src, remote_ip6)
4061 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4062 self.assertEqual(packet[IPv6].nh, 47)
4064 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4067 def test_hairpinning_unknown_proto(self):
4068 """ NAT64 translate packet with unknown protocol - hairpinning """
4070 client = self.pg0.remote_hosts[0]
4071 server = self.pg0.remote_hosts[1]
4072 server_tcp_in_port = 22
4073 server_tcp_out_port = 4022
4074 client_tcp_in_port = 1234
4075 client_tcp_out_port = 1235
4076 server_nat_ip = "10.0.0.100"
4077 client_nat_ip = "10.0.0.110"
4078 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4079 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4080 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4081 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4083 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4085 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4086 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4088 self.vapi.nat64_add_del_static_bib(server.ip6n,
4091 server_tcp_out_port,
4094 self.vapi.nat64_add_del_static_bib(server.ip6n,
4100 self.vapi.nat64_add_del_static_bib(client.ip6n,
4103 client_tcp_out_port,
4107 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4108 IPv6(src=client.ip6, dst=server_nat_ip6) /
4109 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4110 self.pg0.add_stream(p)
4111 self.pg_enable_capture(self.pg_interfaces)
4113 p = self.pg0.get_capture(1)
4115 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4116 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4118 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4119 TCP(sport=1234, dport=1234))
4120 self.pg0.add_stream(p)
4121 self.pg_enable_capture(self.pg_interfaces)
4123 p = self.pg0.get_capture(1)
4126 self.assertEqual(packet[IPv6].src, client_nat_ip6)
4127 self.assertEqual(packet[IPv6].dst, server.ip6)
4128 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4130 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4134 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4135 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4137 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4138 TCP(sport=1234, dport=1234))
4139 self.pg0.add_stream(p)
4140 self.pg_enable_capture(self.pg_interfaces)
4142 p = self.pg0.get_capture(1)
4145 self.assertEqual(packet[IPv6].src, server_nat_ip6)
4146 self.assertEqual(packet[IPv6].dst, client.ip6)
4147 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4149 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4152 def test_one_armed_nat64(self):
4153 """ One armed NAT64 """
4155 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4159 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4161 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4162 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4165 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4166 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4167 TCP(sport=12345, dport=80))
4168 self.pg3.add_stream(p)
4169 self.pg_enable_capture(self.pg_interfaces)
4171 capture = self.pg3.get_capture(1)
4176 self.assertEqual(ip.src, self.nat_addr)
4177 self.assertEqual(ip.dst, self.pg3.remote_ip4)
4178 self.assertNotEqual(tcp.sport, 12345)
4179 external_port = tcp.sport
4180 self.assertEqual(tcp.dport, 80)
4181 self.check_tcp_checksum(p)
4182 self.check_ip_checksum(p)
4184 self.logger.error(ppp("Unexpected or invalid packet:", p))
4188 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4189 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4190 TCP(sport=80, dport=external_port))
4191 self.pg3.add_stream(p)
4192 self.pg_enable_capture(self.pg_interfaces)
4194 capture = self.pg3.get_capture(1)
4199 self.assertEqual(ip.src, remote_host_ip6)
4200 self.assertEqual(ip.dst, self.pg3.remote_ip6)
4201 self.assertEqual(tcp.sport, 80)
4202 self.assertEqual(tcp.dport, 12345)
4203 self.check_tcp_checksum(p)
4205 self.logger.error(ppp("Unexpected or invalid packet:", p))
4208 def test_frag_in_order(self):
4209 """ NAT64 translate fragments arriving in order """
4210 self.tcp_port_in = random.randint(1025, 65535)
4212 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4214 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4215 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4217 reass = self.vapi.nat_reass_dump()
4218 reass_n_start = len(reass)
4222 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4223 self.tcp_port_in, 20, data)
4224 self.pg0.add_stream(pkts)
4225 self.pg_enable_capture(self.pg_interfaces)
4227 frags = self.pg1.get_capture(len(pkts))
4228 p = self.reass_frags_and_verify(frags,
4230 self.pg1.remote_ip4)
4231 self.assertEqual(p[TCP].dport, 20)
4232 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4233 self.tcp_port_out = p[TCP].sport
4234 self.assertEqual(data, p[Raw].load)
4237 data = "A" * 4 + "b" * 16 + "C" * 3
4238 pkts = self.create_stream_frag(self.pg1,
4243 self.pg1.add_stream(pkts)
4244 self.pg_enable_capture(self.pg_interfaces)
4246 frags = self.pg0.get_capture(len(pkts))
4247 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4248 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4249 self.assertEqual(p[TCP].sport, 20)
4250 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4251 self.assertEqual(data, p[Raw].load)
4253 reass = self.vapi.nat_reass_dump()
4254 reass_n_end = len(reass)
4256 self.assertEqual(reass_n_end - reass_n_start, 2)
4258 def test_reass_hairpinning(self):
4259 """ NAT64 fragments hairpinning """
4261 client = self.pg0.remote_hosts[0]
4262 server = self.pg0.remote_hosts[1]
4263 server_in_port = random.randint(1025, 65535)
4264 server_out_port = random.randint(1025, 65535)
4265 client_in_port = random.randint(1025, 65535)
4266 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4267 nat_addr_ip6 = ip.src
4269 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4271 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4272 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4274 # add static BIB entry for server
4275 self.vapi.nat64_add_del_static_bib(server.ip6n,
4281 # send packet from host to server
4282 pkts = self.create_stream_frag_ip6(self.pg0,
4287 self.pg0.add_stream(pkts)
4288 self.pg_enable_capture(self.pg_interfaces)
4290 frags = self.pg0.get_capture(len(pkts))
4291 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4292 self.assertNotEqual(p[TCP].sport, client_in_port)
4293 self.assertEqual(p[TCP].dport, server_in_port)
4294 self.assertEqual(data, p[Raw].load)
4296 def test_frag_out_of_order(self):
4297 """ NAT64 translate fragments arriving out of order """
4298 self.tcp_port_in = random.randint(1025, 65535)
4300 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4302 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4303 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4307 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4308 self.tcp_port_in, 20, data)
4310 self.pg0.add_stream(pkts)
4311 self.pg_enable_capture(self.pg_interfaces)
4313 frags = self.pg1.get_capture(len(pkts))
4314 p = self.reass_frags_and_verify(frags,
4316 self.pg1.remote_ip4)
4317 self.assertEqual(p[TCP].dport, 20)
4318 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4319 self.tcp_port_out = p[TCP].sport
4320 self.assertEqual(data, p[Raw].load)
4323 data = "A" * 4 + "B" * 16 + "C" * 3
4324 pkts = self.create_stream_frag(self.pg1,
4330 self.pg1.add_stream(pkts)
4331 self.pg_enable_capture(self.pg_interfaces)
4333 frags = self.pg0.get_capture(len(pkts))
4334 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4335 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4336 self.assertEqual(p[TCP].sport, 20)
4337 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4338 self.assertEqual(data, p[Raw].load)
4340 def test_interface_addr(self):
4341 """ Acquire NAT64 pool addresses from interface """
4342 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4344 # no address in NAT64 pool
4345 adresses = self.vapi.nat44_address_dump()
4346 self.assertEqual(0, len(adresses))
4348 # configure interface address and check NAT64 address pool
4349 self.pg4.config_ip4()
4350 addresses = self.vapi.nat64_pool_addr_dump()
4351 self.assertEqual(len(addresses), 1)
4352 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4354 # remove interface address and check NAT64 address pool
4355 self.pg4.unconfig_ip4()
4356 addresses = self.vapi.nat64_pool_addr_dump()
4357 self.assertEqual(0, len(adresses))
4359 def nat64_get_ses_num(self):
4361 Return number of active NAT64 sessions.
4363 st = self.vapi.nat64_st_dump()
4366 def clear_nat64(self):
4368 Clear NAT64 configuration.
4370 self.vapi.nat64_set_timeouts()
4372 interfaces = self.vapi.nat64_interface_dump()
4373 for intf in interfaces:
4374 if intf.is_inside > 1:
4375 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4378 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4382 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4385 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4393 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4396 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4404 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4407 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4415 adresses = self.vapi.nat64_pool_addr_dump()
4416 for addr in adresses:
4417 self.vapi.nat64_add_del_pool_addr_range(addr.address,
4422 prefixes = self.vapi.nat64_prefix_dump()
4423 for prefix in prefixes:
4424 self.vapi.nat64_add_del_prefix(prefix.prefix,
4426 vrf_id=prefix.vrf_id,
4430 super(TestNAT64, self).tearDown()
4431 if not self.vpp_dead:
4432 self.logger.info(self.vapi.cli("show nat64 pool"))
4433 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4434 self.logger.info(self.vapi.cli("show nat64 prefix"))
4435 self.logger.info(self.vapi.cli("show nat64 bib all"))
4436 self.logger.info(self.vapi.cli("show nat64 session table all"))
4437 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4441 class TestDSlite(MethodHolder):
4442 """ DS-Lite Test Cases """
4445 def setUpClass(cls):
4446 super(TestDSlite, cls).setUpClass()
4449 cls.nat_addr = '10.0.0.3'
4450 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4452 cls.create_pg_interfaces(range(2))
4454 cls.pg0.config_ip4()
4455 cls.pg0.resolve_arp()
4457 cls.pg1.config_ip6()
4458 cls.pg1.generate_remote_hosts(2)
4459 cls.pg1.configure_ipv6_neighbors()
4462 super(TestDSlite, cls).tearDownClass()
4465 def test_dslite(self):
4466 """ Test DS-Lite """
4467 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4469 aftr_ip4 = '192.0.0.1'
4470 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4471 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4472 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4473 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4476 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4477 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4478 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4479 UDP(sport=20000, dport=10000))
4480 self.pg1.add_stream(p)
4481 self.pg_enable_capture(self.pg_interfaces)
4483 capture = self.pg0.get_capture(1)
4484 capture = capture[0]
4485 self.assertFalse(capture.haslayer(IPv6))
4486 self.assertEqual(capture[IP].src, self.nat_addr)
4487 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4488 self.assertNotEqual(capture[UDP].sport, 20000)
4489 self.assertEqual(capture[UDP].dport, 10000)
4490 self.check_ip_checksum(capture)
4491 out_port = capture[UDP].sport
4493 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4494 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4495 UDP(sport=10000, dport=out_port))
4496 self.pg0.add_stream(p)
4497 self.pg_enable_capture(self.pg_interfaces)
4499 capture = self.pg1.get_capture(1)
4500 capture = capture[0]
4501 self.assertEqual(capture[IPv6].src, aftr_ip6)
4502 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4503 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4504 self.assertEqual(capture[IP].dst, '192.168.1.1')
4505 self.assertEqual(capture[UDP].sport, 10000)
4506 self.assertEqual(capture[UDP].dport, 20000)
4507 self.check_ip_checksum(capture)
4510 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4511 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4512 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4513 TCP(sport=20001, dport=10001))
4514 self.pg1.add_stream(p)
4515 self.pg_enable_capture(self.pg_interfaces)
4517 capture = self.pg0.get_capture(1)
4518 capture = capture[0]
4519 self.assertFalse(capture.haslayer(IPv6))
4520 self.assertEqual(capture[IP].src, self.nat_addr)
4521 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4522 self.assertNotEqual(capture[TCP].sport, 20001)
4523 self.assertEqual(capture[TCP].dport, 10001)
4524 self.check_ip_checksum(capture)
4525 self.check_tcp_checksum(capture)
4526 out_port = capture[TCP].sport
4528 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4529 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4530 TCP(sport=10001, dport=out_port))
4531 self.pg0.add_stream(p)
4532 self.pg_enable_capture(self.pg_interfaces)
4534 capture = self.pg1.get_capture(1)
4535 capture = capture[0]
4536 self.assertEqual(capture[IPv6].src, aftr_ip6)
4537 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4538 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4539 self.assertEqual(capture[IP].dst, '192.168.1.1')
4540 self.assertEqual(capture[TCP].sport, 10001)
4541 self.assertEqual(capture[TCP].dport, 20001)
4542 self.check_ip_checksum(capture)
4543 self.check_tcp_checksum(capture)
4546 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4547 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4548 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4549 ICMP(id=4000, type='echo-request'))
4550 self.pg1.add_stream(p)
4551 self.pg_enable_capture(self.pg_interfaces)
4553 capture = self.pg0.get_capture(1)
4554 capture = capture[0]
4555 self.assertFalse(capture.haslayer(IPv6))
4556 self.assertEqual(capture[IP].src, self.nat_addr)
4557 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4558 self.assertNotEqual(capture[ICMP].id, 4000)
4559 self.check_ip_checksum(capture)
4560 self.check_icmp_checksum(capture)
4561 out_id = capture[ICMP].id
4563 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4564 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4565 ICMP(id=out_id, type='echo-reply'))
4566 self.pg0.add_stream(p)
4567 self.pg_enable_capture(self.pg_interfaces)
4569 capture = self.pg1.get_capture(1)
4570 capture = capture[0]
4571 self.assertEqual(capture[IPv6].src, aftr_ip6)
4572 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4573 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4574 self.assertEqual(capture[IP].dst, '192.168.1.1')
4575 self.assertEqual(capture[ICMP].id, 4000)
4576 self.check_ip_checksum(capture)
4577 self.check_icmp_checksum(capture)
4580 super(TestDSlite, self).tearDown()
4581 if not self.vpp_dead:
4582 self.logger.info(self.vapi.cli("show dslite pool"))
4584 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4585 self.logger.info(self.vapi.cli("show dslite sessions"))
4587 if __name__ == '__main__':
4588 unittest.main(testRunner=VppTestRunner)