7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from time import sleep
17 class MethodHolder(VppTestCase):
18 """ SNAT create capture and verify method holder """
22 super(MethodHolder, cls).setUpClass()
25 super(MethodHolder, self).tearDown()
27 def create_stream_in(self, in_if, out_if, ttl=64):
29 Create packet stream for inside network
31 :param in_if: Inside interface
32 :param out_if: Outside interface
33 :param ttl: TTL of generated packets
37 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
38 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
39 TCP(sport=self.tcp_port_in))
43 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
44 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
45 UDP(sport=self.udp_port_in))
49 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
50 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
51 ICMP(id=self.icmp_id_in, type='echo-request'))
56 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
58 Create packet stream for outside network
60 :param out_if: Outside interface
61 :param dst_ip: Destination IP address (Default use global SNAT address)
62 :param ttl: TTL of generated packets
65 dst_ip = self.snat_addr
68 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
69 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
70 TCP(dport=self.tcp_port_out))
74 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
75 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
76 UDP(dport=self.udp_port_out))
80 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
81 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
82 ICMP(id=self.icmp_id_out, type='echo-reply'))
87 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
90 Verify captured packets on outside network
92 :param capture: Captured packets
93 :param nat_ip: Translated IP address (Default use global SNAT address)
94 :param same_port: Sorce port number is not translated (Default False)
95 :param packet_num: Expected number of packets (Default 3)
98 nat_ip = self.snat_addr
99 self.assertEqual(packet_num, len(capture))
100 for packet in capture:
102 self.assertEqual(packet[IP].src, nat_ip)
103 if packet.haslayer(TCP):
105 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
108 packet[TCP].sport, self.tcp_port_in)
109 self.tcp_port_out = packet[TCP].sport
110 elif packet.haslayer(UDP):
112 self.assertEqual(packet[UDP].sport, self.udp_port_in)
115 packet[UDP].sport, self.udp_port_in)
116 self.udp_port_out = packet[UDP].sport
119 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
121 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
122 self.icmp_id_out = packet[ICMP].id
124 self.logger.error(ppp("Unexpected or invalid packet "
125 "(outside network):", packet))
128 def verify_capture_in(self, capture, in_if, packet_num=3):
130 Verify captured packets on inside network
132 :param capture: Captured packets
133 :param in_if: Inside interface
134 :param packet_num: Expected number of packets (Default 3)
136 self.assertEqual(packet_num, len(capture))
137 for packet in capture:
139 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
140 if packet.haslayer(TCP):
141 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
142 elif packet.haslayer(UDP):
143 self.assertEqual(packet[UDP].dport, self.udp_port_in)
145 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
147 self.logger.error(ppp("Unexpected or invalid packet "
148 "(inside network):", packet))
151 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
153 Verify captured packet that don't have to be translated
155 :param capture: Captured packets
156 :param ingress_if: Ingress interface
157 :param egress_if: Egress interface
159 for packet in capture:
161 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
162 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
163 if packet.haslayer(TCP):
164 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
165 elif packet.haslayer(UDP):
166 self.assertEqual(packet[UDP].sport, self.udp_port_in)
168 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
170 self.logger.error(ppp("Unexpected or invalid packet "
171 "(inside network):", packet))
174 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
175 packet_num=3, icmp_type=11):
177 Verify captured packets with ICMP errors on outside network
179 :param capture: Captured packets
180 :param src_ip: Translated IP address or IP address of VPP
181 (Default use global SNAT address)
182 :param packet_num: Expected number of packets (Default 3)
183 :param icmp_type: Type of error ICMP packet
184 we are expecting (Default 11)
187 src_ip = self.snat_addr
188 self.assertEqual(packet_num, len(capture))
189 for packet in capture:
191 self.assertEqual(packet[IP].src, src_ip)
192 self.assertTrue(packet.haslayer(ICMP))
194 self.assertEqual(icmp.type, icmp_type)
195 self.assertTrue(icmp.haslayer(IPerror))
196 inner_ip = icmp[IPerror]
197 if inner_ip.haslayer(TCPerror):
198 self.assertEqual(inner_ip[TCPerror].dport,
200 elif inner_ip.haslayer(UDPerror):
201 self.assertEqual(inner_ip[UDPerror].dport,
204 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
206 self.logger.error(ppp("Unexpected or invalid packet "
207 "(outside network):", packet))
210 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
213 Verify captured packets with ICMP errors on inside network
215 :param capture: Captured packets
216 :param in_if: Inside interface
217 :param packet_num: Expected number of packets (Default 3)
218 :param icmp_type: Type of error ICMP packet
219 we are expecting (Default 11)
221 self.assertEqual(packet_num, len(capture))
222 for packet in capture:
224 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
225 self.assertTrue(packet.haslayer(ICMP))
227 self.assertEqual(icmp.type, icmp_type)
228 self.assertTrue(icmp.haslayer(IPerror))
229 inner_ip = icmp[IPerror]
230 if inner_ip.haslayer(TCPerror):
231 self.assertEqual(inner_ip[TCPerror].sport,
233 elif inner_ip.haslayer(UDPerror):
234 self.assertEqual(inner_ip[UDPerror].sport,
237 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
239 self.logger.error(ppp("Unexpected or invalid packet "
240 "(inside network):", packet))
243 def verify_ipfix_nat44_ses(self, data):
245 Verify IPFIX NAT44 session create/delete event
247 :param data: Decoded IPFIX data records
249 nat44_ses_create_num = 0
250 nat44_ses_delete_num = 0
251 self.assertEqual(6, len(data))
254 self.assertIn(ord(record[230]), [4, 5])
255 if ord(record[230]) == 4:
256 nat44_ses_create_num += 1
258 nat44_ses_delete_num += 1
260 self.assertEqual(self.pg0.remote_ip4n, record[8])
261 # postNATSourceIPv4Address
262 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
265 self.assertEqual(struct.pack("!I", 0), record[234])
266 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
267 if IP_PROTOS.icmp == ord(record[4]):
268 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
269 self.assertEqual(struct.pack("!H", self.icmp_id_out),
271 elif IP_PROTOS.tcp == ord(record[4]):
272 self.assertEqual(struct.pack("!H", self.tcp_port_in),
274 self.assertEqual(struct.pack("!H", self.tcp_port_out),
276 elif IP_PROTOS.udp == ord(record[4]):
277 self.assertEqual(struct.pack("!H", self.udp_port_in),
279 self.assertEqual(struct.pack("!H", self.udp_port_out),
282 self.fail("Invalid protocol")
283 self.assertEqual(3, nat44_ses_create_num)
284 self.assertEqual(3, nat44_ses_delete_num)
286 def verify_ipfix_addr_exhausted(self, data):
288 Verify IPFIX NAT addresses event
290 :param data: Decoded IPFIX data records
292 self.assertEqual(1, len(data))
295 self.assertEqual(ord(record[230]), 3)
297 self.assertEqual(struct.pack("!I", 0), record[283])
300 class TestSNAT(MethodHolder):
301 """ SNAT Test Cases """
305 super(TestSNAT, cls).setUpClass()
308 cls.tcp_port_in = 6303
309 cls.tcp_port_out = 6303
310 cls.udp_port_in = 6304
311 cls.udp_port_out = 6304
312 cls.icmp_id_in = 6305
313 cls.icmp_id_out = 6305
314 cls.snat_addr = '10.0.0.3'
316 cls.create_pg_interfaces(range(8))
317 cls.interfaces = list(cls.pg_interfaces[0:4])
319 for i in cls.interfaces:
324 cls.pg0.generate_remote_hosts(2)
325 cls.pg0.configure_ipv4_neighbors()
327 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
329 cls.pg4._local_ip4 = "172.16.255.1"
330 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
331 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
332 cls.pg4.set_table_ip4(10)
333 cls.pg5._local_ip4 = "172.16.255.3"
334 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
335 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
336 cls.pg5.set_table_ip4(10)
337 cls.pg6._local_ip4 = "172.16.255.1"
338 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
339 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
340 cls.pg6.set_table_ip4(20)
341 for i in cls.overlapping_interfaces:
349 super(TestSNAT, cls).tearDownClass()
352 def clear_snat(self):
354 Clear SNAT configuration.
356 if self.pg7.has_ip4_config:
357 self.pg7.unconfig_ip4()
359 interfaces = self.vapi.snat_interface_addr_dump()
360 for intf in interfaces:
361 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
363 self.vapi.snat_ipfix(enable=0)
365 interfaces = self.vapi.snat_interface_dump()
366 for intf in interfaces:
367 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
371 static_mappings = self.vapi.snat_static_mapping_dump()
372 for sm in static_mappings:
373 self.vapi.snat_add_static_mapping(sm.local_ip_address,
374 sm.external_ip_address,
375 local_port=sm.local_port,
376 external_port=sm.external_port,
377 addr_only=sm.addr_only,
379 protocol=sm.protocol,
382 adresses = self.vapi.snat_address_dump()
383 for addr in adresses:
384 self.vapi.snat_add_address_range(addr.ip_address,
388 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
389 local_port=0, external_port=0, vrf_id=0,
390 is_add=1, external_sw_if_index=0xFFFFFFFF,
393 Add/delete S-NAT static mapping
395 :param local_ip: Local IP address
396 :param external_ip: External IP address
397 :param local_port: Local port number (Optional)
398 :param external_port: External port number (Optional)
399 :param vrf_id: VRF ID (Default 0)
400 :param is_add: 1 if add, 0 if delete (Default add)
401 :param external_sw_if_index: External interface instead of IP address
402 :param proto: IP protocol (Mandatory if port specified)
405 if local_port and external_port:
407 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
408 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
409 self.vapi.snat_add_static_mapping(
412 external_sw_if_index,
420 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
422 Add/delete S-NAT address
424 :param ip: IP address
425 :param is_add: 1 if add, 0 if delete (Default add)
427 snat_addr = socket.inet_pton(socket.AF_INET, ip)
428 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
431 def test_dynamic(self):
432 """ SNAT dynamic translation test """
434 self.snat_add_address(self.snat_addr)
435 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
436 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
440 pkts = self.create_stream_in(self.pg0, self.pg1)
441 self.pg0.add_stream(pkts)
442 self.pg_enable_capture(self.pg_interfaces)
444 capture = self.pg1.get_capture(len(pkts))
445 self.verify_capture_out(capture)
448 pkts = self.create_stream_out(self.pg1)
449 self.pg1.add_stream(pkts)
450 self.pg_enable_capture(self.pg_interfaces)
452 capture = self.pg0.get_capture(len(pkts))
453 self.verify_capture_in(capture, self.pg0)
455 def test_dynamic_icmp_errors_in2out_ttl_1(self):
456 """ SNAT handling of client packets with TTL=1 """
458 self.snat_add_address(self.snat_addr)
459 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
460 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
463 # Client side - generate traffic
464 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
465 self.pg0.add_stream(pkts)
466 self.pg_enable_capture(self.pg_interfaces)
469 # Client side - verify ICMP type 11 packets
470 capture = self.pg0.get_capture(len(pkts))
471 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
473 def test_dynamic_icmp_errors_out2in_ttl_1(self):
474 """ SNAT handling of server packets with TTL=1 """
476 self.snat_add_address(self.snat_addr)
477 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
478 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
481 # Client side - create sessions
482 pkts = self.create_stream_in(self.pg0, self.pg1)
483 self.pg0.add_stream(pkts)
484 self.pg_enable_capture(self.pg_interfaces)
487 # Server side - generate traffic
488 capture = self.pg1.get_capture(len(pkts))
489 self.verify_capture_out(capture)
490 pkts = self.create_stream_out(self.pg1, ttl=1)
491 self.pg1.add_stream(pkts)
492 self.pg_enable_capture(self.pg_interfaces)
495 # Server side - verify ICMP type 11 packets
496 capture = self.pg1.get_capture(len(pkts))
497 self.verify_capture_out_with_icmp_errors(capture,
498 src_ip=self.pg1.local_ip4)
500 def test_dynamic_icmp_errors_in2out_ttl_2(self):
501 """ SNAT handling of error responses to client packets with TTL=2 """
503 self.snat_add_address(self.snat_addr)
504 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
505 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
508 # Client side - generate traffic
509 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
510 self.pg0.add_stream(pkts)
511 self.pg_enable_capture(self.pg_interfaces)
514 # Server side - simulate ICMP type 11 response
515 capture = self.pg1.get_capture(len(pkts))
516 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
517 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
518 ICMP(type=11) / packet[IP] for packet in capture]
519 self.pg1.add_stream(pkts)
520 self.pg_enable_capture(self.pg_interfaces)
523 # Client side - verify ICMP type 11 packets
524 capture = self.pg0.get_capture(len(pkts))
525 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
527 def test_dynamic_icmp_errors_out2in_ttl_2(self):
528 """ SNAT handling of error responses to server packets with TTL=2 """
530 self.snat_add_address(self.snat_addr)
531 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
532 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
535 # Client side - create sessions
536 pkts = self.create_stream_in(self.pg0, self.pg1)
537 self.pg0.add_stream(pkts)
538 self.pg_enable_capture(self.pg_interfaces)
541 # Server side - generate traffic
542 capture = self.pg1.get_capture(len(pkts))
543 self.verify_capture_out(capture)
544 pkts = self.create_stream_out(self.pg1, ttl=2)
545 self.pg1.add_stream(pkts)
546 self.pg_enable_capture(self.pg_interfaces)
549 # Client side - simulate ICMP type 11 response
550 capture = self.pg0.get_capture(len(pkts))
551 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
552 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
553 ICMP(type=11) / packet[IP] for packet in capture]
554 self.pg0.add_stream(pkts)
555 self.pg_enable_capture(self.pg_interfaces)
558 # Server side - verify ICMP type 11 packets
559 capture = self.pg1.get_capture(len(pkts))
560 self.verify_capture_out_with_icmp_errors(capture)
562 def test_ping_out_interface_from_outside(self):
563 """ Ping SNAT out interface from outside """
565 self.snat_add_address(self.snat_addr)
566 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
567 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
570 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
571 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
572 ICMP(id=self.icmp_id_out, type='echo-request'))
574 self.pg1.add_stream(pkts)
575 self.pg_enable_capture(self.pg_interfaces)
577 capture = self.pg1.get_capture(len(pkts))
578 self.assertEqual(1, len(capture))
581 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
582 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
583 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
584 self.assertEqual(packet[ICMP].type, 0) # echo reply
586 self.logger.error(ppp("Unexpected or invalid packet "
587 "(outside network):", packet))
590 def test_static_in(self):
591 """ SNAT 1:1 NAT initialized from inside network """
594 self.tcp_port_out = 6303
595 self.udp_port_out = 6304
596 self.icmp_id_out = 6305
598 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
599 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
600 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
604 pkts = self.create_stream_in(self.pg0, self.pg1)
605 self.pg0.add_stream(pkts)
606 self.pg_enable_capture(self.pg_interfaces)
608 capture = self.pg1.get_capture(len(pkts))
609 self.verify_capture_out(capture, nat_ip, True)
612 pkts = self.create_stream_out(self.pg1, nat_ip)
613 self.pg1.add_stream(pkts)
614 self.pg_enable_capture(self.pg_interfaces)
616 capture = self.pg0.get_capture(len(pkts))
617 self.verify_capture_in(capture, self.pg0)
619 def test_static_out(self):
620 """ SNAT 1:1 NAT initialized from outside network """
623 self.tcp_port_out = 6303
624 self.udp_port_out = 6304
625 self.icmp_id_out = 6305
627 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
628 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
629 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
633 pkts = self.create_stream_out(self.pg1, nat_ip)
634 self.pg1.add_stream(pkts)
635 self.pg_enable_capture(self.pg_interfaces)
637 capture = self.pg0.get_capture(len(pkts))
638 self.verify_capture_in(capture, self.pg0)
641 pkts = self.create_stream_in(self.pg0, self.pg1)
642 self.pg0.add_stream(pkts)
643 self.pg_enable_capture(self.pg_interfaces)
645 capture = self.pg1.get_capture(len(pkts))
646 self.verify_capture_out(capture, nat_ip, True)
648 def test_static_with_port_in(self):
649 """ SNAT 1:1 NAT with port initialized from inside network """
651 self.tcp_port_out = 3606
652 self.udp_port_out = 3607
653 self.icmp_id_out = 3608
655 self.snat_add_address(self.snat_addr)
656 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
657 self.tcp_port_in, self.tcp_port_out,
659 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
660 self.udp_port_in, self.udp_port_out,
662 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
663 self.icmp_id_in, self.icmp_id_out,
664 proto=IP_PROTOS.icmp)
665 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
666 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
670 pkts = self.create_stream_in(self.pg0, self.pg1)
671 self.pg0.add_stream(pkts)
672 self.pg_enable_capture(self.pg_interfaces)
674 capture = self.pg1.get_capture(len(pkts))
675 self.verify_capture_out(capture)
678 pkts = self.create_stream_out(self.pg1)
679 self.pg1.add_stream(pkts)
680 self.pg_enable_capture(self.pg_interfaces)
682 capture = self.pg0.get_capture(len(pkts))
683 self.verify_capture_in(capture, self.pg0)
685 def test_static_with_port_out(self):
686 """ SNAT 1:1 NAT with port initialized from outside network """
688 self.tcp_port_out = 30606
689 self.udp_port_out = 30607
690 self.icmp_id_out = 30608
692 self.snat_add_address(self.snat_addr)
693 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
694 self.tcp_port_in, self.tcp_port_out,
696 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
697 self.udp_port_in, self.udp_port_out,
699 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
700 self.icmp_id_in, self.icmp_id_out,
701 proto=IP_PROTOS.icmp)
702 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
703 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
707 pkts = self.create_stream_out(self.pg1)
708 self.pg1.add_stream(pkts)
709 self.pg_enable_capture(self.pg_interfaces)
711 capture = self.pg0.get_capture(len(pkts))
712 self.verify_capture_in(capture, self.pg0)
715 pkts = self.create_stream_in(self.pg0, self.pg1)
716 self.pg0.add_stream(pkts)
717 self.pg_enable_capture(self.pg_interfaces)
719 capture = self.pg1.get_capture(len(pkts))
720 self.verify_capture_out(capture)
722 def test_static_vrf_aware(self):
723 """ SNAT 1:1 NAT VRF awareness """
725 nat_ip1 = "10.0.0.30"
726 nat_ip2 = "10.0.0.40"
727 self.tcp_port_out = 6303
728 self.udp_port_out = 6304
729 self.icmp_id_out = 6305
731 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
733 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
735 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
737 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
738 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
740 # inside interface VRF match SNAT static mapping VRF
741 pkts = self.create_stream_in(self.pg4, self.pg3)
742 self.pg4.add_stream(pkts)
743 self.pg_enable_capture(self.pg_interfaces)
745 capture = self.pg3.get_capture(len(pkts))
746 self.verify_capture_out(capture, nat_ip1, True)
748 # inside interface VRF don't match SNAT static mapping VRF (packets
750 pkts = self.create_stream_in(self.pg0, self.pg3)
751 self.pg0.add_stream(pkts)
752 self.pg_enable_capture(self.pg_interfaces)
754 self.pg3.assert_nothing_captured()
756 def test_multiple_inside_interfaces(self):
757 """ SNAT multiple inside interfaces (non-overlapping address space) """
759 self.snat_add_address(self.snat_addr)
760 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
761 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
762 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
765 # between two S-NAT inside interfaces (no translation)
766 pkts = self.create_stream_in(self.pg0, self.pg1)
767 self.pg0.add_stream(pkts)
768 self.pg_enable_capture(self.pg_interfaces)
770 capture = self.pg1.get_capture(len(pkts))
771 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
773 # from S-NAT inside to interface without S-NAT feature (no translation)
774 pkts = self.create_stream_in(self.pg0, self.pg2)
775 self.pg0.add_stream(pkts)
776 self.pg_enable_capture(self.pg_interfaces)
778 capture = self.pg2.get_capture(len(pkts))
779 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
781 # in2out 1st interface
782 pkts = self.create_stream_in(self.pg0, self.pg3)
783 self.pg0.add_stream(pkts)
784 self.pg_enable_capture(self.pg_interfaces)
786 capture = self.pg3.get_capture(len(pkts))
787 self.verify_capture_out(capture)
789 # out2in 1st interface
790 pkts = self.create_stream_out(self.pg3)
791 self.pg3.add_stream(pkts)
792 self.pg_enable_capture(self.pg_interfaces)
794 capture = self.pg0.get_capture(len(pkts))
795 self.verify_capture_in(capture, self.pg0)
797 # in2out 2nd interface
798 pkts = self.create_stream_in(self.pg1, self.pg3)
799 self.pg1.add_stream(pkts)
800 self.pg_enable_capture(self.pg_interfaces)
802 capture = self.pg3.get_capture(len(pkts))
803 self.verify_capture_out(capture)
805 # out2in 2nd interface
806 pkts = self.create_stream_out(self.pg3)
807 self.pg3.add_stream(pkts)
808 self.pg_enable_capture(self.pg_interfaces)
810 capture = self.pg1.get_capture(len(pkts))
811 self.verify_capture_in(capture, self.pg1)
813 def test_inside_overlapping_interfaces(self):
814 """ SNAT multiple inside interfaces with overlapping address space """
816 static_nat_ip = "10.0.0.10"
817 self.snat_add_address(self.snat_addr)
818 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
820 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
821 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
822 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
823 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
826 # between S-NAT inside interfaces with same VRF (no translation)
827 pkts = self.create_stream_in(self.pg4, self.pg5)
828 self.pg4.add_stream(pkts)
829 self.pg_enable_capture(self.pg_interfaces)
831 capture = self.pg5.get_capture(len(pkts))
832 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
834 # between S-NAT inside interfaces with different VRF (hairpinning)
835 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
836 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
837 TCP(sport=1234, dport=5678))
838 self.pg4.add_stream(p)
839 self.pg_enable_capture(self.pg_interfaces)
841 capture = self.pg6.get_capture(1)
846 self.assertEqual(ip.src, self.snat_addr)
847 self.assertEqual(ip.dst, self.pg6.remote_ip4)
848 self.assertNotEqual(tcp.sport, 1234)
849 self.assertEqual(tcp.dport, 5678)
851 self.logger.error(ppp("Unexpected or invalid packet:", p))
854 # in2out 1st interface
855 pkts = self.create_stream_in(self.pg4, self.pg3)
856 self.pg4.add_stream(pkts)
857 self.pg_enable_capture(self.pg_interfaces)
859 capture = self.pg3.get_capture(len(pkts))
860 self.verify_capture_out(capture)
862 # out2in 1st interface
863 pkts = self.create_stream_out(self.pg3)
864 self.pg3.add_stream(pkts)
865 self.pg_enable_capture(self.pg_interfaces)
867 capture = self.pg4.get_capture(len(pkts))
868 self.verify_capture_in(capture, self.pg4)
870 # in2out 2nd interface
871 pkts = self.create_stream_in(self.pg5, self.pg3)
872 self.pg5.add_stream(pkts)
873 self.pg_enable_capture(self.pg_interfaces)
875 capture = self.pg3.get_capture(len(pkts))
876 self.verify_capture_out(capture)
878 # out2in 2nd interface
879 pkts = self.create_stream_out(self.pg3)
880 self.pg3.add_stream(pkts)
881 self.pg_enable_capture(self.pg_interfaces)
883 capture = self.pg5.get_capture(len(pkts))
884 self.verify_capture_in(capture, self.pg5)
887 addresses = self.vapi.snat_address_dump()
888 self.assertEqual(len(addresses), 1)
889 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
890 self.assertEqual(len(sessions), 3)
891 for session in sessions:
892 self.assertFalse(session.is_static)
893 self.assertEqual(session.inside_ip_address[0:4],
894 self.pg5.remote_ip4n)
895 self.assertEqual(session.outside_ip_address,
896 addresses[0].ip_address)
897 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
898 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
899 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
900 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
901 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
902 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
903 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
904 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
905 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
907 # in2out 3rd interface
908 pkts = self.create_stream_in(self.pg6, self.pg3)
909 self.pg6.add_stream(pkts)
910 self.pg_enable_capture(self.pg_interfaces)
912 capture = self.pg3.get_capture(len(pkts))
913 self.verify_capture_out(capture, static_nat_ip, True)
915 # out2in 3rd interface
916 pkts = self.create_stream_out(self.pg3, static_nat_ip)
917 self.pg3.add_stream(pkts)
918 self.pg_enable_capture(self.pg_interfaces)
920 capture = self.pg6.get_capture(len(pkts))
921 self.verify_capture_in(capture, self.pg6)
923 # general user and session dump verifications
924 users = self.vapi.snat_user_dump()
925 self.assertTrue(len(users) >= 3)
926 addresses = self.vapi.snat_address_dump()
927 self.assertEqual(len(addresses), 1)
929 sessions = self.vapi.snat_user_session_dump(user.ip_address,
931 for session in sessions:
932 self.assertEqual(user.ip_address, session.inside_ip_address)
933 self.assertTrue(session.total_bytes > session.total_pkts > 0)
934 self.assertTrue(session.protocol in
935 [IP_PROTOS.tcp, IP_PROTOS.udp,
939 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
940 self.assertTrue(len(sessions) >= 4)
941 for session in sessions:
942 self.assertFalse(session.is_static)
943 self.assertEqual(session.inside_ip_address[0:4],
944 self.pg4.remote_ip4n)
945 self.assertEqual(session.outside_ip_address,
946 addresses[0].ip_address)
949 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
950 self.assertTrue(len(sessions) >= 3)
951 for session in sessions:
952 self.assertTrue(session.is_static)
953 self.assertEqual(session.inside_ip_address[0:4],
954 self.pg6.remote_ip4n)
955 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
956 map(int, static_nat_ip.split('.')))
957 self.assertTrue(session.inside_port in
958 [self.tcp_port_in, self.udp_port_in,
961 def test_hairpinning(self):
962 """ SNAT hairpinning """
964 host = self.pg0.remote_hosts[0]
965 server = self.pg0.remote_hosts[1]
968 server_in_port = 5678
969 server_out_port = 8765
971 self.snat_add_address(self.snat_addr)
972 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
973 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
975 # add static mapping for server
976 self.snat_add_static_mapping(server.ip4, self.snat_addr,
977 server_in_port, server_out_port,
980 # send packet from host to server
981 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
982 IP(src=host.ip4, dst=self.snat_addr) /
983 TCP(sport=host_in_port, dport=server_out_port))
984 self.pg0.add_stream(p)
985 self.pg_enable_capture(self.pg_interfaces)
987 capture = self.pg0.get_capture(1)
992 self.assertEqual(ip.src, self.snat_addr)
993 self.assertEqual(ip.dst, server.ip4)
994 self.assertNotEqual(tcp.sport, host_in_port)
995 self.assertEqual(tcp.dport, server_in_port)
996 host_out_port = tcp.sport
998 self.logger.error(ppp("Unexpected or invalid packet:", p))
1001 # send reply from server to host
1002 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1003 IP(src=server.ip4, dst=self.snat_addr) /
1004 TCP(sport=server_in_port, dport=host_out_port))
1005 self.pg0.add_stream(p)
1006 self.pg_enable_capture(self.pg_interfaces)
1008 capture = self.pg0.get_capture(1)
1013 self.assertEqual(ip.src, self.snat_addr)
1014 self.assertEqual(ip.dst, host.ip4)
1015 self.assertEqual(tcp.sport, server_out_port)
1016 self.assertEqual(tcp.dport, host_in_port)
1018 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1021 def test_max_translations_per_user(self):
1022 """ MAX translations per user - recycle the least recently used """
1024 self.snat_add_address(self.snat_addr)
1025 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1026 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1029 # get maximum number of translations per user
1030 snat_config = self.vapi.snat_show_config()
1032 # send more than maximum number of translations per user packets
1033 pkts_num = snat_config.max_translations_per_user + 5
1035 for port in range(0, pkts_num):
1036 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1037 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1038 TCP(sport=1025 + port))
1040 self.pg0.add_stream(pkts)
1041 self.pg_enable_capture(self.pg_interfaces)
1044 # verify number of translated packet
1045 self.pg1.get_capture(pkts_num)
1047 def test_interface_addr(self):
1048 """ Acquire SNAT addresses from interface """
1049 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1051 # no address in NAT pool
1052 adresses = self.vapi.snat_address_dump()
1053 self.assertEqual(0, len(adresses))
1055 # configure interface address and check NAT address pool
1056 self.pg7.config_ip4()
1057 adresses = self.vapi.snat_address_dump()
1058 self.assertEqual(1, len(adresses))
1059 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1061 # remove interface address and check NAT address pool
1062 self.pg7.unconfig_ip4()
1063 adresses = self.vapi.snat_address_dump()
1064 self.assertEqual(0, len(adresses))
1066 def test_interface_addr_static_mapping(self):
1067 """ Static mapping with addresses from interface """
1068 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1069 self.snat_add_static_mapping('1.2.3.4',
1070 external_sw_if_index=self.pg7.sw_if_index)
1072 # static mappings with external interface
1073 static_mappings = self.vapi.snat_static_mapping_dump()
1074 self.assertEqual(1, len(static_mappings))
1075 self.assertEqual(self.pg7.sw_if_index,
1076 static_mappings[0].external_sw_if_index)
1078 # configure interface address and check static mappings
1079 self.pg7.config_ip4()
1080 static_mappings = self.vapi.snat_static_mapping_dump()
1081 self.assertEqual(1, len(static_mappings))
1082 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1083 self.pg7.local_ip4n)
1084 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1086 # remove interface address and check static mappings
1087 self.pg7.unconfig_ip4()
1088 static_mappings = self.vapi.snat_static_mapping_dump()
1089 self.assertEqual(0, len(static_mappings))
1091 def test_ipfix_nat44_sess(self):
1092 """ S-NAT IPFIX logging NAT44 session created/delted """
1093 self.snat_add_address(self.snat_addr)
1094 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1095 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1097 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1098 src_address=self.pg3.local_ip4n,
1100 template_interval=10)
1101 self.vapi.snat_ipfix()
1103 pkts = self.create_stream_in(self.pg0, self.pg1)
1104 self.pg0.add_stream(pkts)
1105 self.pg_enable_capture(self.pg_interfaces)
1107 capture = self.pg1.get_capture(len(pkts))
1108 self.verify_capture_out(capture)
1109 self.snat_add_address(self.snat_addr, is_add=0)
1110 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1111 capture = self.pg3.get_capture(3)
1112 ipfix = IPFIXDecoder()
1113 # first load template
1115 self.assertTrue(p.haslayer(IPFIX))
1116 if p.haslayer(Template):
1117 ipfix.add_template(p.getlayer(Template))
1118 # verify events in data set
1120 if p.haslayer(Data):
1121 data = ipfix.decode_data_set(p.getlayer(Set))
1122 self.verify_ipfix_nat44_ses(data)
1124 def test_ipfix_addr_exhausted(self):
1125 """ S-NAT IPFIX logging NAT addresses exhausted """
1126 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1127 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1129 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1130 src_address=self.pg3.local_ip4n,
1132 template_interval=10)
1133 self.vapi.snat_ipfix()
1135 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1136 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1138 self.pg0.add_stream(p)
1139 self.pg_enable_capture(self.pg_interfaces)
1141 capture = self.pg1.get_capture(0)
1142 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1143 capture = self.pg3.get_capture(3)
1144 ipfix = IPFIXDecoder()
1145 # first load template
1147 self.assertTrue(p.haslayer(IPFIX))
1148 if p.haslayer(Template):
1149 ipfix.add_template(p.getlayer(Template))
1150 # verify events in data set
1152 if p.haslayer(Data):
1153 data = ipfix.decode_data_set(p.getlayer(Set))
1154 self.verify_ipfix_addr_exhausted(data)
1156 def test_pool_addr_fib(self):
1157 """ S-NAT add pool addresses to FIB """
1158 static_addr = '10.0.0.10'
1159 self.snat_add_address(self.snat_addr)
1160 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1161 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1163 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1166 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1167 ARP(op=ARP.who_has, pdst=self.snat_addr,
1168 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1169 self.pg1.add_stream(p)
1170 self.pg_enable_capture(self.pg_interfaces)
1172 capture = self.pg1.get_capture(1)
1173 self.assertTrue(capture[0].haslayer(ARP))
1174 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1177 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1178 ARP(op=ARP.who_has, pdst=static_addr,
1179 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1180 self.pg1.add_stream(p)
1181 self.pg_enable_capture(self.pg_interfaces)
1183 capture = self.pg1.get_capture(1)
1184 self.assertTrue(capture[0].haslayer(ARP))
1185 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1187 # send ARP to non-SNAT interface
1188 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1189 ARP(op=ARP.who_has, pdst=self.snat_addr,
1190 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1191 self.pg2.add_stream(p)
1192 self.pg_enable_capture(self.pg_interfaces)
1194 capture = self.pg1.get_capture(0)
1196 # remove addresses and verify
1197 self.snat_add_address(self.snat_addr, is_add=0)
1198 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1201 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1202 ARP(op=ARP.who_has, pdst=self.snat_addr,
1203 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1204 self.pg1.add_stream(p)
1205 self.pg_enable_capture(self.pg_interfaces)
1207 capture = self.pg1.get_capture(0)
1209 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1210 ARP(op=ARP.who_has, pdst=static_addr,
1211 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1212 self.pg1.add_stream(p)
1213 self.pg_enable_capture(self.pg_interfaces)
1215 capture = self.pg1.get_capture(0)
1217 def test_vrf_mode(self):
1218 """ S-NAT tenant VRF aware address pool mode """
1222 nat_ip1 = "10.0.0.10"
1223 nat_ip2 = "10.0.0.11"
1225 self.pg0.unconfig_ip4()
1226 self.pg1.unconfig_ip4()
1227 self.pg0.set_table_ip4(vrf_id1)
1228 self.pg1.set_table_ip4(vrf_id2)
1229 self.pg0.config_ip4()
1230 self.pg1.config_ip4()
1232 self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1233 self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1234 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1235 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1236 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1240 pkts = self.create_stream_in(self.pg0, self.pg2)
1241 self.pg0.add_stream(pkts)
1242 self.pg_enable_capture(self.pg_interfaces)
1244 capture = self.pg2.get_capture(len(pkts))
1245 self.verify_capture_out(capture, nat_ip1)
1248 pkts = self.create_stream_in(self.pg1, self.pg2)
1249 self.pg1.add_stream(pkts)
1250 self.pg_enable_capture(self.pg_interfaces)
1252 capture = self.pg2.get_capture(len(pkts))
1253 self.verify_capture_out(capture, nat_ip2)
1255 def test_vrf_feature_independent(self):
1256 """ S-NAT tenant VRF independent address pool mode """
1258 nat_ip1 = "10.0.0.10"
1259 nat_ip2 = "10.0.0.11"
1261 self.snat_add_address(nat_ip1)
1262 self.snat_add_address(nat_ip2)
1263 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1264 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1265 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1269 pkts = self.create_stream_in(self.pg0, self.pg2)
1270 self.pg0.add_stream(pkts)
1271 self.pg_enable_capture(self.pg_interfaces)
1273 capture = self.pg2.get_capture(len(pkts))
1274 self.verify_capture_out(capture, nat_ip1)
1277 pkts = self.create_stream_in(self.pg1, self.pg2)
1278 self.pg1.add_stream(pkts)
1279 self.pg_enable_capture(self.pg_interfaces)
1281 capture = self.pg2.get_capture(len(pkts))
1282 self.verify_capture_out(capture, nat_ip1)
1285 super(TestSNAT, self).tearDown()
1286 if not self.vpp_dead:
1287 self.logger.info(self.vapi.cli("show snat verbose"))
1291 class TestDeterministicNAT(MethodHolder):
1292 """ Deterministic NAT Test Cases """
1295 def setUpConstants(cls):
1296 super(TestDeterministicNAT, cls).setUpConstants()
1297 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1300 def setUpClass(cls):
1301 super(TestDeterministicNAT, cls).setUpClass()
1304 cls.tcp_port_in = 6303
1305 cls.tcp_port_out = 6303
1306 cls.udp_port_in = 6304
1307 cls.udp_port_out = 6304
1308 cls.icmp_id_in = 6305
1309 cls.snat_addr = '10.0.0.3'
1311 cls.create_pg_interfaces(range(2))
1312 cls.interfaces = list(cls.pg_interfaces)
1314 for i in cls.interfaces:
1319 cls.pg0.generate_remote_hosts(2)
1320 cls.pg0.configure_ipv4_neighbors()
1323 super(TestDeterministicNAT, cls).tearDownClass()
1326 def create_stream_in(self, in_if, out_if, ttl=64):
1328 Create packet stream for inside network
1330 :param in_if: Inside interface
1331 :param out_if: Outside interface
1332 :param ttl: TTL of generated packets
1336 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1337 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1338 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out))
1342 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1343 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1344 UDP(sport=self.udp_port_in, dport=self.udp_port_out))
1348 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1349 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1350 ICMP(id=self.icmp_id_in, type='echo-request'))
1355 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1357 Create packet stream for outside network
1359 :param out_if: Outside interface
1360 :param dst_ip: Destination IP address (Default use global SNAT address)
1361 :param ttl: TTL of generated packets
1364 dst_ip = self.snat_addr
1367 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1368 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1369 TCP(dport=self.tcp_external_port, sport=self.tcp_port_out))
1373 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1374 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1375 UDP(dport=self.udp_external_port, sport=self.udp_port_out))
1379 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1380 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1381 ICMP(id=self.icmp_external_id, type='echo-reply'))
1386 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1388 Verify captured packets on outside network
1390 :param capture: Captured packets
1391 :param nat_ip: Translated IP address (Default use global SNAT address)
1392 :param same_port: Sorce port number is not translated (Default False)
1393 :param packet_num: Expected number of packets (Default 3)
1396 nat_ip = self.snat_addr
1397 self.assertEqual(packet_num, len(capture))
1398 for packet in capture:
1400 self.assertEqual(packet[IP].src, nat_ip)
1401 if packet.haslayer(TCP):
1402 self.tcp_external_port = packet[TCP].sport
1403 elif packet.haslayer(UDP):
1404 self.udp_external_port = packet[UDP].sport
1406 self.icmp_external_id = packet[ICMP].id
1408 self.logger.error(ppp("Unexpected or invalid packet "
1409 "(outside network):", packet))
1412 def initiate_tcp_session(self, in_if, out_if):
1414 Initiates TCP session
1416 :param in_if: Inside interface
1417 :param out_if: Outside interface
1420 # SYN packet in->out
1421 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1422 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1423 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1426 self.pg_enable_capture(self.pg_interfaces)
1428 capture = out_if.get_capture(1)
1430 self.tcp_external_port = p[TCP].sport
1432 # SYN + ACK packet out->in
1433 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1434 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1435 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1437 out_if.add_stream(p)
1438 self.pg_enable_capture(self.pg_interfaces)
1440 in_if.get_capture(1)
1442 # ACK packet in->out
1443 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1444 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1445 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1448 self.pg_enable_capture(self.pg_interfaces)
1450 out_if.get_capture(1)
1453 self.logger.error("TCP 3 way handshake failed")
1456 def test_deterministic_mode(self):
1457 """ S-NAT run deterministic mode """
1458 in_addr = '172.16.255.0'
1459 out_addr = '172.17.255.50'
1460 in_addr_t = '172.16.255.20'
1461 in_addr_n = socket.inet_aton(in_addr)
1462 out_addr_n = socket.inet_aton(out_addr)
1463 in_addr_t_n = socket.inet_aton(in_addr_t)
1467 snat_config = self.vapi.snat_show_config()
1468 self.assertEqual(1, snat_config.deterministic)
1470 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1472 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1473 self.assertEqual(rep1.out_addr[:4], out_addr_n)
1474 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1475 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1477 deterministic_mappings = self.vapi.snat_det_map_dump()
1478 self.assertEqual(len(deterministic_mappings), 1)
1479 dsm = deterministic_mappings[0]
1480 self.assertEqual(in_addr_n, dsm.in_addr[:4])
1481 self.assertEqual(in_plen, dsm.in_plen)
1482 self.assertEqual(out_addr_n, dsm.out_addr[:4])
1483 self.assertEqual(out_plen, dsm.out_plen)
1486 deterministic_mappings = self.vapi.snat_det_map_dump()
1487 self.assertEqual(len(deterministic_mappings), 0)
1489 def test_set_timeouts(self):
1490 """ Set deterministic NAT timeouts """
1491 timeouts_before = self.vapi.snat_det_get_timeouts()
1493 self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1494 timeouts_before.tcp_established + 10,
1495 timeouts_before.tcp_transitory + 10,
1496 timeouts_before.icmp + 10)
1498 timeouts_after = self.vapi.snat_det_get_timeouts()
1500 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1501 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1502 self.assertNotEqual(timeouts_before.tcp_established,
1503 timeouts_after.tcp_established)
1504 self.assertNotEqual(timeouts_before.tcp_transitory,
1505 timeouts_after.tcp_transitory)
1507 def test_det_in(self):
1508 """ CGNAT translation test (TCP, UDP, ICMP) """
1510 nat_ip = "10.0.0.10"
1511 self.tcp_port_out = 6303
1512 self.udp_port_out = 6304
1513 self.icmp_id_in = 6305
1514 self.tcp_external_port = 7303
1515 self.udp_external_port = 7304
1516 self.icmp_id_out = 7305
1518 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1520 socket.inet_aton(nat_ip),
1522 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1523 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1527 pkts = self.create_stream_in(self.pg0, self.pg1)
1528 self.pg0.add_stream(pkts)
1529 self.pg_enable_capture(self.pg_interfaces)
1531 capture = self.pg1.get_capture(len(pkts))
1532 self.verify_capture_out(capture, nat_ip)
1535 pkts = self.create_stream_out(self.pg1, nat_ip)
1536 self.pg1.add_stream(pkts)
1537 self.pg_enable_capture(self.pg_interfaces)
1539 capture = self.pg0.get_capture(len(pkts))
1540 self.verify_capture_in(capture, self.pg0)
1542 def test_multiple_users(self):
1543 """ CGNAT multiple users """
1545 nat_ip = "10.0.0.10"
1549 host0 = self.pg0.remote_hosts[0]
1550 host1 = self.pg0.remote_hosts[1]
1552 self.vapi.snat_add_det_map(host0.ip4n,
1554 socket.inet_aton(nat_ip),
1556 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1557 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1561 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
1562 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
1563 TCP(sport=port_in, dport=port_out))
1564 self.pg0.add_stream(p)
1565 self.pg_enable_capture(self.pg_interfaces)
1567 capture = self.pg1.get_capture(1)
1572 self.assertEqual(ip.src, nat_ip)
1573 self.assertEqual(ip.dst, self.pg1.remote_ip4)
1574 self.assertEqual(tcp.dport, port_out)
1575 external_port0 = tcp.sport
1577 self.logger.error(ppp("Unexpected or invalid packet:", p))
1581 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
1582 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
1583 TCP(sport=port_in, dport=port_out))
1584 self.pg0.add_stream(p)
1585 self.pg_enable_capture(self.pg_interfaces)
1587 capture = self.pg1.get_capture(1)
1592 self.assertEqual(ip.src, nat_ip)
1593 self.assertEqual(ip.dst, self.pg1.remote_ip4)
1594 self.assertEqual(tcp.dport, port_out)
1595 external_port1 = tcp.sport
1597 self.logger.error(ppp("Unexpected or invalid packet:", p))
1600 dms = self.vapi.snat_det_map_dump()
1601 self.assertEqual(1, len(dms))
1602 self.assertEqual(2, dms[0].ses_num)
1605 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1606 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1607 TCP(sport=port_out, dport=external_port0))
1608 self.pg1.add_stream(p)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 capture = self.pg0.get_capture(1)
1616 self.assertEqual(ip.src, self.pg1.remote_ip4)
1617 self.assertEqual(ip.dst, host0.ip4)
1618 self.assertEqual(tcp.dport, port_in)
1619 self.assertEqual(tcp.sport, port_out)
1621 self.logger.error(ppp("Unexpected or invalid packet:", p))
1625 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1626 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1627 TCP(sport=port_out, dport=external_port1))
1628 self.pg1.add_stream(p)
1629 self.pg_enable_capture(self.pg_interfaces)
1631 capture = self.pg0.get_capture(1)
1636 self.assertEqual(ip.src, self.pg1.remote_ip4)
1637 self.assertEqual(ip.dst, host1.ip4)
1638 self.assertEqual(tcp.dport, port_in)
1639 self.assertEqual(tcp.sport, port_out)
1641 self.logger.error(ppp("Unexpected or invalid packet", p))
1644 def test_tcp_session_close_detection_in(self):
1645 """ CGNAT TCP session close initiated from inside network """
1646 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1648 socket.inet_aton(self.snat_addr),
1650 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1651 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1654 self.initiate_tcp_session(self.pg0, self.pg1)
1656 # close the session from inside
1658 # FIN packet in -> out
1659 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1660 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1661 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1663 self.pg0.add_stream(p)
1664 self.pg_enable_capture(self.pg_interfaces)
1666 self.pg1.get_capture(1)
1670 # ACK packet out -> in
1671 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1672 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1673 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1677 # FIN packet out -> in
1678 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1679 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1680 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1684 self.pg1.add_stream(pkts)
1685 self.pg_enable_capture(self.pg_interfaces)
1687 self.pg0.get_capture(2)
1689 # ACK packet in -> out
1690 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1691 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1692 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1694 self.pg0.add_stream(p)
1695 self.pg_enable_capture(self.pg_interfaces)
1697 self.pg1.get_capture(1)
1699 # Check if snat closed the session
1700 dms = self.vapi.snat_det_map_dump()
1701 self.assertEqual(0, dms[0].ses_num)
1703 self.logger.error("TCP session termination failed")
1706 def test_tcp_session_close_detection_out(self):
1707 """ CGNAT TCP session close initiated from outside network """
1708 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1710 socket.inet_aton(self.snat_addr),
1712 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1713 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1716 self.initiate_tcp_session(self.pg0, self.pg1)
1718 # close the session from outside
1720 # FIN packet out -> in
1721 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1722 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1723 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1725 self.pg1.add_stream(p)
1726 self.pg_enable_capture(self.pg_interfaces)
1728 self.pg0.get_capture(1)
1732 # ACK packet in -> out
1733 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1734 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1735 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1739 # ACK packet in -> out
1740 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1741 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1742 TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1746 self.pg0.add_stream(pkts)
1747 self.pg_enable_capture(self.pg_interfaces)
1749 self.pg1.get_capture(2)
1751 # ACK packet out -> in
1752 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1753 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1754 TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1756 self.pg1.add_stream(p)
1757 self.pg_enable_capture(self.pg_interfaces)
1759 self.pg0.get_capture(1)
1761 # Check if snat closed the session
1762 dms = self.vapi.snat_det_map_dump()
1763 self.assertEqual(0, dms[0].ses_num)
1765 self.logger.error("TCP session termination failed")
1768 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1769 def test_session_timeout(self):
1770 """ CGNAT session timeouts """
1771 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1773 socket.inet_aton(self.snat_addr),
1775 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1776 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1779 self.initiate_tcp_session(self.pg0, self.pg1)
1780 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
1781 pkts = self.create_stream_in(self.pg0, self.pg1)
1782 self.pg0.add_stream(pkts)
1783 self.pg_enable_capture(self.pg_interfaces)
1785 capture = self.pg1.get_capture(len(pkts))
1788 dms = self.vapi.snat_det_map_dump()
1789 self.assertEqual(0, dms[0].ses_num)
1791 def test_session_limit_per_user(self):
1792 """ CGNAT maximum 1000 sessions per user should be created """
1793 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1795 socket.inet_aton(self.snat_addr),
1797 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1798 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1802 for port in range(1025, 2025):
1803 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1804 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1805 UDP(sport=port, dport=port))
1808 self.pg0.add_stream(pkts)
1809 self.pg_enable_capture(self.pg_interfaces)
1811 capture = self.pg1.get_capture(len(pkts))
1813 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1814 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1815 UDP(sport=3000, dport=3000))
1816 self.pg0.add_stream(p)
1817 self.pg_enable_capture(self.pg_interfaces)
1819 capture = self.pg1.assert_nothing_captured()
1821 dms = self.vapi.snat_det_map_dump()
1823 self.assertEqual(1000, dms[0].ses_num)
1825 def clear_snat(self):
1827 Clear SNAT configuration.
1829 self.vapi.snat_det_set_timeouts()
1830 deterministic_mappings = self.vapi.snat_det_map_dump()
1831 for dsm in deterministic_mappings:
1832 self.vapi.snat_add_det_map(dsm.in_addr,
1838 interfaces = self.vapi.snat_interface_dump()
1839 for intf in interfaces:
1840 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
1845 super(TestDeterministicNAT, self).tearDown()
1846 if not self.vpp_dead:
1847 self.logger.info(self.vapi.cli("show snat detail"))
1850 if __name__ == '__main__':
1851 unittest.main(testRunner=VppTestRunner)