hs-test: fixed timed out tests passing in the CI
[vpp.git] / test / test_det44.py
1 #!/usr/bin/env python3
2
3 import socket
4 import struct
5 import unittest
6 import scapy.compat
7 from time import sleep
8 from config import config
9 from framework import VppTestCase
10 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, UDPerror
13 from scapy.layers.l2 import Ether
14 from util import ppp
15
16
17 class TestDET44(VppTestCase):
18     """Deterministic NAT Test Cases"""
19
20     @classmethod
21     def setUpClass(cls):
22         super(TestDET44, cls).setUpClass()
23         cls.vapi.cli("set log class det44 level debug")
24
25         cls.tcp_port_in = 6303
26         cls.tcp_external_port = 6303
27         cls.udp_port_in = 6304
28         cls.udp_external_port = 6304
29         cls.icmp_id_in = 6305
30         cls.nat_addr = "10.0.0.3"
31
32         cls.create_pg_interfaces(range(3))
33         cls.interfaces = list(cls.pg_interfaces)
34
35         for i in cls.interfaces:
36             i.admin_up()
37             i.config_ip4()
38             i.resolve_arp()
39
40         cls.pg0.generate_remote_hosts(2)
41         cls.pg0.configure_ipv4_neighbors()
42
43     @classmethod
44     def tearDownClass(cls):
45         super(TestDET44, cls).tearDownClass()
46
47     def setUp(self):
48         super(TestDET44, self).setUp()
49         self.vapi.det44_plugin_enable_disable(enable=1)
50
51     def tearDown(self):
52         super(TestDET44, self).tearDown()
53         if not self.vpp_dead:
54             self.vapi.det44_plugin_enable_disable(enable=0)
55
56     def show_commands_at_teardown(self):
57         self.logger.info(self.vapi.cli("show det44 interfaces"))
58         self.logger.info(self.vapi.cli("show det44 timeouts"))
59         self.logger.info(self.vapi.cli("show det44 mappings"))
60         self.logger.info(self.vapi.cli("show det44 sessions"))
61
62     def verify_capture_in(self, capture, in_if):
63         """
64         Verify captured packets on inside network
65
66         :param capture: Captured packets
67         :param in_if: Inside interface
68         """
69         fired = False
70         for packet in capture:
71             try:
72                 self.assert_packet_checksums_valid(packet)
73                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
74                 if packet.haslayer(TCP):
75                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
76                 elif packet.haslayer(UDP):
77                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
78                 else:
79                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
80             except:
81                 fired = True
82                 self.logger.error(
83                     ppp("Unexpected or invalid packet (inside network):", packet)
84                 )
85         if fired:
86             raise
87
88     def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
89         """
90         Verify IPFIX maximum entries per user exceeded event
91
92         :param data: Decoded IPFIX data records
93         :param limit: Number of maximum entries per user
94         :param src_addr: IPv4 source address
95         """
96         self.assertEqual(1, len(data))
97         record = data[0]
98         # natEvent
99         self.assertEqual(scapy.compat.orb(record[230]), 13)
100         # natQuotaExceededEvent
101         self.assertEqual(struct.pack("!I", 3), record[466])
102         # maxEntriesPerUser
103         self.assertEqual(struct.pack("!I", limit), record[473])
104         # sourceIPv4Address
105         self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
106
107     def initiate_tcp_session(self, in_if, out_if):
108         """
109         Initiates TCP session 3 WAY HAND SHAKE
110
111         :param in_if: Inside interface
112         :param out_if: Outside interface
113         """
114
115         # SYN packet in->out
116         p = (
117             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
118             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
119             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
120         )
121         in_if.add_stream(p)
122         self.pg_enable_capture(self.pg_interfaces)
123         self.pg_start()
124         capture = out_if.get_capture(1)
125         p = capture[0]
126         self.tcp_port_out = p[TCP].sport
127
128         # SYN + ACK packet out->in
129         p = (
130             Ether(src=out_if.remote_mac, dst=out_if.local_mac)
131             / IP(src=out_if.remote_ip4, dst=self.nat_addr)
132             / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
133         )
134         out_if.add_stream(p)
135         self.pg_enable_capture(self.pg_interfaces)
136         self.pg_start()
137         in_if.get_capture(1)
138
139         # ACK packet in->out
140         p = (
141             Ether(src=in_if.remote_mac, dst=in_if.local_mac)
142             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
143             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
144         )
145         in_if.add_stream(p)
146         self.pg_enable_capture(self.pg_interfaces)
147         self.pg_start()
148         out_if.get_capture(1)
149
150     def create_stream_in(self, in_if, out_if, ttl=64):
151         """
152         Create packet stream for inside network
153
154         :param in_if: Inside interface
155         :param out_if: Outside interface
156         :param ttl: TTL of generated packets
157         """
158         pkts = []
159         # TCP
160         p = (
161             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
162             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
163             / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
164         )
165         pkts.append(p)
166
167         # UDP
168         p = (
169             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
170             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
171             / UDP(sport=self.udp_port_in, dport=self.udp_external_port)
172         )
173         pkts.append(p)
174
175         # ICMP
176         p = (
177             Ether(dst=in_if.local_mac, src=in_if.remote_mac)
178             / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
179             / ICMP(id=self.icmp_id_in, type="echo-request")
180         )
181         pkts.append(p)
182
183         return pkts
184
185     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
186         """
187         Create packet stream for outside network
188
189         :param out_if: Outside interface
190         :param dst_ip: Destination IP address (Default use global NAT address)
191         :param ttl: TTL of generated packets
192         """
193         if dst_ip is None:
194             dst_ip = self.nat_addr
195         pkts = []
196         # TCP
197         p = (
198             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
199             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
200             / TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)
201         )
202         pkts.append(p)
203
204         # UDP
205         p = (
206             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
207             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
208             / UDP(dport=self.udp_port_out, sport=self.udp_external_port)
209         )
210         pkts.append(p)
211
212         # ICMP
213         p = (
214             Ether(dst=out_if.local_mac, src=out_if.remote_mac)
215             / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
216             / ICMP(id=self.icmp_external_id, type="echo-reply")
217         )
218         pkts.append(p)
219
220         return pkts
221
222     def verify_capture_out(self, capture, nat_ip=None):
223         """
224         Verify captured packets on outside network
225
226         :param capture: Captured packets
227         :param nat_ip: Translated IP address (Default use global NAT address)
228         :param same_port: Source port number is not translated (Default False)
229         """
230         if nat_ip is None:
231             nat_ip = self.nat_addr
232         for packet in capture:
233             try:
234                 self.assertEqual(packet[IP].src, nat_ip)
235                 if packet.haslayer(TCP):
236                     self.tcp_port_out = packet[TCP].sport
237                 elif packet.haslayer(UDP):
238                     self.udp_port_out = packet[UDP].sport
239                 else:
240                     self.icmp_external_id = packet[ICMP].id
241             except:
242                 self.logger.error(
243                     ppp("Unexpected or invalid packet (outside network):", packet)
244                 )
245                 raise
246
247     def test_deterministic_mode(self):
248         """NAT plugin run deterministic mode"""
249         in_addr = "172.16.255.0"
250         out_addr = "172.17.255.50"
251         in_addr_t = "172.16.255.20"
252         in_plen = 24
253         out_plen = 32
254
255         self.vapi.det44_add_del_map(
256             is_add=1,
257             in_addr=in_addr,
258             in_plen=in_plen,
259             out_addr=out_addr,
260             out_plen=out_plen,
261         )
262
263         rep1 = self.vapi.det44_forward(in_addr_t)
264         self.assertEqual(str(rep1.out_addr), out_addr)
265         rep2 = self.vapi.det44_reverse(rep1.out_port_hi, out_addr)
266
267         self.assertEqual(str(rep2.in_addr), in_addr_t)
268
269         deterministic_mappings = self.vapi.det44_map_dump()
270         self.assertEqual(len(deterministic_mappings), 1)
271         dsm = deterministic_mappings[0]
272         self.assertEqual(in_addr, str(dsm.in_addr))
273         self.assertEqual(in_plen, dsm.in_plen)
274         self.assertEqual(out_addr, str(dsm.out_addr))
275         self.assertEqual(out_plen, dsm.out_plen)
276
277     def test_set_timeouts(self):
278         """Set deterministic NAT timeouts"""
279         timeouts_before = self.vapi.det44_get_timeouts()
280
281         self.vapi.det44_set_timeouts(
282             udp=timeouts_before.udp + 10,
283             tcp_established=timeouts_before.tcp_established + 10,
284             tcp_transitory=timeouts_before.tcp_transitory + 10,
285             icmp=timeouts_before.icmp + 10,
286         )
287
288         timeouts_after = self.vapi.det44_get_timeouts()
289
290         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
291         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
292         self.assertNotEqual(
293             timeouts_before.tcp_established, timeouts_after.tcp_established
294         )
295         self.assertNotEqual(
296             timeouts_before.tcp_transitory, timeouts_after.tcp_transitory
297         )
298
299     def test_in(self):
300         """DET44 translation test (TCP, UDP, ICMP)"""
301
302         nat_ip = "10.0.0.10"
303
304         self.vapi.det44_add_del_map(
305             is_add=1,
306             in_addr=self.pg0.remote_ip4,
307             in_plen=32,
308             out_addr=socket.inet_aton(nat_ip),
309             out_plen=32,
310         )
311
312         self.vapi.det44_interface_add_del_feature(
313             sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
314         )
315         self.vapi.det44_interface_add_del_feature(
316             sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
317         )
318
319         # in2out
320         pkts = self.create_stream_in(self.pg0, self.pg1)
321         self.pg0.add_stream(pkts)
322         self.pg_enable_capture(self.pg_interfaces)
323         self.pg_start()
324         capture = self.pg1.get_capture(len(pkts))
325         self.verify_capture_out(capture, nat_ip)
326
327         # out2in
328         pkts = self.create_stream_out(self.pg1, nat_ip)
329         self.pg1.add_stream(pkts)
330         self.pg_enable_capture(self.pg_interfaces)
331         self.pg_start()
332         capture = self.pg0.get_capture(len(pkts))
333         self.verify_capture_in(capture, self.pg0)
334
335         # session dump test
336         sessions = self.vapi.det44_session_dump(self.pg0.remote_ip4)
337         self.assertEqual(len(sessions), 3)
338
339         # TCP session
340         s = sessions[0]
341         self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
342         self.assertEqual(s.in_port, self.tcp_port_in)
343         self.assertEqual(s.out_port, self.tcp_port_out)
344         self.assertEqual(s.ext_port, self.tcp_external_port)
345
346         # UDP session
347         s = sessions[1]
348         self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
349         self.assertEqual(s.in_port, self.udp_port_in)
350         self.assertEqual(s.out_port, self.udp_port_out)
351         self.assertEqual(s.ext_port, self.udp_external_port)
352
353         # ICMP session
354         s = sessions[2]
355         self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
356         self.assertEqual(s.in_port, self.icmp_id_in)
357         self.assertEqual(s.out_port, self.icmp_external_id)
358
359     def test_multiple_users(self):
360         """Deterministic NAT multiple users"""
361
362         nat_ip = "10.0.0.10"
363         port_in = 80
364         external_port = 6303
365
366         host0 = self.pg0.remote_hosts[0]
367         host1 = self.pg0.remote_hosts[1]
368
369         self.vapi.det44_add_del_map(
370             is_add=1,
371             in_addr=host0.ip4,
372             in_plen=24,
373             out_addr=socket.inet_aton(nat_ip),
374             out_plen=32,
375         )
376         self.vapi.det44_interface_add_del_feature(
377             sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
378         )
379         self.vapi.det44_interface_add_del_feature(
380             sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
381         )
382
383         # host0 to out
384         p = (
385             Ether(src=host0.mac, dst=self.pg0.local_mac)
386             / IP(src=host0.ip4, dst=self.pg1.remote_ip4)
387             / TCP(sport=port_in, dport=external_port)
388         )
389         self.pg0.add_stream(p)
390         self.pg_enable_capture(self.pg_interfaces)
391         self.pg_start()
392         capture = self.pg1.get_capture(1)
393         p = capture[0]
394         try:
395             ip = p[IP]
396             tcp = p[TCP]
397             self.assertEqual(ip.src, nat_ip)
398             self.assertEqual(ip.dst, self.pg1.remote_ip4)
399             self.assertEqual(tcp.dport, external_port)
400             port_out0 = tcp.sport
401         except:
402             self.logger.error(ppp("Unexpected or invalid packet:", p))
403             raise
404
405         # host1 to out
406         p = (
407             Ether(src=host1.mac, dst=self.pg0.local_mac)
408             / IP(src=host1.ip4, dst=self.pg1.remote_ip4)
409             / TCP(sport=port_in, dport=external_port)
410         )
411         self.pg0.add_stream(p)
412         self.pg_enable_capture(self.pg_interfaces)
413         self.pg_start()
414         capture = self.pg1.get_capture(1)
415         p = capture[0]
416         try:
417             ip = p[IP]
418             tcp = p[TCP]
419             self.assertEqual(ip.src, nat_ip)
420             self.assertEqual(ip.dst, self.pg1.remote_ip4)
421             self.assertEqual(tcp.dport, external_port)
422             port_out1 = tcp.sport
423         except:
424             self.logger.error(ppp("Unexpected or invalid packet:", p))
425             raise
426
427         dms = self.vapi.det44_map_dump()
428         self.assertEqual(1, len(dms))
429         self.assertEqual(2, dms[0].ses_num)
430
431         # out to host0
432         p = (
433             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
434             / IP(src=self.pg1.remote_ip4, dst=nat_ip)
435             / TCP(sport=external_port, dport=port_out0)
436         )
437         self.pg1.add_stream(p)
438         self.pg_enable_capture(self.pg_interfaces)
439         self.pg_start()
440         capture = self.pg0.get_capture(1)
441         p = capture[0]
442         try:
443             ip = p[IP]
444             tcp = p[TCP]
445             self.assertEqual(ip.src, self.pg1.remote_ip4)
446             self.assertEqual(ip.dst, host0.ip4)
447             self.assertEqual(tcp.dport, port_in)
448             self.assertEqual(tcp.sport, external_port)
449         except:
450             self.logger.error(ppp("Unexpected or invalid packet:", p))
451             raise
452
453         # out to host1
454         p = (
455             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
456             / IP(src=self.pg1.remote_ip4, dst=nat_ip)
457             / TCP(sport=external_port, dport=port_out1)
458         )
459         self.pg1.add_stream(p)
460         self.pg_enable_capture(self.pg_interfaces)
461         self.pg_start()
462         capture = self.pg0.get_capture(1)
463         p = capture[0]
464         try:
465             ip = p[IP]
466             tcp = p[TCP]
467             self.assertEqual(ip.src, self.pg1.remote_ip4)
468             self.assertEqual(ip.dst, host1.ip4)
469             self.assertEqual(tcp.dport, port_in)
470             self.assertEqual(tcp.sport, external_port)
471         except:
472             self.logger.error(ppp("Unexpected or invalid packet", p))
473             raise
474
475         # session close api test
476         self.vapi.det44_close_session_out(
477             socket.inet_aton(nat_ip), port_out1, self.pg1.remote_ip4, external_port
478         )
479         dms = self.vapi.det44_map_dump()
480         self.assertEqual(dms[0].ses_num, 1)
481
482         self.vapi.det44_close_session_in(
483             host0.ip4, port_in, self.pg1.remote_ip4, external_port
484         )
485         dms = self.vapi.det44_map_dump()
486         self.assertEqual(dms[0].ses_num, 0)
487
488     def test_tcp_session_close_detection_in(self):
489         """DET44 TCP session close from inside network"""
490         self.vapi.det44_add_del_map(
491             is_add=1,
492             in_addr=self.pg0.remote_ip4,
493             in_plen=32,
494             out_addr=socket.inet_aton(self.nat_addr),
495             out_plen=32,
496         )
497         self.vapi.det44_interface_add_del_feature(
498             sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
499         )
500         self.vapi.det44_interface_add_del_feature(
501             sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
502         )
503
504         self.initiate_tcp_session(self.pg0, self.pg1)
505
506         # close the session from inside
507         try:
508             # FIN packet in -> out
509             p = (
510                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
511                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
512                 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
513             )
514             self.pg0.add_stream(p)
515             self.pg_enable_capture(self.pg_interfaces)
516             self.pg_start()
517             self.pg1.get_capture(1)
518
519             pkts = []
520
521             # ACK packet out -> in
522             p = (
523                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
524                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
525                 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")
526             )
527             pkts.append(p)
528
529             # FIN packet out -> in
530             p = (
531                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
532                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
533                 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
534             )
535             pkts.append(p)
536
537             self.pg1.add_stream(pkts)
538             self.pg_enable_capture(self.pg_interfaces)
539             self.pg_start()
540             self.pg0.get_capture(2)
541
542             # ACK packet in -> out
543             p = (
544                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
545                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
546                 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
547             )
548             self.pg0.add_stream(p)
549             self.pg_enable_capture(self.pg_interfaces)
550             self.pg_start()
551             self.pg1.get_capture(1)
552
553             # Check if deterministic NAT44 closed the session
554             dms = self.vapi.det44_map_dump()
555             self.assertEqual(0, dms[0].ses_num)
556         except:
557             self.logger.error("TCP session termination failed")
558             raise
559
560     def test_tcp_session_close_detection_out(self):
561         """Deterministic NAT TCP session close from outside network"""
562         self.vapi.det44_add_del_map(
563             is_add=1,
564             in_addr=self.pg0.remote_ip4,
565             in_plen=32,
566             out_addr=socket.inet_aton(self.nat_addr),
567             out_plen=32,
568         )
569         self.vapi.det44_interface_add_del_feature(
570             sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
571         )
572         self.vapi.det44_interface_add_del_feature(
573             sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
574         )
575
576         self.initiate_tcp_session(self.pg0, self.pg1)
577
578         # close the session from outside
579         try:
580             # FIN packet out -> in
581             p = (
582                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
583                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
584                 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
585             )
586             self.pg1.add_stream(p)
587             self.pg_enable_capture(self.pg_interfaces)
588             self.pg_start()
589             self.pg0.get_capture(1)
590
591             pkts = []
592
593             # ACK packet in -> out
594             p = (
595                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
596                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
597                 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
598             )
599             pkts.append(p)
600
601             # ACK packet in -> out
602             p = (
603                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
604                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
605                 / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
606             )
607             pkts.append(p)
608
609             self.pg0.add_stream(pkts)
610             self.pg_enable_capture(self.pg_interfaces)
611             self.pg_start()
612             self.pg1.get_capture(2)
613
614             # ACK packet out -> in
615             p = (
616                 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
617                 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
618                 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")
619             )
620             self.pg1.add_stream(p)
621             self.pg_enable_capture(self.pg_interfaces)
622             self.pg_start()
623             self.pg0.get_capture(1)
624
625             # Check if deterministic NAT44 closed the session
626             dms = self.vapi.det44_map_dump()
627             self.assertEqual(0, dms[0].ses_num)
628         except:
629             self.logger.error("TCP session termination failed")
630             raise
631
632     def test_session_timeout(self):
633         """Deterministic NAT session timeouts"""
634         self.vapi.det44_add_del_map(
635             is_add=1,
636             in_addr=self.pg0.remote_ip4,
637             in_plen=32,
638             out_addr=socket.inet_aton(self.nat_addr),
639             out_plen=32,
640         )
641         self.vapi.det44_interface_add_del_feature(
642             sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
643         )
644         self.vapi.det44_interface_add_del_feature(
645             sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
646         )
647
648         self.initiate_tcp_session(self.pg0, self.pg1)
649         self.vapi.det44_set_timeouts(udp=5, tcp_established=5, tcp_transitory=5, icmp=5)
650         pkts = self.create_stream_in(self.pg0, self.pg1)
651         self.pg0.add_stream(pkts)
652         self.pg_enable_capture(self.pg_interfaces)
653         self.pg_start()
654         self.pg1.get_capture(len(pkts))
655         self.virtual_sleep(15)
656
657         dms = self.vapi.det44_map_dump()
658         self.assertEqual(0, dms[0].ses_num)
659
660     # TODO: ipfix needs to be separated from NAT base plugin
661     @unittest.skipUnless(config.extended, "part of extended tests")
662     def test_session_limit_per_user(self):
663         """Deterministic NAT maximum sessions per user limit"""
664         self.vapi.det44_add_del_map(
665             is_add=1,
666             in_addr=self.pg0.remote_ip4,
667             in_plen=32,
668             out_addr=socket.inet_aton(self.nat_addr),
669             out_plen=32,
670         )
671         self.vapi.det44_interface_add_del_feature(
672             sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
673         )
674         self.vapi.det44_interface_add_del_feature(
675             sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
676         )
677         self.vapi.set_ipfix_exporter(
678             collector_address=self.pg2.remote_ip4,
679             src_address=self.pg2.local_ip4,
680             path_mtu=512,
681             template_interval=10,
682         )
683         self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, enable=1)
684
685         pkts = []
686         for port in range(1025, 2025):
687             p = (
688                 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
689                 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
690                 / UDP(sport=port, dport=port)
691             )
692             pkts.append(p)
693
694         self.pg0.add_stream(pkts)
695         self.pg_enable_capture(self.pg_interfaces)
696         self.pg_start()
697         self.pg1.get_capture(len(pkts))
698
699         p = (
700             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
701             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
702             / UDP(sport=3001, dport=3002)
703         )
704         self.pg0.add_stream(p)
705         self.pg_enable_capture(self.pg_interfaces)
706         self.pg_start()
707         self.pg1.assert_nothing_captured()
708
709         # verify ICMP error packet
710         capture = self.pg0.get_capture(1)
711         p = capture[0]
712         self.assertTrue(p.haslayer(ICMP))
713         icmp = p[ICMP]
714         self.assertEqual(icmp.type, 3)
715         self.assertEqual(icmp.code, 1)
716         self.assertTrue(icmp.haslayer(IPerror))
717         inner_ip = icmp[IPerror]
718         self.assertEqual(inner_ip[UDPerror].sport, 3001)
719         self.assertEqual(inner_ip[UDPerror].dport, 3002)
720
721         dms = self.vapi.det44_map_dump()
722
723         self.assertEqual(1000, dms[0].ses_num)
724
725         # verify IPFIX logging
726         self.vapi.ipfix_flush()
727         capture = self.pg2.get_capture(7)
728         ipfix = IPFIXDecoder()
729         # first load template
730         for p in capture:
731             self.assertTrue(p.haslayer(IPFIX))
732             if p.haslayer(Template):
733                 ipfix.add_template(p.getlayer(Template))
734         # verify events in data set
735         for p in capture:
736             if p.haslayer(Data):
737                 data = ipfix.decode_data_set(p.getlayer(Set))
738                 self.verify_ipfix_max_entries_per_user(data, 1000, self.pg0.remote_ip4)
739         self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, enable=0)