ip: refactor reassembly
[vpp.git] / test / test_reassembly.py
1 #!/usr/bin/env python
2
3 import six
4 import unittest
5 from random import shuffle, choice, randrange
6
7 from framework import VppTestCase, VppTestRunner
8
9 import scapy.compat
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, GRE
12 from scapy.layers.inet import IP, UDP, ICMP
13 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, ICMPv6ParamProblem,\
14     ICMPv6TimeExceeded
15 from framework import VppTestCase, VppTestRunner
16 from util import ppp, ppc, fragment_rfc791, fragment_rfc8200
17 from vpp_gre_interface import VppGreInterface
18 from vpp_ip import DpoProto
19 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto
20 from vpp_papi import VppEnum
21
# number of packets generated per test class by create_stream();
# 35 is enough to have >257 400-byte fragments
test_packet_count = 35

# number of workers used for multi-worker test cases
# (also determines how many sending pg interfaces those cases create)
worker_count = 3
27
28
29 class TestIPv4Reassembly(VppTestCase):
30     """ IPv4 Reassembly """
31
32     @classmethod
33     def setUpClass(cls):
34         super(TestIPv4Reassembly, cls).setUpClass()
35
36         cls.create_pg_interfaces([0, 1])
37         cls.src_if = cls.pg0
38         cls.dst_if = cls.pg1
39
40         # setup all interfaces
41         for i in cls.pg_interfaces:
42             i.admin_up()
43             i.config_ip4()
44             i.resolve_arp()
45
46         # packet sizes
47         cls.packet_sizes = [64, 512, 1518, 9018]
48         cls.padding = " abcdefghijklmn"
49         cls.create_stream(cls.packet_sizes)
50         cls.create_fragments()
51
    @classmethod
    def tearDownClass(cls):
        """ Class teardown - only delegates to the parent class """
        super(TestIPv4Reassembly, cls).tearDownClass()
55
56     def setUp(self):
57         """ Test setup - force timeout on existing reassemblies """
58         super(TestIPv4Reassembly, self).setUp()
59         self.vapi.ip_reassembly_enable_disable(
60             sw_if_index=self.src_if.sw_if_index, enable_ip4=True)
61         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
62                                     max_reassembly_length=1000,
63                                     expire_walk_interval_ms=10)
64         self.sleep(.25)
65         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
66                                     max_reassembly_length=1000,
67                                     expire_walk_interval_ms=10000)
68
    def tearDown(self):
        """ Test teardown - only delegates to the parent class """
        super(TestIPv4Reassembly, self).tearDown()
71
72     def show_commands_at_teardown(self):
73         self.logger.debug(self.vapi.ppcli("show ip4-full-reassembly details"))
74         self.logger.debug(self.vapi.ppcli("show buffers"))
75
76     @classmethod
77     def create_stream(cls, packet_sizes, packet_count=test_packet_count):
78         """Create input packet stream
79
80         :param list packet_sizes: Required packet sizes.
81         """
82         for i in range(0, packet_count):
83             info = cls.create_packet_info(cls.src_if, cls.src_if)
84             payload = cls.info_to_payload(info)
85             p = (Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac) /
86                  IP(id=info.index, src=cls.src_if.remote_ip4,
87                     dst=cls.dst_if.remote_ip4) /
88                  UDP(sport=1234, dport=5678) /
89                  Raw(payload))
90             size = packet_sizes[(i // 2) % len(packet_sizes)]
91             cls.extend_packet(p, size, cls.padding)
92             info.data = p
93
94     @classmethod
95     def create_fragments(cls):
96         infos = cls._packet_infos
97         cls.pkt_infos = []
98         for index, info in six.iteritems(infos):
99             p = info.data
100             # cls.logger.debug(ppp("Packet:",
101             #                      p.__class__(scapy.compat.raw(p))))
102             fragments_400 = fragment_rfc791(p, 400)
103             fragments_300 = fragment_rfc791(p, 300)
104             fragments_200 = [
105                 x for f in fragments_400 for x in fragment_rfc791(f, 200)]
106             cls.pkt_infos.append(
107                 (index, fragments_400, fragments_300, fragments_200))
108         cls.fragments_400 = [
109             x for (_, frags, _, _) in cls.pkt_infos for x in frags]
110         cls.fragments_300 = [
111             x for (_, _, frags, _) in cls.pkt_infos for x in frags]
112         cls.fragments_200 = [
113             x for (_, _, _, frags) in cls.pkt_infos for x in frags]
114         cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, "
115                          "%s 300-byte fragments and %s 200-byte fragments" %
116                          (len(infos), len(cls.fragments_400),
117                              len(cls.fragments_300), len(cls.fragments_200)))
118
119     def verify_capture(self, capture, dropped_packet_indexes=[]):
120         """Verify captured packet stream.
121
122         :param list capture: Captured packet stream.
123         """
124         info = None
125         seen = set()
126         for packet in capture:
127             try:
128                 self.logger.debug(ppp("Got packet:", packet))
129                 ip = packet[IP]
130                 udp = packet[UDP]
131                 payload_info = self.payload_to_info(packet[Raw])
132                 packet_index = payload_info.index
133                 self.assertTrue(
134                     packet_index not in dropped_packet_indexes,
135                     ppp("Packet received, but should be dropped:", packet))
136                 if packet_index in seen:
137                     raise Exception(ppp("Duplicate packet received", packet))
138                 seen.add(packet_index)
139                 self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
140                 info = self._packet_infos[packet_index]
141                 self.assertTrue(info is not None)
142                 self.assertEqual(packet_index, info.index)
143                 saved_packet = info.data
144                 self.assertEqual(ip.src, saved_packet[IP].src)
145                 self.assertEqual(ip.dst, saved_packet[IP].dst)
146                 self.assertEqual(udp.payload, saved_packet[UDP].payload)
147             except Exception:
148                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
149                 raise
150         for index in self._packet_infos:
151             self.assertTrue(index in seen or index in dropped_packet_indexes,
152                             "Packet with packet_index %d not received" % index)
153
154     def test_reassembly(self):
155         """ basic reassembly """
156
157         self.pg_enable_capture()
158         self.src_if.add_stream(self.fragments_200)
159         self.pg_start()
160
161         packets = self.dst_if.get_capture(len(self.pkt_infos))
162         self.verify_capture(packets)
163         self.src_if.assert_nothing_captured()
164
165         # run it all again to verify correctness
166         self.pg_enable_capture()
167         self.src_if.add_stream(self.fragments_200)
168         self.pg_start()
169
170         packets = self.dst_if.get_capture(len(self.pkt_infos))
171         self.verify_capture(packets)
172         self.src_if.assert_nothing_captured()
173
174     def test_reversed(self):
175         """ reverse order reassembly """
176
177         fragments = list(self.fragments_200)
178         fragments.reverse()
179
180         self.pg_enable_capture()
181         self.src_if.add_stream(fragments)
182         self.pg_start()
183
184         packets = self.dst_if.get_capture(len(self.packet_infos))
185         self.verify_capture(packets)
186         self.src_if.assert_nothing_captured()
187
188         # run it all again to verify correctness
189         self.pg_enable_capture()
190         self.src_if.add_stream(fragments)
191         self.pg_start()
192
193         packets = self.dst_if.get_capture(len(self.packet_infos))
194         self.verify_capture(packets)
195         self.src_if.assert_nothing_captured()
196
197     def test_long_fragment_chain(self):
198         """ long fragment chain """
199
200         error_cnt_str = \
201             "/err/ip4-full-reassembly-feature/fragment chain too long (drop)"
202
203         error_cnt = self.statistics.get_err_counter(error_cnt_str)
204
205         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
206                                     max_reassembly_length=3,
207                                     expire_walk_interval_ms=50)
208
209         p1 = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
210               IP(id=1000, src=self.src_if.remote_ip4,
211                  dst=self.dst_if.remote_ip4) /
212               UDP(sport=1234, dport=5678) /
213               Raw("X" * 1000))
214         p2 = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
215               IP(id=1001, src=self.src_if.remote_ip4,
216                  dst=self.dst_if.remote_ip4) /
217               UDP(sport=1234, dport=5678) /
218               Raw("X" * 1000))
219         frags = fragment_rfc791(p1, 200) + fragment_rfc791(p2, 500)
220
221         self.pg_enable_capture()
222         self.src_if.add_stream(frags)
223         self.pg_start()
224
225         self.dst_if.get_capture(1)
226         self.assert_error_counter_equal(error_cnt_str, error_cnt + 1)
227
228     def test_5737(self):
229         """ fragment length + ip header size > 65535 """
230         self.vapi.cli("clear errors")
231         raw = ('E\x00\x00\x88,\xf8\x1f\xfe@\x01\x98\x00\xc0\xa8\n-\xc0\xa8\n'
232                '\x01\x08\x00\xf0J\xed\xcb\xf1\xf5Test-group: IPv4.IPv4.ipv4-'
233                'message.Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Of'
234                'fset; Test-case: 5737')
235
236         malformed_packet = (Ether(dst=self.src_if.local_mac,
237                                   src=self.src_if.remote_mac) /
238                             IP(raw))
239         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
240              IP(id=1000, src=self.src_if.remote_ip4,
241                 dst=self.dst_if.remote_ip4) /
242              UDP(sport=1234, dport=5678) /
243              Raw("X" * 1000))
244         valid_fragments = fragment_rfc791(p, 400)
245
246         self.pg_enable_capture()
247         self.src_if.add_stream([malformed_packet] + valid_fragments)
248         self.pg_start()
249
250         self.dst_if.get_capture(1)
251         self.logger.debug(self.vapi.ppcli("show error"))
252         self.assert_packet_counter_equal("ip4-full-reassembly-feature", 1)
253         # TODO remove above, uncomment below once clearing of counters
254         # is supported
255         # self.assert_packet_counter_equal(
256         #     "/err/ip4-full-reassembly-feature/malformed packets", 1)
257
258     def test_44924(self):
259         """ compress tiny fragments """
260         packets = [(Ether(dst=self.src_if.local_mac,
261                           src=self.src_if.remote_mac) /
262                     IP(id=24339, flags="MF", frag=0, ttl=64,
263                        src=self.src_if.remote_ip4,
264                        dst=self.dst_if.remote_ip4) /
265                     ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
266                     Raw(load='Test-group: IPv4')),
267                    (Ether(dst=self.src_if.local_mac,
268                           src=self.src_if.remote_mac) /
269                     IP(id=24339, flags="MF", frag=3, ttl=64,
270                        src=self.src_if.remote_ip4,
271                        dst=self.dst_if.remote_ip4) /
272                     ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
273                     Raw(load='.IPv4.Fragmentation.vali')),
274                    (Ether(dst=self.src_if.local_mac,
275                           src=self.src_if.remote_mac) /
276                     IP(id=24339, frag=6, ttl=64,
277                        src=self.src_if.remote_ip4,
278                        dst=self.dst_if.remote_ip4) /
279                     ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
280                     Raw(load='d; Test-case: 44924'))
281                    ]
282
283         self.pg_enable_capture()
284         self.src_if.add_stream(packets)
285         self.pg_start()
286
287         self.dst_if.get_capture(1)
288
289     def test_frag_1(self):
290         """ fragment of size 1 """
291         self.vapi.cli("clear errors")
292         malformed_packets = [(Ether(dst=self.src_if.local_mac,
293                                     src=self.src_if.remote_mac) /
294                               IP(id=7, len=21, flags="MF", frag=0, ttl=64,
295                                  src=self.src_if.remote_ip4,
296                                  dst=self.dst_if.remote_ip4) /
297                               ICMP(type="echo-request")),
298                              (Ether(dst=self.src_if.local_mac,
299                                     src=self.src_if.remote_mac) /
300                               IP(id=7, len=21, frag=1, ttl=64,
301                                  src=self.src_if.remote_ip4,
302                                  dst=self.dst_if.remote_ip4) /
303                               Raw(load='\x08')),
304                              ]
305
306         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
307              IP(id=1000, src=self.src_if.remote_ip4,
308                 dst=self.dst_if.remote_ip4) /
309              UDP(sport=1234, dport=5678) /
310              Raw("X" * 1000))
311         valid_fragments = fragment_rfc791(p, 400)
312
313         self.pg_enable_capture()
314         self.src_if.add_stream(malformed_packets + valid_fragments)
315         self.pg_start()
316
317         self.dst_if.get_capture(1)
318
319         self.assert_packet_counter_equal("ip4-full-reassembly-feature", 1)
320         # TODO remove above, uncomment below once clearing of counters
321         # is supported
322         # self.assert_packet_counter_equal(
323         #     "/err/ip4-full-reassembly-feature/malformed packets", 1)
324
325     def test_random(self):
326         """ random order reassembly """
327
328         fragments = list(self.fragments_200)
329         shuffle(fragments)
330
331         self.pg_enable_capture()
332         self.src_if.add_stream(fragments)
333         self.pg_start()
334
335         packets = self.dst_if.get_capture(len(self.packet_infos))
336         self.verify_capture(packets)
337         self.src_if.assert_nothing_captured()
338
339         # run it all again to verify correctness
340         self.pg_enable_capture()
341         self.src_if.add_stream(fragments)
342         self.pg_start()
343
344         packets = self.dst_if.get_capture(len(self.packet_infos))
345         self.verify_capture(packets)
346         self.src_if.assert_nothing_captured()
347
348     def test_duplicates(self):
349         """ duplicate fragments """
350
351         fragments = [
352             x for (_, frags, _, _) in self.pkt_infos
353             for x in frags
354             for _ in range(0, min(2, len(frags)))
355         ]
356
357         self.pg_enable_capture()
358         self.src_if.add_stream(fragments)
359         self.pg_start()
360
361         packets = self.dst_if.get_capture(len(self.pkt_infos))
362         self.verify_capture(packets)
363         self.src_if.assert_nothing_captured()
364
365     def test_overlap1(self):
366         """ overlapping fragments case #1 """
367
368         fragments = []
369         for _, _, frags_300, frags_200 in self.pkt_infos:
370             if len(frags_300) == 1:
371                 fragments.extend(frags_300)
372             else:
373                 for i, j in zip(frags_200, frags_300):
374                     fragments.extend(i)
375                     fragments.extend(j)
376
377         self.pg_enable_capture()
378         self.src_if.add_stream(fragments)
379         self.pg_start()
380
381         packets = self.dst_if.get_capture(len(self.pkt_infos))
382         self.verify_capture(packets)
383         self.src_if.assert_nothing_captured()
384
385         # run it all to verify correctness
386         self.pg_enable_capture()
387         self.src_if.add_stream(fragments)
388         self.pg_start()
389
390         packets = self.dst_if.get_capture(len(self.pkt_infos))
391         self.verify_capture(packets)
392         self.src_if.assert_nothing_captured()
393
394     def test_overlap2(self):
395         """ overlapping fragments case #2 """
396
397         fragments = []
398         for _, _, frags_300, frags_200 in self.pkt_infos:
399             if len(frags_300) == 1:
400                 fragments.extend(frags_300)
401             else:
402                 # care must be taken here so that there are no fragments
403                 # received by vpp after reassembly is finished, otherwise
404                 # new reassemblies will be started and packet generator will
405                 # freak out when it detects unfreed buffers
406                 zipped = zip(frags_300, frags_200)
407                 for i, j in zipped:
408                     fragments.extend(i)
409                     fragments.extend(j)
410                 fragments.pop()
411
412         self.pg_enable_capture()
413         self.src_if.add_stream(fragments)
414         self.pg_start()
415
416         packets = self.dst_if.get_capture(len(self.pkt_infos))
417         self.verify_capture(packets)
418         self.src_if.assert_nothing_captured()
419
420         # run it all to verify correctness
421         self.pg_enable_capture()
422         self.src_if.add_stream(fragments)
423         self.pg_start()
424
425         packets = self.dst_if.get_capture(len(self.pkt_infos))
426         self.verify_capture(packets)
427         self.src_if.assert_nothing_captured()
428
429     def test_timeout_inline(self):
430         """ timeout (inline) """
431
432         dropped_packet_indexes = set(
433             index for (index, frags, _, _) in self.pkt_infos if len(frags) > 1
434         )
435
436         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
437                                     max_reassembly_length=3,
438                                     expire_walk_interval_ms=10000)
439
440         self.pg_enable_capture()
441         self.src_if.add_stream(self.fragments_400)
442         self.pg_start()
443
444         packets = self.dst_if.get_capture(
445             len(self.pkt_infos) - len(dropped_packet_indexes))
446         self.verify_capture(packets, dropped_packet_indexes)
447         self.src_if.assert_nothing_captured()
448
449     def test_timeout_cleanup(self):
450         """ timeout (cleanup) """
451
452         # whole packets + fragmented packets sans last fragment
453         fragments = [
454             x for (_, frags_400, _, _) in self.pkt_infos
455             for x in frags_400[:-1 if len(frags_400) > 1 else None]
456         ]
457
458         # last fragments for fragmented packets
459         fragments2 = [frags_400[-1]
460                       for (_, frags_400, _, _) in self.pkt_infos
461                       if len(frags_400) > 1]
462
463         dropped_packet_indexes = set(
464             index for (index, frags_400, _, _) in self.pkt_infos
465             if len(frags_400) > 1)
466
467         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
468                                     max_reassembly_length=1000,
469                                     expire_walk_interval_ms=50)
470
471         self.pg_enable_capture()
472         self.src_if.add_stream(fragments)
473         self.pg_start()
474
475         self.sleep(.25, "wait before sending rest of fragments")
476
477         self.src_if.add_stream(fragments2)
478         self.pg_start()
479
480         packets = self.dst_if.get_capture(
481             len(self.pkt_infos) - len(dropped_packet_indexes))
482         self.verify_capture(packets, dropped_packet_indexes)
483         self.src_if.assert_nothing_captured()
484
485     def test_disabled(self):
486         """ reassembly disabled """
487
488         dropped_packet_indexes = set(
489             index for (index, frags_400, _, _) in self.pkt_infos
490             if len(frags_400) > 1)
491
492         self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=0,
493                                     max_reassembly_length=3,
494                                     expire_walk_interval_ms=10000)
495
496         self.pg_enable_capture()
497         self.src_if.add_stream(self.fragments_400)
498         self.pg_start()
499
500         packets = self.dst_if.get_capture(
501             len(self.pkt_infos) - len(dropped_packet_indexes))
502         self.verify_capture(packets, dropped_packet_indexes)
503         self.src_if.assert_nothing_captured()
504
505
506 class TestIPv4MWReassembly(VppTestCase):
507     """ IPv4 Reassembly (multiple workers) """
508     worker_config = "workers %d" % worker_count
509
510     @classmethod
511     def setUpClass(cls):
512         super(TestIPv4MWReassembly, cls).setUpClass()
513
514         cls.create_pg_interfaces(range(worker_count+1))
515         cls.src_if = cls.pg0
516         cls.send_ifs = cls.pg_interfaces[:-1]
517         cls.dst_if = cls.pg_interfaces[-1]
518
519         # setup all interfaces
520         for i in cls.pg_interfaces:
521             i.admin_up()
522             i.config_ip4()
523             i.resolve_arp()
524
525         # packets sizes reduced here because we are generating packets without
526         # Ethernet headers, which are added later (diff fragments go via
527         # different interfaces)
528         cls.packet_sizes = [64-len(Ether()), 512-len(Ether()),
529                             1518-len(Ether()), 9018-len(Ether())]
530         cls.padding = " abcdefghijklmn"
531         cls.create_stream(cls.packet_sizes)
532         cls.create_fragments()
533
    @classmethod
    def tearDownClass(cls):
        """ Class teardown - only delegates to the parent class """
        super(TestIPv4MWReassembly, cls).tearDownClass()
537
538     def setUp(self):
539         """ Test setup - force timeout on existing reassemblies """
540         super(TestIPv4MWReassembly, self).setUp()
541         for intf in self.send_ifs:
542             self.vapi.ip_reassembly_enable_disable(
543                 sw_if_index=intf.sw_if_index, enable_ip4=True)
544         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
545                                     max_reassembly_length=1000,
546                                     expire_walk_interval_ms=10)
547         self.sleep(.25)
548         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
549                                     max_reassembly_length=1000,
550                                     expire_walk_interval_ms=10000)
551
    def tearDown(self):
        """ Test teardown - only delegates to the parent class """
        super(TestIPv4MWReassembly, self).tearDown()
554
555     def show_commands_at_teardown(self):
556         self.logger.debug(self.vapi.ppcli("show ip4-full-reassembly details"))
557         self.logger.debug(self.vapi.ppcli("show buffers"))
558
559     @classmethod
560     def create_stream(cls, packet_sizes, packet_count=test_packet_count):
561         """Create input packet stream
562
563         :param list packet_sizes: Required packet sizes.
564         """
565         for i in range(0, packet_count):
566             info = cls.create_packet_info(cls.src_if, cls.src_if)
567             payload = cls.info_to_payload(info)
568             p = (IP(id=info.index, src=cls.src_if.remote_ip4,
569                     dst=cls.dst_if.remote_ip4) /
570                  UDP(sport=1234, dport=5678) /
571                  Raw(payload))
572             size = packet_sizes[(i // 2) % len(packet_sizes)]
573             cls.extend_packet(p, size, cls.padding)
574             info.data = p
575
576     @classmethod
577     def create_fragments(cls):
578         infos = cls._packet_infos
579         cls.pkt_infos = []
580         for index, info in six.iteritems(infos):
581             p = info.data
582             # cls.logger.debug(ppp("Packet:",
583             #                      p.__class__(scapy.compat.raw(p))))
584             fragments_400 = fragment_rfc791(p, 400)
585             cls.pkt_infos.append((index, fragments_400))
586         cls.fragments_400 = [
587             x for (_, frags) in cls.pkt_infos for x in frags]
588         cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " %
589                          (len(infos), len(cls.fragments_400)))
590
591     def verify_capture(self, capture, dropped_packet_indexes=[]):
592         """Verify captured packet stream.
593
594         :param list capture: Captured packet stream.
595         """
596         info = None
597         seen = set()
598         for packet in capture:
599             try:
600                 self.logger.debug(ppp("Got packet:", packet))
601                 ip = packet[IP]
602                 udp = packet[UDP]
603                 payload_info = self.payload_to_info(packet[Raw])
604                 packet_index = payload_info.index
605                 self.assertTrue(
606                     packet_index not in dropped_packet_indexes,
607                     ppp("Packet received, but should be dropped:", packet))
608                 if packet_index in seen:
609                     raise Exception(ppp("Duplicate packet received", packet))
610                 seen.add(packet_index)
611                 self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
612                 info = self._packet_infos[packet_index]
613                 self.assertTrue(info is not None)
614                 self.assertEqual(packet_index, info.index)
615                 saved_packet = info.data
616                 self.assertEqual(ip.src, saved_packet[IP].src)
617                 self.assertEqual(ip.dst, saved_packet[IP].dst)
618                 self.assertEqual(udp.payload, saved_packet[UDP].payload)
619             except Exception:
620                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
621                 raise
622         for index in self._packet_infos:
623             self.assertTrue(index in seen or index in dropped_packet_indexes,
624                             "Packet with packet_index %d not received" % index)
625
626     def send_packets(self, packets):
627         for counter in range(worker_count):
628             if 0 == len(packets[counter]):
629                 continue
630             send_if = self.send_ifs[counter]
631             send_if.add_stream(
632                 (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
633                  for x in packets[counter]),
634                 worker=counter)
635         self.pg_start()
636
637     def test_worker_conflict(self):
638         """ 1st and FO=0 fragments on different workers """
639
640         # in first wave we send fragments which don't start at offset 0
641         # then we send fragments with offset 0 on a different thread
642         # then the rest of packets on a random thread
643         first_packets = [[] for n in range(worker_count)]
644         second_packets = [[] for n in range(worker_count)]
645         rest_of_packets = [[] for n in range(worker_count)]
646         for (_, p) in self.pkt_infos:
647             wi = randrange(worker_count)
648             second_packets[wi].append(p[0])
649             if len(p) <= 1:
650                 continue
651             wi2 = wi
652             while wi2 == wi:
653                 wi2 = randrange(worker_count)
654             first_packets[wi2].append(p[1])
655             wi3 = randrange(worker_count)
656             rest_of_packets[wi3].extend(p[2:])
657
658         self.pg_enable_capture()
659         self.send_packets(first_packets)
660         self.send_packets(second_packets)
661         self.send_packets(rest_of_packets)
662
663         packets = self.dst_if.get_capture(len(self.pkt_infos))
664         self.verify_capture(packets)
665         for send_if in self.send_ifs:
666             send_if.assert_nothing_captured()
667
668         self.pg_enable_capture()
669         self.send_packets(first_packets)
670         self.send_packets(second_packets)
671         self.send_packets(rest_of_packets)
672
673         packets = self.dst_if.get_capture(len(self.pkt_infos))
674         self.verify_capture(packets)
675         for send_if in self.send_ifs:
676             send_if.assert_nothing_captured()
677
678
679 class TestIPv6Reassembly(VppTestCase):
680     """ IPv6 Reassembly """
681
682     @classmethod
683     def setUpClass(cls):
684         super(TestIPv6Reassembly, cls).setUpClass()
685
686         cls.create_pg_interfaces([0, 1])
687         cls.src_if = cls.pg0
688         cls.dst_if = cls.pg1
689
690         # setup all interfaces
691         for i in cls.pg_interfaces:
692             i.admin_up()
693             i.config_ip6()
694             i.resolve_ndp()
695
696         # packet sizes
697         cls.packet_sizes = [64, 512, 1518, 9018]
698         cls.padding = " abcdefghijklmn"
699         cls.create_stream(cls.packet_sizes)
700         cls.create_fragments()
701
    @classmethod
    def tearDownClass(cls):
        """ Class teardown - only delegates to the parent class """
        super(TestIPv6Reassembly, cls).tearDownClass()
705
706     def setUp(self):
707         """ Test setup - force timeout on existing reassemblies """
708         super(TestIPv6Reassembly, self).setUp()
709         self.vapi.ip_reassembly_enable_disable(
710             sw_if_index=self.src_if.sw_if_index, enable_ip6=True)
711         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
712                                     max_reassembly_length=1000,
713                                     expire_walk_interval_ms=10, is_ip6=1)
714         self.sleep(.25)
715         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
716                                     max_reassembly_length=1000,
717                                     expire_walk_interval_ms=10000, is_ip6=1)
718         self.logger.debug(self.vapi.ppcli("show ip6-full-reassembly details"))
719         self.logger.debug(self.vapi.ppcli("show buffers"))
720
    def tearDown(self):
        """ Test teardown - only delegates to the parent class """
        super(TestIPv6Reassembly, self).tearDown()
723
724     def show_commands_at_teardown(self):
725         self.logger.debug(self.vapi.ppcli("show ip6-full-reassembly details"))
726         self.logger.debug(self.vapi.ppcli("show buffers"))
727
728     @classmethod
729     def create_stream(cls, packet_sizes, packet_count=test_packet_count):
730         """Create input packet stream for defined interface.
731
732         :param list packet_sizes: Required packet sizes.
733         """
734         for i in range(0, packet_count):
735             info = cls.create_packet_info(cls.src_if, cls.src_if)
736             payload = cls.info_to_payload(info)
737             p = (Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac) /
738                  IPv6(src=cls.src_if.remote_ip6,
739                       dst=cls.dst_if.remote_ip6) /
740                  UDP(sport=1234, dport=5678) /
741                  Raw(payload))
742             size = packet_sizes[(i // 2) % len(packet_sizes)]
743             cls.extend_packet(p, size, cls.padding)
744             info.data = p
745
746     @classmethod
747     def create_fragments(cls):
748         infos = cls._packet_infos
749         cls.pkt_infos = []
750         for index, info in six.iteritems(infos):
751             p = info.data
752             # cls.logger.debug(ppp("Packet:",
753             #                      p.__class__(scapy.compat.raw(p))))
754             fragments_400 = fragment_rfc8200(p, info.index, 400)
755             fragments_300 = fragment_rfc8200(p, info.index, 300)
756             cls.pkt_infos.append((index, fragments_400, fragments_300))
757         cls.fragments_400 = [
758             x for _, frags, _ in cls.pkt_infos for x in frags]
759         cls.fragments_300 = [
760             x for _, _, frags in cls.pkt_infos for x in frags]
761         cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, "
762                          "and %s 300-byte fragments" %
763                          (len(infos), len(cls.fragments_400),
764                              len(cls.fragments_300)))
765
766     def verify_capture(self, capture, dropped_packet_indexes=[]):
767         """Verify captured packet strea .
768
769         :param list capture: Captured packet stream.
770         """
771         info = None
772         seen = set()
773         for packet in capture:
774             try:
775                 self.logger.debug(ppp("Got packet:", packet))
776                 ip = packet[IPv6]
777                 udp = packet[UDP]
778                 payload_info = self.payload_to_info(packet[Raw])
779                 packet_index = payload_info.index
780                 self.assertTrue(
781                     packet_index not in dropped_packet_indexes,
782                     ppp("Packet received, but should be dropped:", packet))
783                 if packet_index in seen:
784                     raise Exception(ppp("Duplicate packet received", packet))
785                 seen.add(packet_index)
786                 self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
787                 info = self._packet_infos[packet_index]
788                 self.assertTrue(info is not None)
789                 self.assertEqual(packet_index, info.index)
790                 saved_packet = info.data
791                 self.assertEqual(ip.src, saved_packet[IPv6].src)
792                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
793                 self.assertEqual(udp.payload, saved_packet[UDP].payload)
794             except Exception:
795                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
796                 raise
797         for index in self._packet_infos:
798             self.assertTrue(index in seen or index in dropped_packet_indexes,
799                             "Packet with packet_index %d not received" % index)
800
801     def test_reassembly(self):
802         """ basic reassembly """
803
804         self.pg_enable_capture()
805         self.src_if.add_stream(self.fragments_400)
806         self.pg_start()
807
808         packets = self.dst_if.get_capture(len(self.pkt_infos))
809         self.verify_capture(packets)
810         self.src_if.assert_nothing_captured()
811
812         # run it all again to verify correctness
813         self.pg_enable_capture()
814         self.src_if.add_stream(self.fragments_400)
815         self.pg_start()
816
817         packets = self.dst_if.get_capture(len(self.pkt_infos))
818         self.verify_capture(packets)
819         self.src_if.assert_nothing_captured()
820
821     def test_reversed(self):
822         """ reverse order reassembly """
823
824         fragments = list(self.fragments_400)
825         fragments.reverse()
826
827         self.pg_enable_capture()
828         self.src_if.add_stream(fragments)
829         self.pg_start()
830
831         packets = self.dst_if.get_capture(len(self.pkt_infos))
832         self.verify_capture(packets)
833         self.src_if.assert_nothing_captured()
834
835         # run it all again to verify correctness
836         self.pg_enable_capture()
837         self.src_if.add_stream(fragments)
838         self.pg_start()
839
840         packets = self.dst_if.get_capture(len(self.pkt_infos))
841         self.verify_capture(packets)
842         self.src_if.assert_nothing_captured()
843
844     def test_random(self):
845         """ random order reassembly """
846
847         fragments = list(self.fragments_400)
848         shuffle(fragments)
849
850         self.pg_enable_capture()
851         self.src_if.add_stream(fragments)
852         self.pg_start()
853
854         packets = self.dst_if.get_capture(len(self.pkt_infos))
855         self.verify_capture(packets)
856         self.src_if.assert_nothing_captured()
857
858         # run it all again to verify correctness
859         self.pg_enable_capture()
860         self.src_if.add_stream(fragments)
861         self.pg_start()
862
863         packets = self.dst_if.get_capture(len(self.pkt_infos))
864         self.verify_capture(packets)
865         self.src_if.assert_nothing_captured()
866
867     def test_duplicates(self):
868         """ duplicate fragments """
869
870         fragments = [
871             x for (_, frags, _) in self.pkt_infos
872             for x in frags
873             for _ in range(0, min(2, len(frags)))
874         ]
875
876         self.pg_enable_capture()
877         self.src_if.add_stream(fragments)
878         self.pg_start()
879
880         packets = self.dst_if.get_capture(len(self.pkt_infos))
881         self.verify_capture(packets)
882         self.src_if.assert_nothing_captured()
883
884     def test_long_fragment_chain(self):
885         """ long fragment chain """
886
887         error_cnt_str = \
888             "/err/ip6-full-reassembly-feature/fragment chain too long (drop)"
889
890         error_cnt = self.statistics.get_err_counter(error_cnt_str)
891
892         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
893                                     max_reassembly_length=3,
894                                     expire_walk_interval_ms=50, is_ip6=1)
895
896         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
897              IPv6(src=self.src_if.remote_ip6,
898                   dst=self.dst_if.remote_ip6) /
899              UDP(sport=1234, dport=5678) /
900              Raw("X" * 1000))
901         frags = fragment_rfc8200(p, 1, 300) + fragment_rfc8200(p, 2, 500)
902
903         self.pg_enable_capture()
904         self.src_if.add_stream(frags)
905         self.pg_start()
906
907         self.dst_if.get_capture(1)
908         self.assert_error_counter_equal(error_cnt_str, error_cnt + 1)
909
910     def test_overlap1(self):
911         """ overlapping fragments case #1 """
912
913         fragments = []
914         for _, frags_400, frags_300 in self.pkt_infos:
915             if len(frags_300) == 1:
916                 fragments.extend(frags_400)
917             else:
918                 for i, j in zip(frags_300, frags_400):
919                     fragments.extend(i)
920                     fragments.extend(j)
921
922         dropped_packet_indexes = set(
923             index for (index, _, frags) in self.pkt_infos if len(frags) > 1
924         )
925
926         self.pg_enable_capture()
927         self.src_if.add_stream(fragments)
928         self.pg_start()
929
930         packets = self.dst_if.get_capture(
931             len(self.pkt_infos) - len(dropped_packet_indexes))
932         self.verify_capture(packets, dropped_packet_indexes)
933         self.src_if.assert_nothing_captured()
934
935     def test_overlap2(self):
936         """ overlapping fragments case #2 """
937
938         fragments = []
939         for _, frags_400, frags_300 in self.pkt_infos:
940             if len(frags_400) == 1:
941                 fragments.extend(frags_400)
942             else:
943                 # care must be taken here so that there are no fragments
944                 # received by vpp after reassembly is finished, otherwise
945                 # new reassemblies will be started and packet generator will
946                 # freak out when it detects unfreed buffers
947                 zipped = zip(frags_400, frags_300)
948                 for i, j in zipped:
949                     fragments.extend(i)
950                     fragments.extend(j)
951                 fragments.pop()
952
953         dropped_packet_indexes = set(
954             index for (index, _, frags) in self.pkt_infos if len(frags) > 1
955         )
956
957         self.pg_enable_capture()
958         self.src_if.add_stream(fragments)
959         self.pg_start()
960
961         packets = self.dst_if.get_capture(
962             len(self.pkt_infos) - len(dropped_packet_indexes))
963         self.verify_capture(packets, dropped_packet_indexes)
964         self.src_if.assert_nothing_captured()
965
966     def test_timeout_inline(self):
967         """ timeout (inline) """
968
969         dropped_packet_indexes = set(
970             index for (index, frags, _) in self.pkt_infos if len(frags) > 1
971         )
972
973         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
974                                     max_reassembly_length=3,
975                                     expire_walk_interval_ms=10000, is_ip6=1)
976
977         self.pg_enable_capture()
978         self.src_if.add_stream(self.fragments_400)
979         self.pg_start()
980
981         packets = self.dst_if.get_capture(
982             len(self.pkt_infos) - len(dropped_packet_indexes))
983         self.verify_capture(packets, dropped_packet_indexes)
984         pkts = self.src_if.get_capture(
985             expected_count=len(dropped_packet_indexes))
986         for icmp in pkts:
987             self.assertIn(ICMPv6TimeExceeded, icmp)
988             self.assertIn(IPv6ExtHdrFragment, icmp)
989             self.assertIn(icmp[IPv6ExtHdrFragment].id, dropped_packet_indexes)
990             dropped_packet_indexes.remove(icmp[IPv6ExtHdrFragment].id)
991
992     def test_timeout_cleanup(self):
993         """ timeout (cleanup) """
994
995         # whole packets + fragmented packets sans last fragment
996         fragments = [
997             x for (_, frags_400, _) in self.pkt_infos
998             for x in frags_400[:-1 if len(frags_400) > 1 else None]
999         ]
1000
1001         # last fragments for fragmented packets
1002         fragments2 = [frags_400[-1]
1003                       for (_, frags_400, _) in self.pkt_infos
1004                       if len(frags_400) > 1]
1005
1006         dropped_packet_indexes = set(
1007             index for (index, frags_400, _) in self.pkt_infos
1008             if len(frags_400) > 1)
1009
1010         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
1011                                     max_reassembly_length=1000,
1012                                     expire_walk_interval_ms=50)
1013
1014         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
1015                                     max_reassembly_length=1000,
1016                                     expire_walk_interval_ms=50, is_ip6=1)
1017
1018         self.pg_enable_capture()
1019         self.src_if.add_stream(fragments)
1020         self.pg_start()
1021
1022         self.sleep(.25, "wait before sending rest of fragments")
1023
1024         self.src_if.add_stream(fragments2)
1025         self.pg_start()
1026
1027         packets = self.dst_if.get_capture(
1028             len(self.pkt_infos) - len(dropped_packet_indexes))
1029         self.verify_capture(packets, dropped_packet_indexes)
1030         pkts = self.src_if.get_capture(
1031             expected_count=len(dropped_packet_indexes))
1032         for icmp in pkts:
1033             self.assertIn(ICMPv6TimeExceeded, icmp)
1034             self.assertIn(IPv6ExtHdrFragment, icmp)
1035             self.assertIn(icmp[IPv6ExtHdrFragment].id, dropped_packet_indexes)
1036             dropped_packet_indexes.remove(icmp[IPv6ExtHdrFragment].id)
1037
1038     def test_disabled(self):
1039         """ reassembly disabled """
1040
1041         dropped_packet_indexes = set(
1042             index for (index, frags_400, _) in self.pkt_infos
1043             if len(frags_400) > 1)
1044
1045         self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=0,
1046                                     max_reassembly_length=3,
1047                                     expire_walk_interval_ms=10000, is_ip6=1)
1048
1049         self.pg_enable_capture()
1050         self.src_if.add_stream(self.fragments_400)
1051         self.pg_start()
1052
1053         packets = self.dst_if.get_capture(
1054             len(self.pkt_infos) - len(dropped_packet_indexes))
1055         self.verify_capture(packets, dropped_packet_indexes)
1056         self.src_if.assert_nothing_captured()
1057
1058     def test_missing_upper(self):
1059         """ missing upper layer """
1060         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1061              IPv6(src=self.src_if.remote_ip6,
1062                   dst=self.src_if.local_ip6) /
1063              UDP(sport=1234, dport=5678) /
1064              Raw())
1065         self.extend_packet(p, 1000, self.padding)
1066         fragments = fragment_rfc8200(p, 1, 500)
1067         bad_fragment = p.__class__(scapy.compat.raw(fragments[1]))
1068         bad_fragment[IPv6ExtHdrFragment].nh = 59
1069         bad_fragment[IPv6ExtHdrFragment].offset = 0
1070         self.pg_enable_capture()
1071         self.src_if.add_stream([bad_fragment])
1072         self.pg_start()
1073         pkts = self.src_if.get_capture(expected_count=1)
1074         icmp = pkts[0]
1075         self.assertIn(ICMPv6ParamProblem, icmp)
1076         self.assert_equal(icmp[ICMPv6ParamProblem].code, 3, "ICMP code")
1077
1078     def test_invalid_frag_size(self):
1079         """ fragment size not a multiple of 8 """
1080         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1081              IPv6(src=self.src_if.remote_ip6,
1082                   dst=self.src_if.local_ip6) /
1083              UDP(sport=1234, dport=5678) /
1084              Raw())
1085         self.extend_packet(p, 1000, self.padding)
1086         fragments = fragment_rfc8200(p, 1, 500)
1087         bad_fragment = fragments[0]
1088         self.extend_packet(bad_fragment, len(bad_fragment) + 5)
1089         self.pg_enable_capture()
1090         self.src_if.add_stream([bad_fragment])
1091         self.pg_start()
1092         pkts = self.src_if.get_capture(expected_count=1)
1093         icmp = pkts[0]
1094         self.assertIn(ICMPv6ParamProblem, icmp)
1095         self.assert_equal(icmp[ICMPv6ParamProblem].code, 0, "ICMP code")
1096
1097     def test_invalid_packet_size(self):
1098         """ total packet size > 65535 """
1099         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1100              IPv6(src=self.src_if.remote_ip6,
1101                   dst=self.src_if.local_ip6) /
1102              UDP(sport=1234, dport=5678) /
1103              Raw())
1104         self.extend_packet(p, 1000, self.padding)
1105         fragments = fragment_rfc8200(p, 1, 500)
1106         bad_fragment = fragments[1]
1107         bad_fragment[IPv6ExtHdrFragment].offset = 65500
1108         self.pg_enable_capture()
1109         self.src_if.add_stream([bad_fragment])
1110         self.pg_start()
1111         pkts = self.src_if.get_capture(expected_count=1)
1112         icmp = pkts[0]
1113         self.assertIn(ICMPv6ParamProblem, icmp)
1114         self.assert_equal(icmp[ICMPv6ParamProblem].code, 0, "ICMP code")
1115
1116
class TestIPv6MWReassembly(VppTestCase):
    """ IPv6 Reassembly (multiple workers) """
    worker_config = "workers %d" % worker_count

    @classmethod
    def setUpClass(cls):
        super(TestIPv6MWReassembly, cls).setUpClass()

        cls.create_pg_interfaces(range(worker_count+1))
        cls.src_if = cls.pg0
        # all interfaces but the last one are used for sending, the last
        # one is where the packets are captured
        cls.send_ifs = cls.pg_interfaces[:-1]
        cls.dst_if = cls.pg_interfaces[-1]

        # setup all interfaces
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

        # packets sizes reduced here because we are generating packets without
        # Ethernet headers, which are added later (diff fragments go via
        # different interfaces)
        ether_len = len(Ether())
        cls.packet_sizes = [size - ether_len
                            for size in (64, 512, 1518, 9018)]
        cls.padding = " abcdefghijklmn"
        cls.create_stream(cls.packet_sizes)
        cls.create_fragments()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6MWReassembly, cls).tearDownClass()

    def setUp(self):
        """ Test setup - force timeout on existing reassemblies """
        super(TestIPv6MWReassembly, self).setUp()
        for intf in self.send_ifs:
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=intf.sw_if_index, enable_ip6=True)
        # zero timeout first, wait a moment, then switch to a long timeout
        # for the test itself
        self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=10, is_ip6=1)
        self.sleep(.25)
        self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=1000, is_ip6=1)

    def tearDown(self):
        super(TestIPv6MWReassembly, self).tearDown()

    def show_commands_at_teardown(self):
        """ Log IPv6 full reassembly details and buffer state (debug) """
        self.logger.debug(self.vapi.ppcli("show ip6-full-reassembly details"))
        self.logger.debug(self.vapi.ppcli("show buffers"))

    @classmethod
    def create_stream(cls, packet_sizes, packet_count=test_packet_count):
        """Create input packet stream

        The packets are created without an Ethernet header and stored in
        the ``data`` attribute of newly created packet info objects.

        :param list packet_sizes: Required packet sizes.
        :param int packet_count: Number of packets to create.
        """
        for i in range(0, packet_count):
            info = cls.create_packet_info(cls.src_if, cls.src_if)
            payload = cls.info_to_payload(info)
            p = (IPv6(src=cls.src_if.remote_ip6,
                      dst=cls.dst_if.remote_ip6) /
                 UDP(sport=1234, dport=5678) /
                 Raw(payload))
            # each size is used for two consecutive packets
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            cls.extend_packet(p, size, cls.padding)
            info.data = p

    @classmethod
    def create_fragments(cls):
        """Fragment every created packet into 400-byte fragments.

        Populates ``cls.pkt_infos`` with ``(index, fragments)`` tuples and
        ``cls.fragments_400`` with the fragments of all packets.
        """
        infos = cls._packet_infos
        cls.pkt_infos = []
        for index, info in six.iteritems(infos):
            p = info.data
            # cls.logger.debug(ppp("Packet:",
            #                      p.__class__(scapy.compat.raw(p))))
            fragments_400 = fragment_rfc8200(p, index, 400)
            cls.pkt_infos.append((index, fragments_400))
        cls.fragments_400 = [
            x for (_, frags) in cls.pkt_infos for x in frags]
        cls.logger.debug("Fragmented %s packets into %s 400-byte fragments" %
                         (len(infos), len(cls.fragments_400)))

    def verify_capture(self, capture, dropped_packet_indexes=None):
        """Verify captured packet stream.

        Every captured packet must carry a payload info whose index was not
        seen before and is not listed as dropped, and its IPv6 addresses and
        UDP payload must match the packet saved for that index. Afterwards
        every known packet index must have been either received or listed
        as dropped.

        :param list capture: Captured packet stream.
        :param dropped_packet_indexes: Indexes of packets which are expected
            to be dropped (any container supporting ``in``). Defaults to no
            dropped packets.
        """
        if dropped_packet_indexes is None:
            # avoid a mutable default argument shared between calls
            dropped_packet_indexes = ()
        seen = set()
        for packet in capture:
            try:
                self.logger.debug(ppp("Got packet:", packet))
                ip = packet[IPv6]
                udp = packet[UDP]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                self.assertNotIn(
                    packet_index, dropped_packet_indexes,
                    ppp("Packet received, but should be dropped:", packet))
                if packet_index in seen:
                    raise Exception(ppp("Duplicate packet received", packet))
                seen.add(packet_index)
                self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
                info = self._packet_infos[packet_index]
                self.assertIsNotNone(info)
                self.assertEqual(packet_index, info.index)
                saved_packet = info.data
                self.assertEqual(ip.src, saved_packet[IPv6].src)
                self.assertEqual(ip.dst, saved_packet[IPv6].dst)
                self.assertEqual(udp.payload, saved_packet[UDP].payload)
            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for index in self._packet_infos:
            self.assertTrue(index in seen or index in dropped_packet_indexes,
                            "Packet with packet_index %d not received" % index)

    def send_packets(self, packets):
        """Send packets via the per-worker interfaces and start the pg.

        :param list packets: One list of packets per worker, indexed by
            worker number. The packets come without an Ethernet header,
            which is prepended here to match the sending interface.
        """
        for counter in range(worker_count):
            if 0 == len(packets[counter]):
                continue
            send_if = self.send_ifs[counter]
            send_if.add_stream(
                (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
                 for x in packets[counter]),
                worker=counter)
        self.pg_start()

    def test_worker_conflict(self):
        """ 1st and FO=0 fragments on different workers """

        # in first wave we send fragments which don't start at offset 0
        # then we send fragments with offset 0 on a different thread
        # then the rest of packets on a random thread
        first_packets = [[] for _ in range(worker_count)]
        second_packets = [[] for _ in range(worker_count)]
        rest_of_packets = [[] for _ in range(worker_count)]
        for (_, p) in self.pkt_infos:
            wi = randrange(worker_count)
            second_packets[wi].append(p[0])
            if len(p) <= 1:
                continue
            # pick a worker different from the one handling offset 0
            wi2 = wi
            while wi2 == wi:
                wi2 = randrange(worker_count)
            first_packets[wi2].append(p[1])
            wi3 = randrange(worker_count)
            rest_of_packets[wi3].extend(p[2:])

        self.pg_enable_capture()
        self.send_packets(first_packets)
        self.send_packets(second_packets)
        self.send_packets(rest_of_packets)

        packets = self.dst_if.get_capture(len(self.pkt_infos))
        self.verify_capture(packets)
        for send_if in self.send_ifs:
            send_if.assert_nothing_captured()

        # run it all again to verify correctness
        self.pg_enable_capture()
        self.send_packets(first_packets)
        self.send_packets(second_packets)
        self.send_packets(rest_of_packets)

        packets = self.dst_if.get_capture(len(self.pkt_infos))
        self.verify_capture(packets)
        for send_if in self.send_ifs:
            send_if.assert_nothing_captured()
1288
1289
class TestIPv4ReassemblyLocalNode(VppTestCase):
    """ IPv4 Reassembly for packets coming to ip4-local node """

    @classmethod
    def setUpClass(cls):
        super(TestIPv4ReassemblyLocalNode, cls).setUpClass()

        # a single interface acts as both source and destination
        cls.create_pg_interfaces([0])
        cls.src_dst_if = cls.pg0

        for intf in cls.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

        cls.padding = " abcdefghijklmn"
        cls.create_stream()
        cls.create_fragments()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv4ReassemblyLocalNode, cls).tearDownClass()

    def setUp(self):
        """ Test setup - force timeout on existing reassemblies """
        super(TestIPv4ReassemblyLocalNode, self).setUp()
        # zero timeout first, wait a moment, then switch to a long timeout
        # for the test itself
        self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=10)
        self.sleep(.25)
        self.vapi.ip_reassembly_set(timeout_ms=1000000,
                                    max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=10000)

    def tearDown(self):
        super(TestIPv4ReassemblyLocalNode, self).tearDown()

    def show_commands_at_teardown(self):
        """ Log IPv4 full reassembly details and buffer state (debug) """
        for cli in ("show ip4-full-reassembly details", "show buffers"):
            self.logger.debug(self.vapi.ppcli(cli))

    @classmethod
    def create_stream(cls, packet_count=test_packet_count):
        """Create ICMP echo requests addressed to the interface itself.

        Every packet is extended to 1518 bytes and stored in the ``data``
        attribute of a newly created packet info object.

        :param int packet_count: Number of packets to create.
        """
        intf = cls.src_dst_if
        for _ in range(packet_count):
            pkt_info = cls.create_packet_info(intf, intf)
            payload = cls.info_to_payload(pkt_info)
            ether = Ether(dst=intf.local_mac, src=intf.remote_mac)
            ip4 = IP(id=pkt_info.index, src=intf.remote_ip4,
                     dst=intf.local_ip4)
            echo = ICMP(type='echo-request', id=1234)
            pkt = ether / ip4 / echo / Raw(payload)
            cls.extend_packet(pkt, 1518, cls.padding)
            pkt_info.data = pkt

    @classmethod
    def create_fragments(cls):
        """Fragment every created packet into 300-byte fragments.

        Populates ``cls.pkt_infos`` with ``(index, fragments)`` tuples and
        ``cls.fragments_300`` with the fragments of all packets.
        """
        packet_infos = cls._packet_infos
        cls.pkt_infos = [
            (index, fragment_rfc791(info.data, 300))
            for index, info in six.iteritems(packet_infos)]
        cls.fragments_300 = []
        for _, frags in cls.pkt_infos:
            cls.fragments_300.extend(frags)
        counts = (len(packet_infos), len(cls.fragments_300))
        cls.logger.debug("Fragmented %s packets into %s 300-byte fragments" %
                         counts)

    def verify_capture(self, capture):
        """Verify that the capture holds one echo reply per sent request.

        :param list capture: Captured packet stream.
        """
        seen = set()
        for packet in capture:
            try:
                self.logger.debug(ppp("Got packet:", packet))
                ip = packet[IP]
                icmp = packet[ICMP]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                if packet_index in seen:
                    raise Exception(ppp("Duplicate packet received", packet))
                seen.add(packet_index)
                self.assertEqual(payload_info.dst, self.src_dst_if.sw_if_index)
                sent_info = self._packet_infos[packet_index]
                self.assertIsNotNone(sent_info)
                self.assertEqual(packet_index, sent_info.index)
                request = sent_info.data
                # the reply travels in the opposite direction
                self.assertEqual(ip.src, request[IP].dst)
                self.assertEqual(ip.dst, request[IP].src)
                self.assertEqual(icmp.type, 0)  # echo reply
                self.assertEqual(icmp.id, request[ICMP].id)
                self.assertEqual(icmp.payload, request[ICMP].payload)
            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for index in self._packet_infos:
            self.assertIn(index, seen,
                          "Packet with packet_index %d not received" % index)

    def test_reassembly(self):
        """ basic reassembly """

        expected_count = len(self.pkt_infos)
        # the very same stream is sent twice - the second round is there to
        # verify correctness of a repeated run
        for _ in range(2):
            self.pg_enable_capture()
            self.src_dst_if.add_stream(self.fragments_300)
            self.pg_start()

            replies = self.src_dst_if.get_capture(expected_count)
            self.verify_capture(replies)
1415
1416
1417 class TestFIFReassembly(VppTestCase):
1418     """ Fragments in fragments reassembly """
1419
1420     @classmethod
1421     def setUpClass(cls):
1422         super(TestFIFReassembly, cls).setUpClass()
1423
1424         cls.create_pg_interfaces([0, 1])
1425         cls.src_if = cls.pg0
1426         cls.dst_if = cls.pg1
1427         for i in cls.pg_interfaces:
1428             i.admin_up()
1429             i.config_ip4()
1430             i.resolve_arp()
1431             i.config_ip6()
1432             i.resolve_ndp()
1433
1434         cls.packet_sizes = [64, 512, 1518, 9018]
1435         cls.padding = " abcdefghijklmn"
1436
    @classmethod
    def tearDownClass(cls):
        """ Class teardown - nothing class specific, defer to the base """
        super(TestFIFReassembly, cls).tearDownClass()
1440
1441     def setUp(self):
1442         """ Test setup - force timeout on existing reassemblies """
1443         super(TestFIFReassembly, self).setUp()
1444         self.vapi.ip_reassembly_enable_disable(
1445             sw_if_index=self.src_if.sw_if_index, enable_ip4=True,
1446             enable_ip6=True)
1447         self.vapi.ip_reassembly_enable_disable(
1448             sw_if_index=self.dst_if.sw_if_index, enable_ip4=True,
1449             enable_ip6=True)
1450         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
1451                                     max_reassembly_length=1000,
1452                                     expire_walk_interval_ms=10)
1453         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
1454                                     max_reassembly_length=1000,
1455                                     expire_walk_interval_ms=10, is_ip6=1)
1456         self.sleep(.25)
1457         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
1458                                     max_reassembly_length=1000,
1459                                     expire_walk_interval_ms=10000)
1460         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
1461                                     max_reassembly_length=1000,
1462                                     expire_walk_interval_ms=10000, is_ip6=1)
1463
    def tearDown(self):
        """ Test teardown - nothing test specific, defer to the base class """
        super(TestFIFReassembly, self).tearDown()
1466
1467     def show_commands_at_teardown(self):
1468         self.logger.debug(self.vapi.ppcli("show ip4-full-reassembly details"))
1469         self.logger.debug(self.vapi.ppcli("show ip6-full-reassembly details"))
1470         self.logger.debug(self.vapi.ppcli("show buffers"))
1471
1472     def verify_capture(self, capture, ip_class, dropped_packet_indexes=[]):
1473         """Verify captured packet stream.
1474
1475         :param list capture: Captured packet stream.
1476         """
1477         info = None
1478         seen = set()
1479         for packet in capture:
1480             try:
1481                 self.logger.debug(ppp("Got packet:", packet))
1482                 ip = packet[ip_class]
1483                 udp = packet[UDP]
1484                 payload_info = self.payload_to_info(packet[Raw])
1485                 packet_index = payload_info.index
1486                 self.assertTrue(
1487                     packet_index not in dropped_packet_indexes,
1488                     ppp("Packet received, but should be dropped:", packet))
1489                 if packet_index in seen:
1490                     raise Exception(ppp("Duplicate packet received", packet))
1491                 seen.add(packet_index)
1492                 self.assertEqual(payload_info.dst, self.dst_if.sw_if_index)
1493                 info = self._packet_infos[packet_index]
1494                 self.assertTrue(info is not None)
1495                 self.assertEqual(packet_index, info.index)
1496                 saved_packet = info.data
1497                 self.assertEqual(ip.src, saved_packet[ip_class].src)
1498                 self.assertEqual(ip.dst, saved_packet[ip_class].dst)
1499                 self.assertEqual(udp.payload, saved_packet[UDP].payload)
1500             except Exception:
1501                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1502                 raise
1503         for index in self._packet_infos:
1504             self.assertTrue(index in seen or index in dropped_packet_indexes,
1505                             "Packet with packet_index %d not received" % index)
1506
1507     def test_fif4(self):
1508         """ Fragments in fragments (4o4) """
1509
1510         # TODO this should be ideally in setUpClass, but then we hit a bug
1511         # with VppIpRoute incorrectly reporting it's present when it's not
1512         # so we need to manually remove the vpp config, thus we cannot have
1513         # it shared for multiple test cases
1514         self.tun_ip4 = "1.1.1.2"
1515
1516         self.gre4 = VppGreInterface(self, self.src_if.local_ip4, self.tun_ip4)
1517         self.gre4.add_vpp_config()
1518         self.gre4.admin_up()
1519         self.gre4.config_ip4()
1520
1521         self.vapi.ip_reassembly_enable_disable(
1522             sw_if_index=self.gre4.sw_if_index, enable_ip4=True)
1523
1524         self.route4 = VppIpRoute(self, self.tun_ip4, 32,
1525                                  [VppRoutePath(self.src_if.remote_ip4,
1526                                                self.src_if.sw_if_index)])
1527         self.route4.add_vpp_config()
1528
1529         self.reset_packet_infos()
1530         for i in range(test_packet_count):
1531             info = self.create_packet_info(self.src_if, self.dst_if)
1532             payload = self.info_to_payload(info)
1533             # Ethernet header here is only for size calculation, thus it
1534             # doesn't matter how it's initialized. This is to ensure that
1535             # reassembled packet is not > 9000 bytes, so that it's not dropped
1536             p = (Ether() /
1537                  IP(id=i, src=self.src_if.remote_ip4,
1538                     dst=self.dst_if.remote_ip4) /
1539                  UDP(sport=1234, dport=5678) /
1540                  Raw(payload))
1541             size = self.packet_sizes[(i // 2) % len(self.packet_sizes)]
1542             self.extend_packet(p, size, self.padding)
1543             info.data = p[IP]  # use only IP part, without ethernet header
1544
1545         fragments = [x for _, p in six.iteritems(self._packet_infos)
1546                      for x in fragment_rfc791(p.data, 400)]
1547
1548         encapped_fragments = \
1549             [Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1550              IP(src=self.tun_ip4, dst=self.src_if.local_ip4) /
1551                 GRE() /
1552                 p
1553                 for p in fragments]
1554
1555         fragmented_encapped_fragments = \
1556             [x for p in encapped_fragments
1557              for x in fragment_rfc791(p, 200)]
1558
1559         self.src_if.add_stream(fragmented_encapped_fragments)
1560
1561         self.pg_enable_capture(self.pg_interfaces)
1562         self.pg_start()
1563
1564         self.src_if.assert_nothing_captured()
1565         packets = self.dst_if.get_capture(len(self._packet_infos))
1566         self.verify_capture(packets, IP)
1567
1568         # TODO remove gre vpp config by hand until VppIpRoute gets fixed
1569         # so that it's query_vpp_config() works as it should
1570         self.gre4.remove_vpp_config()
1571         self.logger.debug(self.vapi.ppcli("show interface"))
1572
1573     def test_fif6(self):
1574         """ Fragments in fragments (6o6) """
1575         # TODO this should be ideally in setUpClass, but then we hit a bug
1576         # with VppIpRoute incorrectly reporting it's present when it's not
1577         # so we need to manually remove the vpp config, thus we cannot have
1578         # it shared for multiple test cases
1579         self.tun_ip6 = "1002::1"
1580
1581         self.gre6 = VppGreInterface(self, self.src_if.local_ip6, self.tun_ip6)
1582         self.gre6.add_vpp_config()
1583         self.gre6.admin_up()
1584         self.gre6.config_ip6()
1585
1586         self.vapi.ip_reassembly_enable_disable(
1587             sw_if_index=self.gre6.sw_if_index, enable_ip6=True)
1588
1589         self.route6 = VppIpRoute(self, self.tun_ip6, 128,
1590                                  [VppRoutePath(
1591                                      self.src_if.remote_ip6,
1592                                      self.src_if.sw_if_index)])
1593         self.route6.add_vpp_config()
1594
1595         self.reset_packet_infos()
1596         for i in range(test_packet_count):
1597             info = self.create_packet_info(self.src_if, self.dst_if)
1598             payload = self.info_to_payload(info)
1599             # Ethernet header here is only for size calculation, thus it
1600             # doesn't matter how it's initialized. This is to ensure that
1601             # reassembled packet is not > 9000 bytes, so that it's not dropped
1602             p = (Ether() /
1603                  IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
1604                  UDP(sport=1234, dport=5678) /
1605                  Raw(payload))
1606             size = self.packet_sizes[(i // 2) % len(self.packet_sizes)]
1607             self.extend_packet(p, size, self.padding)
1608             info.data = p[IPv6]  # use only IPv6 part, without ethernet header
1609
1610         fragments = [x for _, i in six.iteritems(self._packet_infos)
1611                      for x in fragment_rfc8200(
1612                          i.data, i.index, 400)]
1613
1614         encapped_fragments = \
1615             [Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1616              IPv6(src=self.tun_ip6, dst=self.src_if.local_ip6) /
1617                 GRE() /
1618                 p
1619                 for p in fragments]
1620
1621         fragmented_encapped_fragments = \
1622             [x for p in encapped_fragments for x in (
1623                 fragment_rfc8200(
1624                     p,
1625                     2 * len(self._packet_infos) + p[IPv6ExtHdrFragment].id,
1626                     200)
1627                 if IPv6ExtHdrFragment in p else [p]
1628             )
1629             ]
1630
1631         self.src_if.add_stream(fragmented_encapped_fragments)
1632
1633         self.pg_enable_capture(self.pg_interfaces)
1634         self.pg_start()
1635
1636         self.src_if.assert_nothing_captured()
1637         packets = self.dst_if.get_capture(len(self._packet_infos))
1638         self.verify_capture(packets, IPv6)
1639
1640         # TODO remove gre vpp config by hand until VppIpRoute gets fixed
1641         # so that it's query_vpp_config() works as it should
1642         self.gre6.remove_vpp_config()
1643
1644
# When this module is executed directly, run its test cases through unittest
# using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)