7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
12 from scapy.packet import bind_layers
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from time import sleep
class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        super(MethodHolder, self).tearDown()

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        One TCP, one UDP and one ICMP echo-request packet are built (in that
        order), sourced from the inside host and addressed to the outside
        host.

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List with the three generated packets
        """
        l4_layers = [
            TCP(sport=self.tcp_port_in),
            UDP(sport=self.udp_port_in),
            ICMP(id=self.icmp_id_in, type='echo-request'),
        ]
        return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
                l4
                for l4 in l4_layers]

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        One TCP, one UDP and one ICMP echo-reply packet are built (in that
        order), sourced from the outside host.

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        :returns: List with the three generated packets
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        l4_layers = [
            TCP(dport=self.tcp_port_out),
            UDP(dport=self.udp_port_out),
            ICMP(id=self.icmp_id_out, type='echo-reply'),
        ]
        return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4
                for l4 in l4_layers]

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3):
        """
        Verify captured packets on outside network

        As a side effect the translated TCP/UDP source ports and the ICMP
        identifier seen in the capture are stored in ``tcp_port_out``,
        ``udp_port_out`` and ``icmp_id_out`` for use by later out2in streams.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if packet.haslayer(TCP):
                    if same_port:
                        self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                    else:
                        self.assertNotEqual(
                            packet[TCP].sport, self.tcp_port_in)
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    if same_port:
                        self.assertEqual(packet[UDP].sport, self.udp_port_in)
                    else:
                        self.assertNotEqual(
                            packet[UDP].sport, self.udp_port_in)
                    self.udp_port_out = packet[UDP].sport
                else:
                    # anything that is neither TCP nor UDP must be ICMP
                    if same_port:
                        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    else:
                        self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except Exception:
                # dump the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Verify captured packet that don't have to be translated

        :param capture: Captured packets
        :param ingress_if: Ingress interface
        :param egress_if: Egress interface
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
                self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Verify captured packets with ICMP errors on outside network

        :param capture: Captured packets
        :param src_ip: Translated IP address or IP address of VPP
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        if src_ip is None:
            src_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                # the quoted (inner) packet must carry the outside port/id
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].dport,
                                     self.tcp_port_out)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].dport,
                                     self.udp_port_out)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Verify captured packets with ICMP errors on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                # the quoted (inner) packet must carry the inside port/id
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].sport,
                                     self.tcp_port_in)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].sport,
                                     self.udp_port_in)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_ipfix_nat44_ses(self, data):
        """
        Verify IPFIX NAT44 session create/delete event

        Exactly six records are expected: three session-create (natEvent 4)
        and three session-delete (natEvent 5) events.  Records are indexed by
        IPFIX information element ID.

        :param data: Decoded IPFIX data records
        """
        nat44_ses_create_num = 0
        nat44_ses_delete_num = 0
        self.assertEqual(6, len(data))
        for record in data:
            # natEvent
            self.assertIn(ord(record[230]), [4, 5])
            if ord(record[230]) == 4:
                nat44_ses_create_num += 1
            else:
                nat44_ses_delete_num += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
            if IP_PROTOS.icmp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
                self.assertEqual(struct.pack("!H", self.icmp_id_out),
                                 record[227])
            elif IP_PROTOS.tcp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.tcp_port_in),
                                 record[7])
                self.assertEqual(struct.pack("!H", self.tcp_port_out),
                                 record[227])
            elif IP_PROTOS.udp == ord(record[4]):
                self.assertEqual(struct.pack("!H", self.udp_port_in),
                                 record[7])
                self.assertEqual(struct.pack("!H", self.udp_port_out),
                                 record[227])
            else:
                self.fail("Invalid protocol")
        self.assertEqual(3, nat44_ses_create_num)
        self.assertEqual(3, nat44_ses_delete_num)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Verify IPFIX NAT addresses event

        A single record with natEvent 3 and pool ID 0 is expected.

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), record[283])
class TestSNAT(MethodHolder):
    """ SNAT Test Cases """

    @classmethod
    def setUpClass(cls):
        """
        Create and configure the packet-generator interfaces shared by all
        SNAT test cases.

        pg0-pg3 are brought up with their default addressing, pg4-pg6 are
        given overlapping 172.16.255.x addresses in IP tables 10/10/20, and
        pg7/pg8 are only set admin-up.  On any failure the class is torn
        down before the exception is re-raised.
        """
        super(TestSNAT, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.snat_addr = '10.0.0.3'
            cls.ipfix_src_port = 4739
            cls.ipfix_domain_id = 1

            cls.create_pg_interfaces(range(9))
            cls.interfaces = list(cls.pg_interfaces[0:4])

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(3)
            cls.pg0.configure_ipv4_neighbors()

            cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])

            # (interface, local IPv4, first remote host IPv4, IP table id)
            overlapping_setup = (
                (cls.pg4, "172.16.255.1", "172.16.255.2", 10),
                (cls.pg5, "172.16.255.3", "172.16.255.4", 10),
                (cls.pg6, "172.16.255.1", "172.16.255.2", 20),
            )
            for intf, local_ip4, remote_ip4, table_id in overlapping_setup:
                intf._local_ip4 = local_ip4
                # pack the interface's own address; the binary form must
                # agree with the textual one set on the line above
                intf._local_ip4n = socket.inet_pton(socket.AF_INET, local_ip4)
                intf._remote_hosts[0]._ip4 = remote_ip4
                intf.set_table_ip4(table_id)

            for i in cls.overlapping_interfaces:
                i.config_ip4()
                i.admin_up()
                i.resolve_arp()

            cls.pg7.admin_up()
            cls.pg8.admin_up()

        except Exception:
            super(TestSNAT, cls).tearDownClass()
            raise
def clear_snat(self):
    """
    Clear SNAT configuration.

    Removes, in this order: the /32 routes towards the pg7/pg8 remote
    hosts, every neighbor entry on pg7 and pg8, the IPv4 configuration of
    pg7 (if any), SNAT interface addresses, IPFIX logging (resetting the
    source port and domain ID to 4739 / 1), the SNAT feature on every
    interface, all static mappings and finally all SNAT pool addresses.
    """
    # I found no elegant way to do this
    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index,
                               is_add=0)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index,
                               is_add=0)

    for intf in [self.pg7, self.pg8]:
        neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
        for n in neighbors:
            self.vapi.ip_neighbor_add_del(intf.sw_if_index,
                                          n.mac_address,
                                          n.ip_address,
                                          is_add=0)

    if self.pg7.has_ip4_config:
        self.pg7.unconfig_ip4()

    interfaces = self.vapi.snat_interface_addr_dump()
    for intf in interfaces:
        self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)

    # disable logging with the values currently in use, then restore the
    # defaults for the next test
    self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
                         domain_id=self.ipfix_domain_id)
    self.ipfix_src_port = 4739
    self.ipfix_domain_id = 1

    interfaces = self.vapi.snat_interface_dump()
    for intf in interfaces:
        self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
                                                 intf.is_inside,
                                                 is_add=0)

    static_mappings = self.vapi.snat_static_mapping_dump()
    for sm in static_mappings:
        self.vapi.snat_add_static_mapping(sm.local_ip_address,
                                          sm.external_ip_address,
                                          local_port=sm.local_port,
                                          external_port=sm.external_port,
                                          addr_only=sm.addr_only,
                                          vrf_id=sm.vrf_id,
                                          protocol=sm.protocol,
                                          is_add=0)

    addresses = self.vapi.snat_address_dump()
    for addr in addresses:
        # each pool address is removed as a single-address range
        self.vapi.snat_add_address_range(addr.ip_address,
                                         addr.ip_address,
                                         is_add=0)
def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                            local_port=0, external_port=0, vrf_id=0,
                            is_add=1, external_sw_if_index=0xFFFFFFFF,
                            proto=0):
    """
    Add/delete S-NAT static mapping

    The mapping is address-only unless both ports are given (non-zero).

    :param local_ip: Local IP address
    :param external_ip: External IP address
    :param local_port: Local port number (Optional)
    :param external_port: External port number (Optional)
    :param vrf_id: VRF ID (Default 0)
    :param is_add: 1 if add, 0 if delete (Default add)
    :param external_sw_if_index: External interface instead of IP address
    :param proto: IP protocol (Mandatory if port specified)
    """
    addr_only = 1
    if local_port and external_port:
        addr_only = 0
    # the API takes addresses in packed (network byte order) form
    l_ip = socket.inet_pton(socket.AF_INET, local_ip)
    e_ip = socket.inet_pton(socket.AF_INET, external_ip)
    self.vapi.snat_add_static_mapping(
        l_ip,
        e_ip,
        external_sw_if_index,
        local_port,
        external_port,
        addr_only,
        vrf_id,
        proto,
        is_add)
def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
    """
    Add/delete S-NAT address

    The address is added/removed as a range whose first and last address
    are both ``ip``.

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the API (Default 0xFFFFFFFF)
    """
    snat_addr = socket.inet_pton(socket.AF_INET, ip)
    self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
                                     vrf_id=vrf_id)
def test_dynamic(self):
    """ SNAT dynamic translation test

    pg0 is the inside interface and pg1 the outside one; traffic is sent
    in2out first so that the out2in stream can reuse the translated
    ports recorded by verify_capture_out().
    """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """ SNAT handling of client packets with TTL=1

    Packets sent from the inside with TTL=1 are expected to come back to
    the client as ICMP type 11 errors.
    """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
    """ SNAT handling of server packets with TTL=1

    Sessions are created from the inside first; replies sent from the
    outside with TTL=1 are then expected to be answered with ICMP type 11
    errors sourced from pg1's local address.
    """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture,
                                             src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ SNAT handling of error responses to client packets with TTL=2

    The translated client packets are captured on the outside and quoted
    back inside simulated ICMP type 11 errors; the errors must reach the
    client with the inner packet translated back.
    """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    capture = self.pg1.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ SNAT handling of error responses to server packets with TTL=2

    Sessions are created from the inside, server replies are sent with
    TTL=2, and the packets delivered to the client are quoted back inside
    simulated ICMP type 11 errors that must reach the server translated.
    """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    capture = self.pg0.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ Ping SNAT out interface from outside network

    An echo-request addressed to pg1's own address must be answered by an
    echo-reply from that address, untouched by translation.
    """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        # the reply must echo the identifier that was actually sent
        self.assertEqual(packet[ICMP].id, self.icmp_id_out)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network

    Uses an address-only static mapping of the pg0 host to the SNAT
    address, so the ICMP identifier is expected to pass through unchanged
    in both directions.
    """

    self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # out2in
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_static_in(self):
    """ SNAT 1:1 NAT initialized from inside network

    With an address-only static mapping the ports/identifier are expected
    to be left untouched, hence same_port=True on the outside check.
    """

    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ SNAT 1:1 NAT initialized from outside network

    Same address-only static mapping as the inside-initiated variant, but
    the first packets come from the outside network.
    """

    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ SNAT 1:1 NAT with port initialized from inside network

    One static mapping with port is installed per protocol (TCP, UDP,
    ICMP), each translating the inside port/identifier to a distinct
    outside one.
    """

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.snat_add_address(self.snat_addr)
    self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                 self.tcp_port_in, self.tcp_port_out,
                                 proto=IP_PROTOS.tcp)
    self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                 self.udp_port_in, self.udp_port_out,
                                 proto=IP_PROTOS.udp)
    self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                 self.icmp_id_in, self.icmp_id_out,
                                 proto=IP_PROTOS.icmp)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ SNAT 1:1 NAT with port initialized from outside network

    Same per-protocol static mappings with port as the inside-initiated
    variant, but the first packets come from the outside network.
    """

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.snat_add_address(self.snat_addr)
    self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                 self.tcp_port_in, self.tcp_port_out,
                                 proto=IP_PROTOS.tcp)
    self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                 self.udp_port_in, self.udp_port_out,
                                 proto=IP_PROTOS.udp)
    self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                 self.icmp_id_in, self.icmp_id_out,
                                 proto=IP_PROTOS.icmp)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_vrf_aware(self):
    """ SNAT 1:1 NAT VRF awareness

    Both static mappings are bound to VRF 10.  Traffic from pg4 matches
    its mapping and is translated; traffic from pg0 hits a mapping bound
    to a different VRF and nothing may show up on pg3.
    """

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                 vrf_id=10)
    self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                 vrf_id=10)
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF match SNAT static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match SNAT static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_multiple_inside_interfaces(self):
    """ SNAT multiple inside interfaces (non-overlapping address space)

    pg0 and pg1 are inside interfaces, pg3 is the outside interface and
    pg2 has no SNAT feature enabled.
    """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)

    # between two S-NAT inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from S-NAT inside to interface without S-NAT feature (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ SNAT multiple inside interfaces with overlapping address space

    pg4, pg5 and pg6 are inside interfaces and pg3 is the outside one.
    The pg6 host is reachable through an address-only static mapping
    bound to VRF 20; the session dumps below query pg4/pg5 in VRF 10 and
    pg6 in VRF 20.
    """

    static_nat_ip = "10.0.0.10"
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                             is_inside=0)
    self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
    self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                 vrf_id=20)

    # between S-NAT inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between S-NAT inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # sessions are expected in creation order: TCP, UDP, ICMP
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.snat_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.snat_user_session_dump(user.ip_address,
                                                    user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    # compare packed addresses directly; this does not rely on map()
    # returning a list
    static_nat_ip_n = socket.inet_pton(socket.AF_INET, static_nat_ip)
    sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        self.assertEqual(session.outside_ip_address[0:4],
                         static_nat_ip_n)
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ SNAT hairpinning - 1:1 NAT with port

    Host and server both sit behind pg0; the host reaches the server
    through its external (SNAT) address and port, so both directions are
    sent to and captured on pg0.
    """

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        # remember the translated host port for the reply direction
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
    except Exception:
        # the packet is an argument of ppp(), not of logger.error()
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ SNAT hairpinning - 1:1 NAT

    Runs four TCP/UDP/ICMP exchanges that all enter and leave on pg0:
    host -> server1, server1 -> host, server2 -> server1 and
    server1 -> server2, checking the rewritten addresses, ports and ICMP
    ids after each one.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `is_inside=0`, `self.pg_start()`, the packet-list handling and the
    # try/except/else scaffolding are reconstructed -- confirm against VCS.
    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    def request_layers():
        """L4 layers of the initiating direction (built at call time)."""
        return [TCP(sport=self.tcp_port_in, dport=server_tcp_port),
                UDP(sport=self.udp_port_in, dport=server_udp_port),
                ICMP(id=self.icmp_id_in, type='echo-request')]

    def reply_layers():
        """L4 layers of the answering direction, aimed at the ports and
        ICMP id recorded from the preceding request capture."""
        return [TCP(sport=server_tcp_port, dport=self.tcp_port_out),
                UDP(sport=server_udp_port, dport=self.udp_port_out),
                ICMP(id=self.icmp_id_out, type='echo-reply')]

    def exchange(src_ip, dst_ip, l4_layers):
        """Send one packet per L4 layer on pg0; return the pg0 capture."""
        pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(src=src_ip, dst=dst_ip) / l4)
                for l4 in l4_layers]
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(len(pkts))

    def verify_requests(capture, src_ip, dst_ip, port_check):
        """Check forwarded requests and record their source port / id.

        port_check is assertNotEqual when the source port must have been
        translated and assertEqual when it must be preserved.
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertEqual(packet[IP].dst, dst_ip)
                if packet.haslayer(TCP):
                    port_check(packet[TCP].sport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].dport, server_tcp_port)
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    port_check(packet[UDP].sport, self.udp_port_in)
                    self.assertEqual(packet[UDP].dport, server_udp_port)
                    self.udp_port_out = packet[UDP].sport
                else:
                    port_check(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise

    def verify_replies(capture, src_ip, dst_ip):
        """Check forwarded replies carry the original inside port / id."""
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertEqual(packet[IP].dst, dst_ip)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].sport, server_tcp_port)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.assertEqual(packet[UDP].sport, server_udp_port)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # add static mapping for servers
    self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
    self.snat_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1: source becomes the SNAT pool address, port changes
    capture = exchange(host.ip4, server1_nat_ip, request_layers())
    verify_requests(capture, self.snat_addr, server1.ip4,
                    self.assertNotEqual)

    # server1 to host
    capture = exchange(server1.ip4, self.snat_addr, reply_layers())
    verify_replies(capture, server1_nat_ip, host.ip4)

    # server2 to server1: source becomes server2's 1:1 address, port kept
    capture = exchange(server2.ip4, server1_nat_ip, request_layers())
    verify_requests(capture, server2_nat_ip, server1.ip4,
                    self.assertEqual)

    # server1 to server2
    capture = exchange(server1.ip4, server2_nat_ip, reply_layers())
    verify_replies(capture, server1_nat_ip, server2.ip4)
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used

    Sends five more TCP flows from one inside host than the configured
    per-user translation limit and expects every packet on pg1.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `is_inside=0`, the packet-list handling and `self.pg_start()` are
    # reconstructed -- confirm against VCS.
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # get maximum number of translations per user
    snat_config = self.vapi.snat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = snat_config.max_translations_per_user + 5
    # one flow per packet: only the TCP source port varies
    pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=1025 + port))
            for port in range(pkts_num)]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface

    The NAT pool is dumped before pg7 has an address, after pg7 is given
    one, and after it is removed again.
    """
    self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)

    # no address in NAT pool
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))

    # configure interface address and check NAT address pool
    self.pg7.config_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(1, len(pool))
    pool_entry = pool[0]
    self.assertEqual(pool_entry.ip_address[0:4], self.pg7.local_ip4n)

    # remove interface address and check NAT address pool
    self.pg7.unconfig_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(0, len(pool))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface

    A static mapping is created against pg7 itself rather than a fixed
    external address; the mapping dump is checked before pg7 has an
    address, once it has one, and after the address is removed.
    """
    ext_if = self.pg7
    self.vapi.snat_add_interface_addr(ext_if.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=ext_if.sw_if_index)

    # static mappings with external interface
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(ext_if.sw_if_index,
                     mappings[0].external_sw_if_index)

    # configure interface address and check static mappings
    ext_if.config_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    self.assertEqual(resolved.external_ip_address[0:4],
                     ext_if.local_ip4n)
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # remove interface address and check static mappings
    ext_if.unconfig_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(0, len(mappings))
def test_ipfix_nat44_sess(self):
    """ S-NAT IPFIX logging NAT44 session created/deleted

    Translates one stream pg0 -> pg1, removes the NAT address, flushes
    IPFIX and checks the three export packets received on pg3.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `path_mtu=512`, `is_inside=0`, `self.pg_start()` and the two
    # `for p in capture:` headers are reconstructed -- confirm against VCS.
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # teach scapy to dissect the non-default collector port as IPFIX;
    # uses the variable so the binding cannot drift from the exporter
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    # remove the pool address again before collecting the export
    self.snat_add_address(self.snat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set (second pass: templates are all loaded)
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ S-NAT IPFIX logging NAT addresses exhausted

    No NAT address is added, so the single inside packet must not show
    up on pg1; the IPFIX export received on pg3 is then checked.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `path_mtu=512`, the TCP layer (`sport=3025`), `is_inside=0`,
    # `self.pg_start()` and the two `for p in capture:` headers are
    # reconstructed -- confirm against VCS.
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # nothing may be forwarded to the outside interface
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set (second pass: templates are all loaded)
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB

    ARP requests for the pool address and for a 1:1 static address are
    sent on pg1 and must be answered while the addresses are configured,
    and must go unanswered once they are removed.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `is_inside=0`, `is_add=0` on the static-mapping removal and the
    # `self.pg_start()` calls are reconstructed -- confirm against VCS.
    static_addr = '10.0.0.10'

    def arp_who_has(pg_if, target_ip):
        """Broadcast an ARP request for target_ip from pg_if's peer."""
        request = (Ether(src=pg_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                   ARP(op=ARP.who_has, pdst=target_ip,
                       psrc=pg_if.remote_ip4, hwsrc=pg_if.remote_mac))
        pg_if.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # SNAT pool address first, then the 1:1 NAT address
    for target in (self.snat_addr, static_addr):
        arp_who_has(self.pg1, target)
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        # was assertTrue(op, ARP.is_at): that treats ARP.is_at as the
        # failure message and passes for any non-zero opcode
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-SNAT interface
    arp_who_has(self.pg2, self.snat_addr)
    # NOTE(review): the request goes in on pg2 but pg1 is inspected, as in
    # the original -- confirm pg1 is the intended interface to check
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    for target in (self.snat_addr, static_addr):
        arp_who_has(self.pg1, target)
        self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ S-NAT tenant VRF aware address pool mode

    pg0 and pg1 are moved into separate IPv4 tables and each table gets
    its own NAT address; traffic from each inside interface must leave
    pg2 with the address bound to its table.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The `vrf_id1`/`vrf_id2` values, `is_inside=0` and the
    # `self.pg_start()` calls are reconstructed -- confirm against VCS.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # addresses are removed before the table change and re-added after it
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()

    self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
    self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF, then second VRF
    for in_if, expected_ip in ((self.pg0, nat_ip1), (self.pg1, nat_ip2)):
        pkts = self.create_stream_in(in_if, self.pg2)
        in_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, expected_ip)
def test_vrf_feature_independent(self):
    """ S-NAT tenant VRF independent address pool mode

    Two pool addresses are added without a VRF binding; traffic from
    both inside interfaces is expected to leave pg2 with the first one.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `is_inside=0` and the `self.pg_start()` calls are reconstructed --
    # confirm against VCS.
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.snat_add_address(nat_ip1)
    self.snat_add_address(nat_ip2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF, then second VRF: both are checked against nat_ip1
    for in_if in (self.pg0, self.pg1):
        pkts = self.create_stream_in(in_if, self.pg2)
        in_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ SNAT interfaces without configured ip dynamic map

    pg7 (inside) and pg8 (outside) get a neighbor entry and a /32 route
    for their peers; dynamic translation is then checked in both
    directions.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `is_static=1`, `is_inside=0` and the `self.pg_start()` calls are
    # reconstructed -- confirm against VCS.
    ipless_ifs = (self.pg7, self.pg8)

    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg8, self.snat_addr)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
    """ SNAT 1:1 NAT interfaces without configured ip

    pg7 (inside) and pg8 (outside) get a neighbor entry and a /32 route
    for their peers; a 1:1 static mapping for pg7's peer is then checked
    out2in first and in2out second.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `is_static=1`, `is_inside=0` and the `self.pg_start()` calls are
    # reconstructed -- confirm against VCS.
    ipless_ifs = (self.pg7, self.pg8)

    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture, self.snat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ SNAT 1:1 NAT with port interfaces without configured ip

    pg7 (inside) and pg8 (outside) get a neighbor entry and a /32 route
    for their peers; per-protocol static port mappings for pg7's peer
    are then checked out2in first and in2out second.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `is_static=1`, `is_inside=0` and the `self.pg_start()` calls are
    # reconstructed -- confirm against VCS.
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    ipless_ifs = (self.pg7, self.pg8)

    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_address(self.snat_addr)
    # one static mapping per protocol: (inside port/id, outside port/id)
    port_mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for local_port, external_port, proto in port_mappings:
        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                     local_port, external_port,
                                     proto=proto)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)
def tearDown(self):
    """Run the base-class teardown, then log SNAT state if VPP is alive."""
    # NOTE(review): restored from a listing that had lost its short lines.
    # The `def` line and the trailing `self.clear_snat()` call are
    # reconstructed (clear_snat is not visible here) -- confirm against VCS.
    super(TestSNAT, self).tearDown()
    if not self.vpp_dead:
        self.logger.info(self.vapi.cli("show snat verbose"))
        self.clear_snat()
1668 class TestDeterministicNAT(MethodHolder):
1669 """ Deterministic NAT Test Cases """
@classmethod
def setUpConstants(cls):
    """Extend the VPP command line with `snat { deterministic }`."""
    # NOTE(review): the `@classmethod` decorator was missing from the
    # listing this was restored from; it is required by the `cls`
    # signature and the cls-based super() call -- confirm against VCS.
    super(TestDeterministicNAT, cls).setUpConstants()
    cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
@classmethod
def setUpClass(cls):
    """Create three pg interfaces and the shared port/address constants.

    If any setup step raises, the class-level teardown is run before the
    exception is re-raised.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The decorator, the try/except/raise scaffolding and the per-interface
    # calls (admin_up / config_ip4 / resolve_arp) are reconstructed --
    # confirm against VCS.
    super(TestDeterministicNAT, cls).setUpClass()

    try:
        cls.tcp_port_in = 6303
        cls.tcp_external_port = 6303
        cls.udp_port_in = 6304
        cls.udp_external_port = 6304
        cls.icmp_id_in = 6305
        cls.snat_addr = '10.0.0.3'

        cls.create_pg_interfaces(range(3))
        cls.interfaces = list(cls.pg_interfaces)

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg0.generate_remote_hosts(2)
        cls.pg0.configure_ipv4_neighbors()

    except Exception:
        super(TestDeterministicNAT, cls).tearDownClass()
        raise
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-request packet
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The list construction and the return statement are reconstructed
    # (callers use the result as a list) -- confirm against VCS.
    l4_layers = (
        TCP(sport=self.tcp_port_in, dport=self.tcp_external_port),
        UDP(sport=self.udp_port_in, dport=self.udp_external_port),
        ICMP(id=self.icmp_id_in, type='echo-request'),
    )
    return [(Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             l4)
            for l4 in l4_layers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global SNAT address)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-reply packet
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The `dst_ip is None` guard, the list construction and the return
    # statement are reconstructed -- confirm against VCS.
    if dst_ip is None:
        dst_ip = self.snat_addr
    # these read the translated ports / ICMP id stored on self, which
    # verify_capture_out() records from a previously captured stream
    l4_layers = (
        TCP(dport=self.tcp_port_out, sport=self.tcp_external_port),
        UDP(dport=self.udp_port_out, sport=self.udp_external_port),
        ICMP(id=self.icmp_external_id, type='echo-reply'),
    )
    return [(Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             l4)
            for l4 in l4_layers]
def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
    """
    Verify captured packets on outside network

    Also records the translated TCP/UDP source ports and the ICMP id of
    the captured packets on self.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global SNAT address)
    :param packet_num: Expected number of packets (Default 3)
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The `nat_ip is None` guard and the try/except/else scaffolding are
    # reconstructed -- confirm against VCS.
    if nat_ip is None:
        nat_ip = self.snat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, nat_ip)
            if packet.haslayer(TCP):
                self.tcp_port_out = packet[TCP].sport
            elif packet.haslayer(UDP):
                self.udp_port_out = packet[UDP].sport
            else:
                self.icmp_external_id = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def initiate_tcp_session(self, in_if, out_if):
    """
    Initiates TCP session

    Sends SYN (in->out), SYN+ACK (out->in) and ACK (in->out), expecting
    each packet on the opposite interface, and stores the translated
    source port of the SYN in self.tcp_port_out.

    :param in_if: Inside interface
    :param out_if: Outside interface
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The TCP flags ("S", "SA", "A" -- taken from the step comments), the
    # in_if.add_stream() calls, `self.pg_start()` and the try/except
    # scaffolding are reconstructed -- confirm against VCS.
    def in2out(flags):
        """Build an inside->outside segment of the session."""
        return (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=flags))

    def out2in(flags):
        """Build an outside->inside segment aimed at the NAT address."""
        return (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                IP(src=out_if.remote_ip4, dst=self.snat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=flags))

    def send(tx_if, rx_if, pkt):
        """Send pkt on tx_if and return the one packet seen on rx_if."""
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    try:
        # SYN packet in->out
        syn = send(in_if, out_if, in2out("S"))
        self.tcp_port_out = syn[TCP].sport

        # SYN + ACK packet out->in
        send(out_if, in_if, out2in("SA"))

        # ACK packet in->out
        send(in_if, out_if, in2out("A"))

    except Exception:
        self.logger.error("TCP 3 way handshake failed")
        raise
def verify_ipfix_max_entries_per_user(self, data):
    """
    Verify IPFIX maximum entries per user exceeded event

    :param data: Decoded IPFIX data records
    """
    # NOTE(review): `record = data[0]` was missing from the listing this
    # was restored from and is reconstructed -- confirm against VCS.
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual('\x03\x00\x00\x00', record[466])
    # field 8 must carry pg0's remote address (presumably the IPFIX
    # sourceIPv4Address element -- confirm)
    self.assertEqual(self.pg0.remote_ip4n, record[8])
def test_deterministic_mode(self):
    """ S-NAT run deterministic mode

    Adds one deterministic mapping, checks the forward and reverse
    lookups and the mapping dump, then expects the dump to be empty.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The `in_plen`/`out_plen` values and the `self.clear_snat()` call
    # before the final dump are reconstructed -- confirm against VCS.
    in_addr = '172.16.255.0'
    out_addr = '172.17.255.50'
    in_addr_t = '172.16.255.20'
    in_addr_n = socket.inet_aton(in_addr)
    out_addr_n = socket.inet_aton(out_addr)
    in_addr_t_n = socket.inet_aton(in_addr_t)
    in_plen = 24
    out_plen = 32

    snat_config = self.vapi.snat_show_config()
    self.assertEqual(1, snat_config.deterministic)

    self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)

    # forward lookup of an inside host, then reverse lookup of the result
    rep1 = self.vapi.snat_det_forward(in_addr_t_n)
    self.assertEqual(rep1.out_addr[:4], out_addr_n)
    rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
    self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

    deterministic_mappings = self.vapi.snat_det_map_dump()
    self.assertEqual(len(deterministic_mappings), 1)
    dsm = deterministic_mappings[0]
    self.assertEqual(in_addr_n, dsm.in_addr[:4])
    self.assertEqual(in_plen, dsm.in_plen)
    self.assertEqual(out_addr_n, dsm.out_addr[:4])
    self.assertEqual(out_plen, dsm.out_plen)

    self.clear_snat()
    deterministic_mappings = self.vapi.snat_det_map_dump()
    self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts

    Every timeout is raised by 10 and then read back to confirm that
    each value differs from what was read before the change.
    """
    before = self.vapi.snat_det_get_timeouts()

    bump = 10
    self.vapi.snat_det_set_timeouts(before.udp + bump,
                                    before.tcp_established + bump,
                                    before.tcp_transitory + bump,
                                    before.icmp + bump)

    after = self.vapi.snat_det_get_timeouts()

    self.assertNotEqual(before.udp, after.udp)
    self.assertNotEqual(before.icmp, after.icmp)
    self.assertNotEqual(before.tcp_established, after.tcp_established)
    self.assertNotEqual(before.tcp_transitory, after.tcp_transitory)
def test_det_in(self):
    """ CGNAT translation test (TCP, UDP, ICMP)

    Maps pg0's remote host onto one NAT address, checks translation in
    both directions and then checks the three dumped sessions.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The two prefix lengths (32), `is_inside=0`, `self.pg_start()` and
    # the `s = sessions[N]` assignments are reconstructed -- confirm
    # against VCS.
    nat_ip = "10.0.0.10"

    self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                               32,
                               socket.inet_aton(nat_ip),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # session dump test
    sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
    self.assertEqual(len(sessions), 3)

    # TCP session
    s = sessions[0]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.tcp_port_in)
    self.assertEqual(s.out_port, self.tcp_port_out)
    self.assertEqual(s.ext_port, self.tcp_external_port)

    # UDP session
    s = sessions[1]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.udp_port_in)
    self.assertEqual(s.out_port, self.udp_port_out)
    self.assertEqual(s.ext_port, self.udp_external_port)

    # ICMP session (no external port is checked for ICMP)
    s = sessions[2]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.icmp_id_in)
    self.assertEqual(s.out_port, self.icmp_external_id)
def test_multiple_users(self):
    """ CGNAT multiple users

    Two inside hosts open the same TCP flow through one deterministic
    mapping; each reply must reach the right host, and closing the
    sessions through the API must bring the session count to zero.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # `port_in = 80`, the prefix lengths (24 / 32), `is_inside=0`,
    # `self.pg_start()`, the try/except scaffolding and the port
    # arguments of the two close-session calls are reconstructed --
    # confirm against VCS.
    nat_ip = "10.0.0.10"
    port_in = 80
    external_port = 6303

    host0 = self.pg0.remote_hosts[0]
    host1 = self.pg0.remote_hosts[1]

    def send(tx_if, rx_if, pkt):
        """Send pkt on tx_if and return the one packet seen on rx_if."""
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def check_tcp(rx, ip_src, ip_dst, dport, sport=None):
        """Assert addresses and ports of rx; return its TCP source port.

        sport is only compared when given (it is unknown in advance for
        translated in2out packets).
        """
        try:
            ip = rx[IP]
            tcp = rx[TCP]
            self.assertEqual(ip.src, ip_src)
            self.assertEqual(ip.dst, ip_dst)
            self.assertEqual(tcp.dport, dport)
            if sport is not None:
                self.assertEqual(tcp.sport, sport)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", rx))
            raise
        return tcp.sport

    def in2out(host):
        """Send the TCP flow from host; return its translated port."""
        pkt = (Ether(src=host.mac, dst=self.pg0.local_mac) /
               IP(src=host.ip4, dst=self.pg1.remote_ip4) /
               TCP(sport=port_in, dport=external_port))
        rx = send(self.pg0, self.pg1, pkt)
        return check_tcp(rx, nat_ip, self.pg1.remote_ip4, external_port)

    def out2in(host, port_out):
        """Reply to port_out; the reply must be delivered to host."""
        pkt = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
               IP(src=self.pg1.remote_ip4, dst=nat_ip) /
               TCP(sport=external_port, dport=port_out))
        rx = send(self.pg1, self.pg0, pkt)
        check_tcp(rx, self.pg1.remote_ip4, host.ip4, port_in,
                  sport=external_port)

    self.vapi.snat_add_det_map(host0.ip4n,
                               24,
                               socket.inet_aton(nat_ip),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # host0 to out, host1 to out
    port_out0 = in2out(host0)
    port_out1 = in2out(host1)

    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(1, len(dms))
    self.assertEqual(2, dms[0].ses_num)

    # out to host0, out to host1
    out2in(host0, port_out0)
    out2in(host1, port_out1)

    # session close api test
    self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
                                         port_out1,
                                         self.pg1.remote_ip4n,
                                         external_port)
    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 1)

    self.vapi.snat_det_close_session_in(host0.ip4n,
                                        port_in,
                                        self.pg1.remote_ip4n,
                                        external_port)
    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
    """ CGNAT TCP session close initiated from inside network

    Opens a TCP session, closes it with FIN (in), ACK + FIN (out) and
    ACK (in), then expects the mapping to report zero sessions.
    """
    # NOTE(review): restored from a listing that had lost its short lines.
    # The prefix lengths (32), `is_inside=0`, the TCP flags ("F" / "A" --
    # taken from the step comments), `self.pg_start()`, the packet list
    # and the try/except scaffolding are reconstructed -- confirm against
    # VCS.
    def in2out(flags):
        """Build an inside->outside segment of the session."""
        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=flags))

    def out2in(flags):
        """Build an outside->inside segment aimed at the NAT address."""
        return (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=flags))

    self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                               32,
                               socket.inet_aton(self.snat_addr),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    self.initiate_tcp_session(self.pg0, self.pg1)

    # close the session from inside
    try:
        # FIN packet in -> out
        self.pg0.add_stream(in2out("F"))
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(1)

        # ACK packet out -> in, followed by FIN packet out -> in
        pkts = [out2in("A"), out2in("F")]
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(2)

        # ACK packet in -> out
        self.pg0.add_stream(in2out("A"))
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(1)

        # Check if snat closed the session
        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)
    except Exception:
        self.logger.error("TCP session termination failed")
        raise
2131 def test_tcp_session_close_detection_out(self):
2132 """ CGNAT TCP session close initiated from outside network """
# Mirror image of the inside-initiated close test: the same det map and
# interface setup and the same initiate_tcp_session(pg0, pg1) call, but
# the close exchange starts on pg1: one packet out -> in, two packets
# in -> out, one packet out -> in.  Afterwards ses_num of the first
# dumped det map is expected to be 0.
# NOTE(review): this listing omits some source lines (remaining call
# arguments, the TCP flags of each packet, pg_start() calls, any
# try/except framing) -- confirm those details against the full file.
2133 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2135 socket.inet_aton(self.snat_addr),
2137 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2138 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2141 self.initiate_tcp_session(self.pg0, self.pg1)
2143 # close the session from outside
2145 # FIN packet out -> in
2146 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2147 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2148 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2150 self.pg1.add_stream(p)
2151 self.pg_enable_capture(self.pg_interfaces)
# exactly one packet is requested from the capture on pg0 (the "in" side)
2153 self.pg0.get_capture(1)
2157 # ACK packet in -> out
2158 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2159 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2160 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment on the next line repeats "ACK packet in -> out";
# by symmetry with the inside-initiated test (ACK followed by FIN) this
# second packet is likely the FIN -- confirm against its TCP flags.
2164 # ACK packet in -> out
2165 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2166 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2167 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# pkts presumably holds the two in -> out packets built above (the lines
# that populate it are not shown); two packets are then requested on pg1
2171 self.pg0.add_stream(pkts)
2172 self.pg_enable_capture(self.pg_interfaces)
2174 self.pg1.get_capture(2)
2176 # ACK packet out -> in
2177 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2178 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2179 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2181 self.pg1.add_stream(p)
2182 self.pg_enable_capture(self.pg_interfaces)
2184 self.pg0.get_capture(1)
2186 # Check if snat closed the session
2187 dms = self.vapi.snat_det_map_dump()
2188 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably the failure branch of a try/except whose
# framing lines are not visible here -- confirm
2190 self.logger.error("TCP session termination failed")
2193 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2194 def test_session_timeout(self):
2195 """ CGNAT session timeouts """
# Skipped unless running_extended_tests() is true (see the decorator).
# Steps visible in this listing: det map and interface setup, a TCP
# session opened with initiate_tcp_session(), all four arguments of
# snat_det_set_timeouts() set to 5, then the create_stream_in() packets
# sent from pg0 and captured on pg1.  The final check expects ses_num of
# the first dumped det map to be 0.
# NOTE(review): the timeout units (presumably seconds) and the wait that
# presumably lets the sessions expire are not visible here -- confirm.
2196 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2198 socket.inet_aton(self.snat_addr),
2200 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2201 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2204 self.initiate_tcp_session(self.pg0, self.pg1)
2205 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2206 pkts = self.create_stream_in(self.pg0, self.pg1)
2207 self.pg0.add_stream(pkts)
2208 self.pg_enable_capture(self.pg_interfaces)
# every packet of the inside stream is requested from the pg1 capture
2210 capture = self.pg1.get_capture(len(pkts))
# by this point no session should remain on the det map
2213 dms = self.vapi.snat_det_map_dump()
2214 self.assertEqual(0, dms[0].ses_num)
2216 def test_session_limit_per_user(self):
2217 """ CGNAT maximum 1000 sessions per user should be created """
# Steps visible in this listing:
#   1. det map and interface setup, IPFIX exporter pointed at pg2, and
#      snat_ipfix() called;
#   2. 1000 UDP packets (sport == dport, ports 1025..2024) sent from pg0
#      and captured on pg1;
#   3. one more UDP packet (3001 -> 3002): nothing may appear on pg1 and
#      an ICMP error quoting that packet must arrive on pg0;
#   4. ses_num of the first dumped det map must equal 1000;
#   5. after "ipfix flush", two packets are captured on pg2 and decoded
#      (templates first, then data sets), and the decoded data is passed
#      to verify_ipfix_max_entries_per_user().
# NOTE(review): some source lines (remaining call arguments, list
# bookkeeping, pg_start() calls, loop headers over the capture) are not
# shown in this listing -- confirm against the full file.
2218 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2220 socket.inet_aton(self.snat_addr),
2222 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2223 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2225 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2226 src_address=self.pg2.local_ip4n,
2228 template_interval=10)
2229 self.vapi.snat_ipfix()
# range(1025, 2025) yields exactly 1000 distinct source ports
2232 for port in range(1025, 2025):
2233 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2234 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2235 UDP(sport=port, dport=port))
2238 self.pg0.add_stream(pkts)
2239 self.pg_enable_capture(self.pg_interfaces)
2241 capture = self.pg1.get_capture(len(pkts))
# one additional flow from the same inside host, beyond the 1000 above
2243 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2244 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2245 UDP(sport=3001, dport=3002))
2246 self.pg0.add_stream(p)
2247 self.pg_enable_capture(self.pg_interfaces)
2249 capture = self.pg1.assert_nothing_captured()
2251 # verify ICMP error packet
2252 capture = self.pg0.get_capture(1)
2254 self.assertTrue(p.haslayer(ICMP))
# RFC 792: type 3 is Destination Unreachable, code 1 is host unreachable
2256 self.assertEqual(icmp.type, 3)
2257 self.assertEqual(icmp.code, 1)
2258 self.assertTrue(icmp.haslayer(IPerror))
# the quoted inner packet must carry the ports of the rejected flow
2259 inner_ip = icmp[IPerror]
2260 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2261 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2263 dms = self.vapi.snat_det_map_dump()
2265 self.assertEqual(1000, dms[0].ses_num)
2267 # verify IPFIX logging
2268 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2269 capture = self.pg2.get_capture(2)
2270 ipfix = IPFIXDecoder()
2271 # first load template
2273 self.assertTrue(p.haslayer(IPFIX))
2274 if p.haslayer(Template):
2275 ipfix.add_template(p.getlayer(Template))
2276 # verify events in data set
2278 if p.haslayer(Data):
2279 data = ipfix.decode_data_set(p.getlayer(Set))
2280 self.verify_ipfix_max_entries_per_user(data)
2282 def clear_snat(self):
2284 Clear SNAT configuration.
# Calls snat_ipfix(enable=0), calls snat_det_set_timeouts() with no
# arguments (presumably restoring default timeouts -- confirm), then
# walks every dumped det map and every dumped SNAT interface.
# NOTE(review): the trailing arguments of the two per-entry calls below
# are not shown in this listing; presumably they request deletion
# (e.g. is_add=0) -- confirm against the full file.
2286 self.vapi.snat_ipfix(enable=0)
2287 self.vapi.snat_det_set_timeouts()
2288 deterministic_mappings = self.vapi.snat_det_map_dump()
2289 for dsm in deterministic_mappings:
2290 self.vapi.snat_add_det_map(dsm.in_addr,
2296 interfaces = self.vapi.snat_interface_dump()
2297 for intf in interfaces:
2298 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
# NOTE(review): the enclosing "def" line is not shown in this listing;
# judging by the super() call this is the body of
# TestDeterministicNAT.tearDown -- confirm.
2303 super(TestDeterministicNAT, self).tearDown()
# only query the CLI while VPP is still alive (vpp_dead is false)
2304 if not self.vpp_dead:
2305 self.logger.info(self.vapi.cli("show snat detail"))
2308 if __name__ == '__main__':
# when executed as a script, run this module's tests with VppTestRunner
2309 unittest.main(testRunner=VppTestRunner)