SNAT: IPFIX logging (VPP-445)
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.l2 import Ether
10 from scapy.data import IP_PROTOS
11 from util import ppp
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
13
14
15 class TestSNAT(VppTestCase):
16     """ SNAT Test Cases """
17
18     @classmethod
19     def setUpClass(cls):
20         super(TestSNAT, cls).setUpClass()
21
22         try:
23             cls.tcp_port_in = 6303
24             cls.tcp_port_out = 6303
25             cls.udp_port_in = 6304
26             cls.udp_port_out = 6304
27             cls.icmp_id_in = 6305
28             cls.icmp_id_out = 6305
29             cls.snat_addr = '10.0.0.3'
30
31             cls.create_pg_interfaces(range(8))
32             cls.interfaces = list(cls.pg_interfaces[0:4])
33
34             for i in cls.interfaces:
35                 i.admin_up()
36                 i.config_ip4()
37                 i.resolve_arp()
38
39             cls.pg0.generate_remote_hosts(2)
40             cls.pg0.configure_ipv4_neighbors()
41
42             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
43
44             for i in cls.overlapping_interfaces:
45                 i._local_ip4 = "172.16.255.1"
46                 i._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
47                 i._remote_hosts[0]._ip4 = "172.16.255.2"
48                 i.set_table_ip4(i.sw_if_index)
49                 i.config_ip4()
50                 i.admin_up()
51                 i.resolve_arp()
52
53             cls.pg7.admin_up()
54
55         except Exception:
56             super(TestSNAT, cls).tearDownClass()
57             raise
58
59     def create_stream_in(self, in_if, out_if):
60         """
61         Create packet stream for inside network
62
63         :param in_if: Inside interface
64         :param out_if: Outside interface
65         """
66         pkts = []
67         # TCP
68         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
69              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
70              TCP(sport=self.tcp_port_in))
71         pkts.append(p)
72
73         # UDP
74         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
75              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
76              UDP(sport=self.udp_port_in))
77         pkts.append(p)
78
79         # ICMP
80         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
81              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
82              ICMP(id=self.icmp_id_in, type='echo-request'))
83         pkts.append(p)
84
85         return pkts
86
87     def create_stream_out(self, out_if, dst_ip=None):
88         """
89         Create packet stream for outside network
90
91         :param out_if: Outside interface
92         :param dst_ip: Destination IP address (Default use global SNAT address)
93         """
94         if dst_ip is None:
95             dst_ip = self.snat_addr
96         pkts = []
97         # TCP
98         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
99              IP(src=out_if.remote_ip4, dst=dst_ip) /
100              TCP(dport=self.tcp_port_out))
101         pkts.append(p)
102
103         # UDP
104         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
105              IP(src=out_if.remote_ip4, dst=dst_ip) /
106              UDP(dport=self.udp_port_out))
107         pkts.append(p)
108
109         # ICMP
110         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
111              IP(src=out_if.remote_ip4, dst=dst_ip) /
112              ICMP(id=self.icmp_id_out, type='echo-reply'))
113         pkts.append(p)
114
115         return pkts
116
117     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
118                            packet_num=3):
119         """
120         Verify captured packets on outside network
121
122         :param capture: Captured packets
123         :param nat_ip: Translated IP address (Default use global SNAT address)
124         :param same_port: Sorce port number is not translated (Default False)
125         :param packet_num: Expected number of packets (Default 3)
126         """
127         if nat_ip is None:
128             nat_ip = self.snat_addr
129         self.assertEqual(packet_num, len(capture))
130         for packet in capture:
131             try:
132                 self.assertEqual(packet[IP].src, nat_ip)
133                 if packet.haslayer(TCP):
134                     if same_port:
135                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
136                     else:
137                         self.assertNotEqual(
138                             packet[TCP].sport, self.tcp_port_in)
139                     self.tcp_port_out = packet[TCP].sport
140                 elif packet.haslayer(UDP):
141                     if same_port:
142                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
143                     else:
144                         self.assertNotEqual(
145                             packet[UDP].sport, self.udp_port_in)
146                     self.udp_port_out = packet[UDP].sport
147                 else:
148                     if same_port:
149                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
150                     else:
151                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
152                     self.icmp_id_out = packet[ICMP].id
153             except:
154                 self.logger.error(ppp("Unexpected or invalid packet "
155                                       "(outside network):", packet))
156                 raise
157
158     def verify_capture_in(self, capture, in_if, packet_num=3):
159         """
160         Verify captured packets on inside network
161
162         :param capture: Captured packets
163         :param in_if: Inside interface
164         :param packet_num: Expected number of packets (Default 3)
165         """
166         self.assertEqual(packet_num, len(capture))
167         for packet in capture:
168             try:
169                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
170                 if packet.haslayer(TCP):
171                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
172                 elif packet.haslayer(UDP):
173                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
174                 else:
175                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
176             except:
177                 self.logger.error(ppp("Unexpected or invalid packet "
178                                       "(inside network):", packet))
179                 raise
180
181     def verify_ipfix_nat44_ses(self, data):
182         """
183         Verify IPFIX NAT44 session create/delete event
184
185         :param data: Decoded IPFIX data records
186         """
187         nat44_ses_create_num = 0
188         nat44_ses_delete_num = 0
189         self.assertEqual(6, len(data))
190         for record in data:
191             # natEvent
192             self.assertIn(ord(record[230]), [4, 5])
193             if ord(record[230]) == 4:
194                 nat44_ses_create_num += 1
195             else:
196                 nat44_ses_delete_num += 1
197             # sourceIPv4Address
198             self.assertEqual(self.pg0.remote_ip4n, record[8])
199             # postNATSourceIPv4Address
200             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
201                              record[225])
202             # ingressVRFID
203             self.assertEqual(struct.pack("!I", 0), record[234])
204             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
205             if IP_PROTOS.icmp == ord(record[4]):
206                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
207                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
208                                  record[227])
209             elif IP_PROTOS.tcp == ord(record[4]):
210                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
211                                  record[7])
212                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
213                                  record[227])
214             elif IP_PROTOS.udp == ord(record[4]):
215                 self.assertEqual(struct.pack("!H", self.udp_port_in),
216                                  record[7])
217                 self.assertEqual(struct.pack("!H", self.udp_port_out),
218                                  record[227])
219             else:
220                 self.fail("Invalid protocol")
221         self.assertEqual(3, nat44_ses_create_num)
222         self.assertEqual(3, nat44_ses_delete_num)
223
224     def verify_ipfix_addr_exhausted(self, data):
225         """
226         Verify IPFIX NAT addresses event
227
228         :param data: Decoded IPFIX data records
229         """
230         self.assertEqual(1, len(data))
231         record = data[0]
232         # natEvent
233         self.assertEqual(ord(record[230]), 3)
234         # natPoolID
235         self.assertEqual(struct.pack("!I", 0), record[283])
236
237     def clear_snat(self):
238         """
239         Clear SNAT configuration.
240         """
241         interfaces = self.vapi.snat_interface_addr_dump()
242         for intf in interfaces:
243             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
244
245         self.vapi.snat_ipfix(enable=0)
246
247         interfaces = self.vapi.snat_interface_dump()
248         for intf in interfaces:
249             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
250                                                      intf.is_inside,
251                                                      is_add=0)
252
253         static_mappings = self.vapi.snat_static_mapping_dump()
254         for sm in static_mappings:
255             self.vapi.snat_add_static_mapping(sm.local_ip_address,
256                                               sm.external_ip_address,
257                                               local_port=sm.local_port,
258                                               external_port=sm.external_port,
259                                               addr_only=sm.addr_only,
260                                               vrf_id=sm.vrf_id,
261                                               is_add=0)
262
263         adresses = self.vapi.snat_address_dump()
264         for addr in adresses:
265             self.vapi.snat_add_address_range(addr.ip_address,
266                                              addr.ip_address,
267                                              is_add=0)
268
269     def snat_add_static_mapping(self, local_ip, external_ip, local_port=0,
270                                 external_port=0, vrf_id=0, is_add=1):
271         """
272         Add/delete S-NAT static mapping
273
274         :param local_ip: Local IP address
275         :param external_ip: External IP address
276         :param local_port: Local port number (Optional)
277         :param external_port: External port number (Optional)
278         :param vrf_id: VRF ID (Default 0)
279         :param is_add: 1 if add, 0 if delete (Default add)
280         """
281         addr_only = 1
282         if local_port and external_port:
283             addr_only = 0
284         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
285         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
286         self.vapi.snat_add_static_mapping(
287             l_ip,
288             e_ip,
289             local_port,
290             external_port,
291             addr_only,
292             vrf_id,
293             is_add)
294
295     def snat_add_address(self, ip, is_add=1):
296         """
297         Add/delete S-NAT address
298
299         :param ip: IP address
300         :param is_add: 1 if add, 0 if delete (Default add)
301         """
302         snat_addr = socket.inet_pton(socket.AF_INET, ip)
303         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)
304
305     def test_dynamic(self):
306         """ SNAT dynamic translation test """
307
308         self.snat_add_address(self.snat_addr)
309         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
310         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
311                                                  is_inside=0)
312
313         # in2out
314         pkts = self.create_stream_in(self.pg0, self.pg1)
315         self.pg0.add_stream(pkts)
316         self.pg_enable_capture(self.pg_interfaces)
317         self.pg_start()
318         capture = self.pg1.get_capture(len(pkts))
319         self.verify_capture_out(capture)
320
321         # out2in
322         pkts = self.create_stream_out(self.pg1)
323         self.pg1.add_stream(pkts)
324         self.pg_enable_capture(self.pg_interfaces)
325         self.pg_start()
326         capture = self.pg0.get_capture(len(pkts))
327         self.verify_capture_in(capture, self.pg0)
328
329     def test_static_in(self):
330         """ SNAT 1:1 NAT initialized from inside network """
331
332         nat_ip = "10.0.0.10"
333         self.tcp_port_out = 6303
334         self.udp_port_out = 6304
335         self.icmp_id_out = 6305
336
337         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
338         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
339         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
340                                                  is_inside=0)
341
342         # in2out
343         pkts = self.create_stream_in(self.pg0, self.pg1)
344         self.pg0.add_stream(pkts)
345         self.pg_enable_capture(self.pg_interfaces)
346         self.pg_start()
347         capture = self.pg1.get_capture(len(pkts))
348         self.verify_capture_out(capture, nat_ip, True)
349
350         # out2in
351         pkts = self.create_stream_out(self.pg1, nat_ip)
352         self.pg1.add_stream(pkts)
353         self.pg_enable_capture(self.pg_interfaces)
354         self.pg_start()
355         capture = self.pg0.get_capture(len(pkts))
356         self.verify_capture_in(capture, self.pg0)
357
358     def test_static_out(self):
359         """ SNAT 1:1 NAT initialized from outside network """
360
361         nat_ip = "10.0.0.20"
362         self.tcp_port_out = 6303
363         self.udp_port_out = 6304
364         self.icmp_id_out = 6305
365
366         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
367         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
368         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
369                                                  is_inside=0)
370
371         # out2in
372         pkts = self.create_stream_out(self.pg1, nat_ip)
373         self.pg1.add_stream(pkts)
374         self.pg_enable_capture(self.pg_interfaces)
375         self.pg_start()
376         capture = self.pg0.get_capture(len(pkts))
377         self.verify_capture_in(capture, self.pg0)
378
379         # in2out
380         pkts = self.create_stream_in(self.pg0, self.pg1)
381         self.pg0.add_stream(pkts)
382         self.pg_enable_capture(self.pg_interfaces)
383         self.pg_start()
384         capture = self.pg1.get_capture(len(pkts))
385         self.verify_capture_out(capture, nat_ip, True)
386
387     def test_static_with_port_in(self):
388         """ SNAT 1:1 NAT with port initialized from inside network """
389
390         self.tcp_port_out = 3606
391         self.udp_port_out = 3607
392         self.icmp_id_out = 3608
393
394         self.snat_add_address(self.snat_addr)
395         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
396                                      self.tcp_port_in, self.tcp_port_out)
397         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
398                                      self.udp_port_in, self.udp_port_out)
399         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
400                                      self.icmp_id_in, self.icmp_id_out)
401         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
402         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
403                                                  is_inside=0)
404
405         # in2out
406         pkts = self.create_stream_in(self.pg0, self.pg1)
407         self.pg0.add_stream(pkts)
408         self.pg_enable_capture(self.pg_interfaces)
409         self.pg_start()
410         capture = self.pg1.get_capture(len(pkts))
411         self.verify_capture_out(capture)
412
413         # out2in
414         pkts = self.create_stream_out(self.pg1)
415         self.pg1.add_stream(pkts)
416         self.pg_enable_capture(self.pg_interfaces)
417         self.pg_start()
418         capture = self.pg0.get_capture(len(pkts))
419         self.verify_capture_in(capture, self.pg0)
420
421     def test_static_with_port_out(self):
422         """ SNAT 1:1 NAT with port initialized from outside network """
423
424         self.tcp_port_out = 30606
425         self.udp_port_out = 30607
426         self.icmp_id_out = 30608
427
428         self.snat_add_address(self.snat_addr)
429         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
430                                      self.tcp_port_in, self.tcp_port_out)
431         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
432                                      self.udp_port_in, self.udp_port_out)
433         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
434                                      self.icmp_id_in, self.icmp_id_out)
435         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
436         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
437                                                  is_inside=0)
438
439         # out2in
440         pkts = self.create_stream_out(self.pg1)
441         self.pg1.add_stream(pkts)
442         self.pg_enable_capture(self.pg_interfaces)
443         self.pg_start()
444         capture = self.pg0.get_capture(len(pkts))
445         self.verify_capture_in(capture, self.pg0)
446
447         # in2out
448         pkts = self.create_stream_in(self.pg0, self.pg1)
449         self.pg0.add_stream(pkts)
450         self.pg_enable_capture(self.pg_interfaces)
451         self.pg_start()
452         capture = self.pg1.get_capture(len(pkts))
453         self.verify_capture_out(capture)
454
455     def test_static_vrf_aware(self):
456         """ SNAT 1:1 NAT VRF awareness """
457
458         nat_ip1 = "10.0.0.30"
459         nat_ip2 = "10.0.0.40"
460         self.tcp_port_out = 6303
461         self.udp_port_out = 6304
462         self.icmp_id_out = 6305
463
464         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
465                                      vrf_id=self.pg4.sw_if_index)
466         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
467                                      vrf_id=self.pg4.sw_if_index)
468         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
469                                                  is_inside=0)
470         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
471         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
472
473         # inside interface VRF match SNAT static mapping VRF
474         pkts = self.create_stream_in(self.pg4, self.pg3)
475         self.pg4.add_stream(pkts)
476         self.pg_enable_capture(self.pg_interfaces)
477         self.pg_start()
478         capture = self.pg3.get_capture(len(pkts))
479         self.verify_capture_out(capture, nat_ip1, True)
480
481         # inside interface VRF don't match SNAT static mapping VRF (packets
482         # are dropped)
483         pkts = self.create_stream_in(self.pg0, self.pg3)
484         self.pg0.add_stream(pkts)
485         self.pg_enable_capture(self.pg_interfaces)
486         self.pg_start()
487         self.pg3.assert_nothing_captured()
488
489     def test_multiple_inside_interfaces(self):
490         """
491         SNAT multiple inside interfaces with non-overlapping address space
492         """
493
494         self.snat_add_address(self.snat_addr)
495         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
496         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
497         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index)
498         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
499                                                  is_inside=0)
500
501         # in2out 1st interface
502         pkts = self.create_stream_in(self.pg0, self.pg3)
503         self.pg0.add_stream(pkts)
504         self.pg_enable_capture(self.pg_interfaces)
505         self.pg_start()
506         capture = self.pg3.get_capture(len(pkts))
507         self.verify_capture_out(capture)
508
509         # out2in 1st interface
510         pkts = self.create_stream_out(self.pg3)
511         self.pg3.add_stream(pkts)
512         self.pg_enable_capture(self.pg_interfaces)
513         self.pg_start()
514         capture = self.pg0.get_capture(len(pkts))
515         self.verify_capture_in(capture, self.pg0)
516
517         # in2out 2nd interface
518         pkts = self.create_stream_in(self.pg1, self.pg3)
519         self.pg1.add_stream(pkts)
520         self.pg_enable_capture(self.pg_interfaces)
521         self.pg_start()
522         capture = self.pg3.get_capture(len(pkts))
523         self.verify_capture_out(capture)
524
525         # out2in 2nd interface
526         pkts = self.create_stream_out(self.pg3)
527         self.pg3.add_stream(pkts)
528         self.pg_enable_capture(self.pg_interfaces)
529         self.pg_start()
530         capture = self.pg1.get_capture(len(pkts))
531         self.verify_capture_in(capture, self.pg1)
532
533         # in2out 3rd interface
534         pkts = self.create_stream_in(self.pg2, self.pg3)
535         self.pg2.add_stream(pkts)
536         self.pg_enable_capture(self.pg_interfaces)
537         self.pg_start()
538         capture = self.pg3.get_capture(len(pkts))
539         self.verify_capture_out(capture)
540
541         # out2in 3rd interface
542         pkts = self.create_stream_out(self.pg3)
543         self.pg3.add_stream(pkts)
544         self.pg_enable_capture(self.pg_interfaces)
545         self.pg_start()
546         capture = self.pg2.get_capture(len(pkts))
547         self.verify_capture_in(capture, self.pg2)
548
549     def test_inside_overlapping_interfaces(self):
550         """ SNAT multiple inside interfaces with overlapping address space """
551
552         self.snat_add_address(self.snat_addr)
553         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
554                                                  is_inside=0)
555         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
556         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
557         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
558
559         # in2out 1st interface
560         pkts = self.create_stream_in(self.pg4, self.pg3)
561         self.pg4.add_stream(pkts)
562         self.pg_enable_capture(self.pg_interfaces)
563         self.pg_start()
564         capture = self.pg3.get_capture(len(pkts))
565         self.verify_capture_out(capture)
566
567         # out2in 1st interface
568         pkts = self.create_stream_out(self.pg3)
569         self.pg3.add_stream(pkts)
570         self.pg_enable_capture(self.pg_interfaces)
571         self.pg_start()
572         capture = self.pg4.get_capture(len(pkts))
573         self.verify_capture_in(capture, self.pg4)
574
575         # in2out 2nd interface
576         pkts = self.create_stream_in(self.pg5, self.pg3)
577         self.pg5.add_stream(pkts)
578         self.pg_enable_capture(self.pg_interfaces)
579         self.pg_start()
580         capture = self.pg3.get_capture(len(pkts))
581         self.verify_capture_out(capture)
582
583         # out2in 2nd interface
584         pkts = self.create_stream_out(self.pg3)
585         self.pg3.add_stream(pkts)
586         self.pg_enable_capture(self.pg_interfaces)
587         self.pg_start()
588         capture = self.pg5.get_capture(len(pkts))
589         self.verify_capture_in(capture, self.pg5)
590
591         # in2out 3rd interface
592         pkts = self.create_stream_in(self.pg6, self.pg3)
593         self.pg6.add_stream(pkts)
594         self.pg_enable_capture(self.pg_interfaces)
595         self.pg_start()
596         capture = self.pg3.get_capture(len(pkts))
597         self.verify_capture_out(capture)
598
599         # out2in 3rd interface
600         pkts = self.create_stream_out(self.pg3)
601         self.pg3.add_stream(pkts)
602         self.pg_enable_capture(self.pg_interfaces)
603         self.pg_start()
604         capture = self.pg6.get_capture(len(pkts))
605         self.verify_capture_in(capture, self.pg6)
606
607     def test_hairpinning(self):
608         """ SNAT hairpinning """
609
610         host = self.pg0.remote_hosts[0]
611         server = self.pg0.remote_hosts[1]
612         host_in_port = 1234
613         host_out_port = 0
614         server_in_port = 5678
615         server_out_port = 8765
616
617         self.snat_add_address(self.snat_addr)
618         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
619         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
620                                                  is_inside=0)
621         # add static mapping for server
622         self.snat_add_static_mapping(server.ip4, self.snat_addr,
623                                      server_in_port, server_out_port)
624
625         # send packet from host to server
626         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
627              IP(src=host.ip4, dst=self.snat_addr) /
628              TCP(sport=host_in_port, dport=server_out_port))
629         self.pg0.add_stream(p)
630         self.pg_enable_capture(self.pg_interfaces)
631         self.pg_start()
632         capture = self.pg0.get_capture(1)
633         p = capture[0]
634         try:
635             ip = p[IP]
636             tcp = p[TCP]
637             self.assertEqual(ip.src, self.snat_addr)
638             self.assertEqual(ip.dst, server.ip4)
639             self.assertNotEqual(tcp.sport, host_in_port)
640             self.assertEqual(tcp.dport, server_in_port)
641             host_out_port = tcp.sport
642         except:
643             self.logger.error(ppp("Unexpected or invalid packet:", p))
644             raise
645
646         # send reply from server to host
647         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
648              IP(src=server.ip4, dst=self.snat_addr) /
649              TCP(sport=server_in_port, dport=host_out_port))
650         self.pg0.add_stream(p)
651         self.pg_enable_capture(self.pg_interfaces)
652         self.pg_start()
653         capture = self.pg0.get_capture(1)
654         p = capture[0]
655         try:
656             ip = p[IP]
657             tcp = p[TCP]
658             self.assertEqual(ip.src, self.snat_addr)
659             self.assertEqual(ip.dst, host.ip4)
660             self.assertEqual(tcp.sport, server_out_port)
661             self.assertEqual(tcp.dport, host_in_port)
662         except:
663             self.logger.error(ppp("Unexpected or invalid packet:"), p)
664             raise
665
666     def test_max_translations_per_user(self):
667         """ MAX translations per user - recycle the least recently used """
668
669         self.snat_add_address(self.snat_addr)
670         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
671         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
672                                                  is_inside=0)
673
674         # get maximum number of translations per user
675         snat_config = self.vapi.snat_show_config()
676
677         # send more than maximum number of translations per user packets
678         pkts_num = snat_config.max_translations_per_user + 5
679         pkts = []
680         for port in range(0, pkts_num):
681             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
682                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
683                  TCP(sport=1025 + port))
684             pkts.append(p)
685         self.pg0.add_stream(pkts)
686         self.pg_enable_capture(self.pg_interfaces)
687         self.pg_start()
688
689         # verify number of translated packet
690         self.pg1.get_capture(pkts_num)
691
692     def test_interface_addr(self):
693         """ Acquire SNAT addresses from interface """
694         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
695
696         # no address in NAT pool
697         adresses = self.vapi.snat_address_dump()
698         self.assertEqual(0, len(adresses))
699
700         # configure interface address and check NAT address pool
701         self.pg7.config_ip4()
702         adresses = self.vapi.snat_address_dump()
703         self.assertEqual(1, len(adresses))
704
705         # remove interface address and check NAT address pool
706         self.pg7.unconfig_ip4()
707         adresses = self.vapi.snat_address_dump()
708         self.assertEqual(0, len(adresses))
709
710     def test_ipfix_nat44_sess(self):
711         """ S-NAT IPFIX logging NAT44 session created/delted """
712         self.snat_add_address(self.snat_addr)
713         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
714         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
715                                                  is_inside=0)
716         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
717                                      src_address=self.pg3.local_ip4n,
718                                      path_mtu=512,
719                                      template_interval=10)
720         self.vapi.snat_ipfix()
721
722         pkts = self.create_stream_in(self.pg0, self.pg1)
723         self.pg0.add_stream(pkts)
724         self.pg_enable_capture(self.pg_interfaces)
725         self.pg_start()
726         capture = self.pg1.get_capture(len(pkts))
727         self.verify_capture_out(capture)
728         self.snat_add_address(self.snat_addr, is_add=0)
729         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
730         capture = self.pg3.get_capture(3)
731         ipfix = IPFIXDecoder()
732         # first load template
733         for p in capture:
734             self.assertTrue(p.haslayer(IPFIX))
735             if p.haslayer(Template):
736                 ipfix.add_template(p.getlayer(Template))
737         # verify events in data set
738         for p in capture:
739             if p.haslayer(Data):
740                 data = ipfix.decode_data_set(p.getlayer(Set))
741                 self.verify_ipfix_nat44_ses(data)
742
743     def test_ipfix_addr_exhausted(self):
744         """ S-NAT IPFIX logging NAT addresses exhausted """
745         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
746         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
747                                                  is_inside=0)
748         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
749                                      src_address=self.pg3.local_ip4n,
750                                      path_mtu=512,
751                                      template_interval=10)
752         self.vapi.snat_ipfix()
753
754         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
755              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
756              TCP(sport=3025))
757         self.pg0.add_stream(p)
758         self.pg_enable_capture(self.pg_interfaces)
759         self.pg_start()
760         capture = self.pg1.get_capture(0)
761         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
762         capture = self.pg3.get_capture(3)
763         ipfix = IPFIXDecoder()
764         # first load template
765         for p in capture:
766             self.assertTrue(p.haslayer(IPFIX))
767             if p.haslayer(Template):
768                 ipfix.add_template(p.getlayer(Template))
769         # verify events in data set
770         for p in capture:
771             if p.haslayer(Data):
772                 data = ipfix.decode_data_set(p.getlayer(Set))
773                 self.verify_ipfix_addr_exhausted(data)
774
775     def tearDown(self):
776         super(TestSNAT, self).tearDown()
777         if not self.vpp_dead:
778             self.logger.info(self.vapi.cli("show snat verbose"))
779             self.clear_snat()
780
781
# Run the SNAT test cases with the VPP test runner when this file is executed
# directly as a script.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)