ip: reassembly: send packet out on correct worker
[vpp.git] / test / test_reassembly.py
1 #!/usr/bin/env python
2
3 import six
4 import unittest
5 from random import shuffle, choice, randrange
6
7 from framework import VppTestCase, VppTestRunner
8
9 import scapy.compat
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, GRE
12 from scapy.layers.inet import IP, UDP, ICMP
13 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, ICMPv6ParamProblem,\
14     ICMPv6TimeExceeded
15 from framework import VppTestCase, VppTestRunner
16 from util import ppp, ppc, fragment_rfc791, fragment_rfc8200
17 from vpp_gre_interface import VppGreInterface
18 from vpp_ip import DpoProto
19 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto
20
# 35 is enough to have >257 400-byte fragments
# (this is the default packet_count of the create_stream() classmethods)
test_packet_count = 35

# number of workers used for multi-worker test cases
# (the multi-worker class creates worker_count + 1 pg interfaces)
worker_count = 3
26
27
28 class TestIPv4Reassembly(VppTestCase):
29     """ IPv4 Reassembly """
30
31     @classmethod
32     def setUpClass(cls):
33         super(TestIPv4Reassembly, cls).setUpClass()
34
35         cls.create_pg_interfaces([0, 1])
36         cls.src_if = cls.pg0
37         cls.dst_if = cls.pg1
38
39         # setup all interfaces
40         for i in cls.pg_interfaces:
41             i.admin_up()
42             i.config_ip4()
43             i.resolve_arp()
44
45         # packet sizes
46         cls.packet_sizes = [64, 512, 1518, 9018]
47         cls.padding = " abcdefghijklmn"
48         cls.create_stream(cls.packet_sizes)
49         cls.create_fragments()
50
    @classmethod
    def tearDownClass(cls):
        """Class-level teardown; only delegates to the parent class."""
        super(TestIPv4Reassembly, cls).tearDownClass()
54
55     def setUp(self):
56         """ Test setup - force timeout on existing reassemblies """
57         super(TestIPv4Reassembly, self).setUp()
58         self.vapi.ip_reassembly_enable_disable(
59             sw_if_index=self.src_if.sw_if_index, enable_ip4=True)
60         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
61                                     max_reassembly_length=1000,
62                                     expire_walk_interval_ms=10)
63         self.sleep(.25)
64         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
65                                     max_reassembly_length=1000,
66                                     expire_walk_interval_ms=10000)
67
    def tearDown(self):
        """Per-test teardown; only delegates to the parent class."""
        super(TestIPv4Reassembly, self).tearDown()
70
71     def show_commands_at_teardown(self):
72         self.logger.debug(self.vapi.ppcli("show ip4-reassembly details"))
73         self.logger.debug(self.vapi.ppcli("show buffers"))
74
75     @classmethod
76     def create_stream(cls, packet_sizes, packet_count=test_packet_count):
77         """Create input packet stream
78
79         :param list packet_sizes: Required packet sizes.
80         """
81         for i in range(0, packet_count):
82             info = cls.create_packet_info(cls.src_if, cls.src_if)
83             payload = cls.info_to_payload(info)
84             p = (Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac) /
85                  IP(id=info.index, src=cls.src_if.remote_ip4,
86                     dst=cls.dst_if.remote_ip4) /
87                  UDP(sport=1234, dport=5678) /
88                  Raw(payload))
89             size = packet_sizes[(i // 2) % len(packet_sizes)]
90             cls.extend_packet(p, size, cls.padding)
91             info.data = p
92
93     @classmethod
94     def create_fragments(cls):
95         infos = cls._packet_infos
96         cls.pkt_infos = []
97         for index, info in six.iteritems(infos):
98             p = info.data
99             # cls.logger.debug(ppp("Packet:",
100             #                      p.__class__(scapy.compat.raw(p))))
101             fragments_400 = fragment_rfc791(p, 400)
102             fragments_300 = fragment_rfc791(p, 300)
103             fragments_200 = [
104                 x for f in fragments_400 for x in fragment_rfc791(f, 200)]
105             cls.pkt_infos.append(
106                 (index, fragments_400, fragments_300, fragments_200))
107         cls.fragments_400 = [
108             x for (_, frags, _, _) in cls.pkt_infos for x in frags]
109         cls.fragments_300 = [
110             x for (_, _, frags, _) in cls.pkt_infos for x in frags]
111         cls.fragments_200 = [
112             x for (_, _, _, frags) in cls.pkt_infos for x in frags]
113         cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, "
114                          "%s 300-byte fragments and %s 200-byte fragments" %
115                          (len(infos), len(cls.fragments_400),
116                              len(cls.fragments_300), len(cls.fragments_200)))
117
118     def verify_capture(self, capture, dropped_packet_indexes=[]):
119         """Verify captured packet stream.
120
121         :param list capture: Captured packet stream.
122         """
123         info = None
124         seen = set()
125         for packet in capture:
126             try:
127                 self.logger.debug(ppp("Got packet:", packet))
128                 ip = packet[IP]
129                 udp = packet[UDP]
130                 payload_info = self.payload_to_info(packet[Raw])
131                 packet_index = payload_info.index
132                 self.assertTrue(
133                     packet_index not in dropped_packet_indexes,
134                     ppp("Packet received, but should be dropped:", packet))
135                 if packet_index in seen:
136                     raise Exception(ppp("Duplicate packet received", packet))
137                 seen.add(packet_index)
138                 self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
139                 info = self._packet_infos[packet_index]
140                 self.assertTrue(info is not None)
141                 self.assertEqual(packet_index, info.index)
142                 saved_packet = info.data
143                 self.assertEqual(ip.src, saved_packet[IP].src)
144                 self.assertEqual(ip.dst, saved_packet[IP].dst)
145                 self.assertEqual(udp.payload, saved_packet[UDP].payload)
146             except Exception:
147                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
148                 raise
149         for index in self._packet_infos:
150             self.assertTrue(index in seen or index in dropped_packet_indexes,
151                             "Packet with packet_index %d not received" % index)
152
153     def test_reassembly(self):
154         """ basic reassembly """
155
156         self.pg_enable_capture()
157         self.src_if.add_stream(self.fragments_200)
158         self.pg_start()
159
160         packets = self.dst_if.get_capture(len(self.pkt_infos))
161         self.verify_capture(packets)
162         self.src_if.assert_nothing_captured()
163
164         # run it all again to verify correctness
165         self.pg_enable_capture()
166         self.src_if.add_stream(self.fragments_200)
167         self.pg_start()
168
169         packets = self.dst_if.get_capture(len(self.pkt_infos))
170         self.verify_capture(packets)
171         self.src_if.assert_nothing_captured()
172
173     def test_reversed(self):
174         """ reverse order reassembly """
175
176         fragments = list(self.fragments_200)
177         fragments.reverse()
178
179         self.pg_enable_capture()
180         self.src_if.add_stream(fragments)
181         self.pg_start()
182
183         packets = self.dst_if.get_capture(len(self.packet_infos))
184         self.verify_capture(packets)
185         self.src_if.assert_nothing_captured()
186
187         # run it all again to verify correctness
188         self.pg_enable_capture()
189         self.src_if.add_stream(fragments)
190         self.pg_start()
191
192         packets = self.dst_if.get_capture(len(self.packet_infos))
193         self.verify_capture(packets)
194         self.src_if.assert_nothing_captured()
195
196     def test_long_fragment_chain(self):
197         """ long fragment chain """
198
199         error_cnt_str = \
200             "/err/ip4-reassembly-feature/fragment chain too long (drop)"
201
202         error_cnt = self.statistics.get_err_counter(error_cnt_str)
203
204         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
205                                     max_reassembly_length=3,
206                                     expire_walk_interval_ms=50)
207
208         p1 = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
209               IP(id=1000, src=self.src_if.remote_ip4,
210                  dst=self.dst_if.remote_ip4) /
211               UDP(sport=1234, dport=5678) /
212               Raw("X" * 1000))
213         p2 = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
214               IP(id=1001, src=self.src_if.remote_ip4,
215                  dst=self.dst_if.remote_ip4) /
216               UDP(sport=1234, dport=5678) /
217               Raw("X" * 1000))
218         frags = fragment_rfc791(p1, 200) + fragment_rfc791(p2, 500)
219
220         self.pg_enable_capture()
221         self.src_if.add_stream(frags)
222         self.pg_start()
223
224         self.dst_if.get_capture(1)
225         self.assert_error_counter_equal(error_cnt_str, error_cnt + 1)
226
227     def test_5737(self):
228         """ fragment length + ip header size > 65535 """
229         self.vapi.cli("clear errors")
230         raw = ('E\x00\x00\x88,\xf8\x1f\xfe@\x01\x98\x00\xc0\xa8\n-\xc0\xa8\n'
231                '\x01\x08\x00\xf0J\xed\xcb\xf1\xf5Test-group: IPv4.IPv4.ipv4-'
232                'message.Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Of'
233                'fset; Test-case: 5737')
234
235         malformed_packet = (Ether(dst=self.src_if.local_mac,
236                                   src=self.src_if.remote_mac) /
237                             IP(raw))
238         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
239              IP(id=1000, src=self.src_if.remote_ip4,
240                 dst=self.dst_if.remote_ip4) /
241              UDP(sport=1234, dport=5678) /
242              Raw("X" * 1000))
243         valid_fragments = fragment_rfc791(p, 400)
244
245         self.pg_enable_capture()
246         self.src_if.add_stream([malformed_packet] + valid_fragments)
247         self.pg_start()
248
249         self.dst_if.get_capture(1)
250         self.assert_packet_counter_equal("ip4-reassembly-feature", 1)
251         # TODO remove above, uncomment below once clearing of counters
252         # is supported
253         # self.assert_packet_counter_equal(
254         #     "/err/ip4-reassembly-feature/malformed packets", 1)
255
256     def test_44924(self):
257         """ compress tiny fragments """
258         packets = [(Ether(dst=self.src_if.local_mac,
259                           src=self.src_if.remote_mac) /
260                     IP(id=24339, flags="MF", frag=0, ttl=64,
261                        src=self.src_if.remote_ip4,
262                        dst=self.dst_if.remote_ip4) /
263                     ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
264                     Raw(load='Test-group: IPv4')),
265                    (Ether(dst=self.src_if.local_mac,
266                           src=self.src_if.remote_mac) /
267                     IP(id=24339, flags="MF", frag=3, ttl=64,
268                        src=self.src_if.remote_ip4,
269                        dst=self.dst_if.remote_ip4) /
270                     ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
271                     Raw(load='.IPv4.Fragmentation.vali')),
272                    (Ether(dst=self.src_if.local_mac,
273                           src=self.src_if.remote_mac) /
274                     IP(id=24339, frag=6, ttl=64,
275                        src=self.src_if.remote_ip4,
276                        dst=self.dst_if.remote_ip4) /
277                     ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
278                     Raw(load='d; Test-case: 44924'))
279                    ]
280
281         self.pg_enable_capture()
282         self.src_if.add_stream(packets)
283         self.pg_start()
284
285         self.dst_if.get_capture(1)
286
287     def test_frag_1(self):
288         """ fragment of size 1 """
289         self.vapi.cli("clear errors")
290         malformed_packets = [(Ether(dst=self.src_if.local_mac,
291                                     src=self.src_if.remote_mac) /
292                               IP(id=7, len=21, flags="MF", frag=0, ttl=64,
293                                  src=self.src_if.remote_ip4,
294                                  dst=self.dst_if.remote_ip4) /
295                               ICMP(type="echo-request")),
296                              (Ether(dst=self.src_if.local_mac,
297                                     src=self.src_if.remote_mac) /
298                               IP(id=7, len=21, frag=1, ttl=64,
299                                  src=self.src_if.remote_ip4,
300                                  dst=self.dst_if.remote_ip4) /
301                               Raw(load='\x08')),
302                              ]
303
304         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
305              IP(id=1000, src=self.src_if.remote_ip4,
306                 dst=self.dst_if.remote_ip4) /
307              UDP(sport=1234, dport=5678) /
308              Raw("X" * 1000))
309         valid_fragments = fragment_rfc791(p, 400)
310
311         self.pg_enable_capture()
312         self.src_if.add_stream(malformed_packets + valid_fragments)
313         self.pg_start()
314
315         self.dst_if.get_capture(1)
316
317         self.assert_packet_counter_equal("ip4-reassembly-feature", 1)
318         # TODO remove above, uncomment below once clearing of counters
319         # is supported
320         # self.assert_packet_counter_equal(
321         #     "/err/ip4-reassembly-feature/malformed packets", 1)
322
323     def test_random(self):
324         """ random order reassembly """
325
326         fragments = list(self.fragments_200)
327         shuffle(fragments)
328
329         self.pg_enable_capture()
330         self.src_if.add_stream(fragments)
331         self.pg_start()
332
333         packets = self.dst_if.get_capture(len(self.packet_infos))
334         self.verify_capture(packets)
335         self.src_if.assert_nothing_captured()
336
337         # run it all again to verify correctness
338         self.pg_enable_capture()
339         self.src_if.add_stream(fragments)
340         self.pg_start()
341
342         packets = self.dst_if.get_capture(len(self.packet_infos))
343         self.verify_capture(packets)
344         self.src_if.assert_nothing_captured()
345
346     def test_duplicates(self):
347         """ duplicate fragments """
348
349         fragments = [
350             x for (_, frags, _, _) in self.pkt_infos
351             for x in frags
352             for _ in range(0, min(2, len(frags)))
353         ]
354
355         self.pg_enable_capture()
356         self.src_if.add_stream(fragments)
357         self.pg_start()
358
359         packets = self.dst_if.get_capture(len(self.pkt_infos))
360         self.verify_capture(packets)
361         self.src_if.assert_nothing_captured()
362
363     def test_overlap1(self):
364         """ overlapping fragments case #1 """
365
366         fragments = []
367         for _, _, frags_300, frags_200 in self.pkt_infos:
368             if len(frags_300) == 1:
369                 fragments.extend(frags_300)
370             else:
371                 for i, j in zip(frags_200, frags_300):
372                     fragments.extend(i)
373                     fragments.extend(j)
374
375         self.pg_enable_capture()
376         self.src_if.add_stream(fragments)
377         self.pg_start()
378
379         packets = self.dst_if.get_capture(len(self.pkt_infos))
380         self.verify_capture(packets)
381         self.src_if.assert_nothing_captured()
382
383         # run it all to verify correctness
384         self.pg_enable_capture()
385         self.src_if.add_stream(fragments)
386         self.pg_start()
387
388         packets = self.dst_if.get_capture(len(self.pkt_infos))
389         self.verify_capture(packets)
390         self.src_if.assert_nothing_captured()
391
392     def test_overlap2(self):
393         """ overlapping fragments case #2 """
394
395         fragments = []
396         for _, _, frags_300, frags_200 in self.pkt_infos:
397             if len(frags_300) == 1:
398                 fragments.extend(frags_300)
399             else:
400                 # care must be taken here so that there are no fragments
401                 # received by vpp after reassembly is finished, otherwise
402                 # new reassemblies will be started and packet generator will
403                 # freak out when it detects unfreed buffers
404                 zipped = zip(frags_300, frags_200)
405                 for i, j in zipped:
406                     fragments.extend(i)
407                     fragments.extend(j)
408                 fragments.pop()
409
410         self.pg_enable_capture()
411         self.src_if.add_stream(fragments)
412         self.pg_start()
413
414         packets = self.dst_if.get_capture(len(self.pkt_infos))
415         self.verify_capture(packets)
416         self.src_if.assert_nothing_captured()
417
418         # run it all to verify correctness
419         self.pg_enable_capture()
420         self.src_if.add_stream(fragments)
421         self.pg_start()
422
423         packets = self.dst_if.get_capture(len(self.pkt_infos))
424         self.verify_capture(packets)
425         self.src_if.assert_nothing_captured()
426
427     def test_timeout_inline(self):
428         """ timeout (inline) """
429
430         dropped_packet_indexes = set(
431             index for (index, frags, _, _) in self.pkt_infos if len(frags) > 1
432         )
433
434         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
435                                     max_reassembly_length=3,
436                                     expire_walk_interval_ms=10000)
437
438         self.pg_enable_capture()
439         self.src_if.add_stream(self.fragments_400)
440         self.pg_start()
441
442         packets = self.dst_if.get_capture(
443             len(self.pkt_infos) - len(dropped_packet_indexes))
444         self.verify_capture(packets, dropped_packet_indexes)
445         self.src_if.assert_nothing_captured()
446
447     def test_timeout_cleanup(self):
448         """ timeout (cleanup) """
449
450         # whole packets + fragmented packets sans last fragment
451         fragments = [
452             x for (_, frags_400, _, _) in self.pkt_infos
453             for x in frags_400[:-1 if len(frags_400) > 1 else None]
454         ]
455
456         # last fragments for fragmented packets
457         fragments2 = [frags_400[-1]
458                       for (_, frags_400, _, _) in self.pkt_infos
459                       if len(frags_400) > 1]
460
461         dropped_packet_indexes = set(
462             index for (index, frags_400, _, _) in self.pkt_infos
463             if len(frags_400) > 1)
464
465         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
466                                     max_reassembly_length=1000,
467                                     expire_walk_interval_ms=50)
468
469         self.pg_enable_capture()
470         self.src_if.add_stream(fragments)
471         self.pg_start()
472
473         self.sleep(.25, "wait before sending rest of fragments")
474
475         self.src_if.add_stream(fragments2)
476         self.pg_start()
477
478         packets = self.dst_if.get_capture(
479             len(self.pkt_infos) - len(dropped_packet_indexes))
480         self.verify_capture(packets, dropped_packet_indexes)
481         self.src_if.assert_nothing_captured()
482
483     def test_disabled(self):
484         """ reassembly disabled """
485
486         dropped_packet_indexes = set(
487             index for (index, frags_400, _, _) in self.pkt_infos
488             if len(frags_400) > 1)
489
490         self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=0,
491                                     max_reassembly_length=3,
492                                     expire_walk_interval_ms=10000)
493
494         self.pg_enable_capture()
495         self.src_if.add_stream(self.fragments_400)
496         self.pg_start()
497
498         packets = self.dst_if.get_capture(
499             len(self.pkt_infos) - len(dropped_packet_indexes))
500         self.verify_capture(packets, dropped_packet_indexes)
501         self.src_if.assert_nothing_captured()
502
503
504 class TestIPv4MWReassembly(VppTestCase):
505     """ IPv4 Reassembly (multiple workers) """
506     worker_config = "workers %d" % worker_count
507
508     @classmethod
509     def setUpClass(cls):
510         super(TestIPv4MWReassembly, cls).setUpClass()
511
512         cls.create_pg_interfaces(range(worker_count+1))
513         cls.src_if = cls.pg0
514         cls.send_ifs = cls.pg_interfaces[:-1]
515         cls.dst_if = cls.pg_interfaces[-1]
516
517         # setup all interfaces
518         for i in cls.pg_interfaces:
519             i.admin_up()
520             i.config_ip4()
521             i.resolve_arp()
522
523         # packets sizes reduced here because we are generating packets without
524         # Ethernet headers, which are added later (diff fragments go via
525         # different interfaces)
526         cls.packet_sizes = [64-len(Ether()), 512-len(Ether()),
527                             1518-len(Ether()), 9018-len(Ether())]
528         cls.padding = " abcdefghijklmn"
529         cls.create_stream(cls.packet_sizes)
530         cls.create_fragments()
531
    @classmethod
    def tearDownClass(cls):
        """Class-level teardown; only delegates to the parent class."""
        super(TestIPv4MWReassembly, cls).tearDownClass()
535
536     def setUp(self):
537         """ Test setup - force timeout on existing reassemblies """
538         super(TestIPv4MWReassembly, self).setUp()
539         for intf in self.send_ifs:
540             self.vapi.ip_reassembly_enable_disable(
541                 sw_if_index=intf.sw_if_index, enable_ip4=True)
542         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
543                                     max_reassembly_length=1000,
544                                     expire_walk_interval_ms=10)
545         self.sleep(.25)
546         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
547                                     max_reassembly_length=1000,
548                                     expire_walk_interval_ms=10000)
549
    def tearDown(self):
        """Per-test teardown; only delegates to the parent class."""
        super(TestIPv4MWReassembly, self).tearDown()
552
553     def show_commands_at_teardown(self):
554         self.logger.debug(self.vapi.ppcli("show ip4-reassembly details"))
555         self.logger.debug(self.vapi.ppcli("show buffers"))
556
557     @classmethod
558     def create_stream(cls, packet_sizes, packet_count=test_packet_count):
559         """Create input packet stream
560
561         :param list packet_sizes: Required packet sizes.
562         """
563         for i in range(0, packet_count):
564             info = cls.create_packet_info(cls.src_if, cls.src_if)
565             payload = cls.info_to_payload(info)
566             p = (IP(id=info.index, src=cls.src_if.remote_ip4,
567                     dst=cls.dst_if.remote_ip4) /
568                  UDP(sport=1234, dport=5678) /
569                  Raw(payload))
570             size = packet_sizes[(i // 2) % len(packet_sizes)]
571             cls.extend_packet(p, size, cls.padding)
572             info.data = p
573
574     @classmethod
575     def create_fragments(cls):
576         infos = cls._packet_infos
577         cls.pkt_infos = []
578         for index, info in six.iteritems(infos):
579             p = info.data
580             # cls.logger.debug(ppp("Packet:",
581             #                      p.__class__(scapy.compat.raw(p))))
582             fragments_400 = fragment_rfc791(p, 400)
583             cls.pkt_infos.append((index, fragments_400))
584         cls.fragments_400 = [
585             x for (_, frags) in cls.pkt_infos for x in frags]
586         cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " %
587                          (len(infos), len(cls.fragments_400)))
588
589     def verify_capture(self, capture, dropped_packet_indexes=[]):
590         """Verify captured packet stream.
591
592         :param list capture: Captured packet stream.
593         """
594         info = None
595         seen = set()
596         for packet in capture:
597             try:
598                 self.logger.debug(ppp("Got packet:", packet))
599                 ip = packet[IP]
600                 udp = packet[UDP]
601                 payload_info = self.payload_to_info(packet[Raw])
602                 packet_index = payload_info.index
603                 self.assertTrue(
604                     packet_index not in dropped_packet_indexes,
605                     ppp("Packet received, but should be dropped:", packet))
606                 if packet_index in seen:
607                     raise Exception(ppp("Duplicate packet received", packet))
608                 seen.add(packet_index)
609                 self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
610                 info = self._packet_infos[packet_index]
611                 self.assertTrue(info is not None)
612                 self.assertEqual(packet_index, info.index)
613                 saved_packet = info.data
614                 self.assertEqual(ip.src, saved_packet[IP].src)
615                 self.assertEqual(ip.dst, saved_packet[IP].dst)
616                 self.assertEqual(udp.payload, saved_packet[UDP].payload)
617             except Exception:
618                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
619                 raise
620         for index in self._packet_infos:
621             self.assertTrue(index in seen or index in dropped_packet_indexes,
622                             "Packet with packet_index %d not received" % index)
623
624     def send_packets(self, packets):
625         for counter in range(worker_count):
626             if 0 == len(packets[counter]):
627                 continue
628             send_if = self.send_ifs[counter]
629             send_if.add_stream(
630                 (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
631                  for x in packets[counter]),
632                 worker=counter)
633         self.pg_start()
634
635     def test_worker_conflict(self):
636         """ 1st and FO=0 fragments on different workers """
637
638         # in first wave we send fragments which don't start at offset 0
639         # then we send fragments with offset 0 on a different thread
640         # then the rest of packets on a random thread
641         first_packets = [[] for n in range(worker_count)]
642         second_packets = [[] for n in range(worker_count)]
643         rest_of_packets = [[] for n in range(worker_count)]
644         for (_, p) in self.pkt_infos:
645             wi = randrange(worker_count)
646             second_packets[wi].append(p[0])
647             if len(p) <= 1:
648                 continue
649             wi2 = wi
650             while wi2 == wi:
651                 wi2 = randrange(worker_count)
652             first_packets[wi2].append(p[1])
653             wi3 = randrange(worker_count)
654             rest_of_packets[wi3].extend(p[2:])
655
656         self.pg_enable_capture()
657         self.send_packets(first_packets)
658         self.send_packets(second_packets)
659         self.send_packets(rest_of_packets)
660
661         packets = self.dst_if.get_capture(len(self.pkt_infos))
662         self.verify_capture(packets)
663         for send_if in self.send_ifs:
664             send_if.assert_nothing_captured()
665
666         self.pg_enable_capture()
667         self.send_packets(first_packets)
668         self.send_packets(second_packets)
669         self.send_packets(rest_of_packets)
670
671         packets = self.dst_if.get_capture(len(self.pkt_infos))
672         self.verify_capture(packets)
673         for send_if in self.send_ifs:
674             send_if.assert_nothing_captured()
675
676
677 class TestIPv6Reassembly(VppTestCase):
678     """ IPv6 Reassembly """
679
680     @classmethod
681     def setUpClass(cls):
682         super(TestIPv6Reassembly, cls).setUpClass()
683
684         cls.create_pg_interfaces([0, 1])
685         cls.src_if = cls.pg0
686         cls.dst_if = cls.pg1
687
688         # setup all interfaces
689         for i in cls.pg_interfaces:
690             i.admin_up()
691             i.config_ip6()
692             i.resolve_ndp()
693
694         # packet sizes
695         cls.packet_sizes = [64, 512, 1518, 9018]
696         cls.padding = " abcdefghijklmn"
697         cls.create_stream(cls.packet_sizes)
698         cls.create_fragments()
699
    @classmethod
    def tearDownClass(cls):
        """Class-level teardown; only delegates to the parent class."""
        super(TestIPv6Reassembly, cls).tearDownClass()
703
704     def setUp(self):
705         """ Test setup - force timeout on existing reassemblies """
706         super(TestIPv6Reassembly, self).setUp()
707         self.vapi.ip_reassembly_enable_disable(
708             sw_if_index=self.src_if.sw_if_index, enable_ip6=True)
709         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
710                                     max_reassembly_length=1000,
711                                     expire_walk_interval_ms=10, is_ip6=1)
712         self.sleep(.25)
713         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
714                                     max_reassembly_length=1000,
715                                     expire_walk_interval_ms=10000, is_ip6=1)
716         self.logger.debug(self.vapi.ppcli("show ip6-reassembly details"))
717         self.logger.debug(self.vapi.ppcli("show buffers"))
718
    def tearDown(self):
        """Per-test teardown; only delegates to the parent class."""
        super(TestIPv6Reassembly, self).tearDown()
721
722     def show_commands_at_teardown(self):
723         self.logger.debug(self.vapi.ppcli("show ip6-reassembly details"))
724         self.logger.debug(self.vapi.ppcli("show buffers"))
725
726     @classmethod
727     def create_stream(cls, packet_sizes, packet_count=test_packet_count):
728         """Create input packet stream for defined interface.
729
730         :param list packet_sizes: Required packet sizes.
731         """
732         for i in range(0, packet_count):
733             info = cls.create_packet_info(cls.src_if, cls.src_if)
734             payload = cls.info_to_payload(info)
735             p = (Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac) /
736                  IPv6(src=cls.src_if.remote_ip6,
737                       dst=cls.dst_if.remote_ip6) /
738                  UDP(sport=1234, dport=5678) /
739                  Raw(payload))
740             size = packet_sizes[(i // 2) % len(packet_sizes)]
741             cls.extend_packet(p, size, cls.padding)
742             info.data = p
743
744     @classmethod
745     def create_fragments(cls):
746         infos = cls._packet_infos
747         cls.pkt_infos = []
748         for index, info in six.iteritems(infos):
749             p = info.data
750             # cls.logger.debug(ppp("Packet:",
751             #                      p.__class__(scapy.compat.raw(p))))
752             fragments_400 = fragment_rfc8200(p, info.index, 400)
753             fragments_300 = fragment_rfc8200(p, info.index, 300)
754             cls.pkt_infos.append((index, fragments_400, fragments_300))
755         cls.fragments_400 = [
756             x for _, frags, _ in cls.pkt_infos for x in frags]
757         cls.fragments_300 = [
758             x for _, _, frags in cls.pkt_infos for x in frags]
759         cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, "
760                          "and %s 300-byte fragments" %
761                          (len(infos), len(cls.fragments_400),
762                              len(cls.fragments_300)))
763
764     def verify_capture(self, capture, dropped_packet_indexes=[]):
765         """Verify captured packet strea .
766
767         :param list capture: Captured packet stream.
768         """
769         info = None
770         seen = set()
771         for packet in capture:
772             try:
773                 self.logger.debug(ppp("Got packet:", packet))
774                 ip = packet[IPv6]
775                 udp = packet[UDP]
776                 payload_info = self.payload_to_info(packet[Raw])
777                 packet_index = payload_info.index
778                 self.assertTrue(
779                     packet_index not in dropped_packet_indexes,
780                     ppp("Packet received, but should be dropped:", packet))
781                 if packet_index in seen:
782                     raise Exception(ppp("Duplicate packet received", packet))
783                 seen.add(packet_index)
784                 self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
785                 info = self._packet_infos[packet_index]
786                 self.assertTrue(info is not None)
787                 self.assertEqual(packet_index, info.index)
788                 saved_packet = info.data
789                 self.assertEqual(ip.src, saved_packet[IPv6].src)
790                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
791                 self.assertEqual(udp.payload, saved_packet[UDP].payload)
792             except Exception:
793                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
794                 raise
795         for index in self._packet_infos:
796             self.assertTrue(index in seen or index in dropped_packet_indexes,
797                             "Packet with packet_index %d not received" % index)
798
799     def test_reassembly(self):
800         """ basic reassembly """
801
802         self.pg_enable_capture()
803         self.src_if.add_stream(self.fragments_400)
804         self.pg_start()
805
806         packets = self.dst_if.get_capture(len(self.pkt_infos))
807         self.verify_capture(packets)
808         self.src_if.assert_nothing_captured()
809
810         # run it all again to verify correctness
811         self.pg_enable_capture()
812         self.src_if.add_stream(self.fragments_400)
813         self.pg_start()
814
815         packets = self.dst_if.get_capture(len(self.pkt_infos))
816         self.verify_capture(packets)
817         self.src_if.assert_nothing_captured()
818
819     def test_reversed(self):
820         """ reverse order reassembly """
821
822         fragments = list(self.fragments_400)
823         fragments.reverse()
824
825         self.pg_enable_capture()
826         self.src_if.add_stream(fragments)
827         self.pg_start()
828
829         packets = self.dst_if.get_capture(len(self.pkt_infos))
830         self.verify_capture(packets)
831         self.src_if.assert_nothing_captured()
832
833         # run it all again to verify correctness
834         self.pg_enable_capture()
835         self.src_if.add_stream(fragments)
836         self.pg_start()
837
838         packets = self.dst_if.get_capture(len(self.pkt_infos))
839         self.verify_capture(packets)
840         self.src_if.assert_nothing_captured()
841
842     def test_random(self):
843         """ random order reassembly """
844
845         fragments = list(self.fragments_400)
846         shuffle(fragments)
847
848         self.pg_enable_capture()
849         self.src_if.add_stream(fragments)
850         self.pg_start()
851
852         packets = self.dst_if.get_capture(len(self.pkt_infos))
853         self.verify_capture(packets)
854         self.src_if.assert_nothing_captured()
855
856         # run it all again to verify correctness
857         self.pg_enable_capture()
858         self.src_if.add_stream(fragments)
859         self.pg_start()
860
861         packets = self.dst_if.get_capture(len(self.pkt_infos))
862         self.verify_capture(packets)
863         self.src_if.assert_nothing_captured()
864
865     def test_duplicates(self):
866         """ duplicate fragments """
867
868         fragments = [
869             x for (_, frags, _) in self.pkt_infos
870             for x in frags
871             for _ in range(0, min(2, len(frags)))
872         ]
873
874         self.pg_enable_capture()
875         self.src_if.add_stream(fragments)
876         self.pg_start()
877
878         packets = self.dst_if.get_capture(len(self.pkt_infos))
879         self.verify_capture(packets)
880         self.src_if.assert_nothing_captured()
881
882     def test_long_fragment_chain(self):
883         """ long fragment chain """
884
885         error_cnt_str = \
886             "/err/ip6-reassembly-feature/fragment chain too long (drop)"
887
888         error_cnt = self.statistics.get_err_counter(error_cnt_str)
889
890         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
891                                     max_reassembly_length=3,
892                                     expire_walk_interval_ms=50, is_ip6=1)
893
894         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
895              IPv6(src=self.src_if.remote_ip6,
896                   dst=self.dst_if.remote_ip6) /
897              UDP(sport=1234, dport=5678) /
898              Raw("X" * 1000))
899         frags = fragment_rfc8200(p, 1, 300) + fragment_rfc8200(p, 2, 500)
900
901         self.pg_enable_capture()
902         self.src_if.add_stream(frags)
903         self.pg_start()
904
905         self.dst_if.get_capture(1)
906         self.assert_error_counter_equal(error_cnt_str, error_cnt + 1)
907
908     def test_overlap1(self):
909         """ overlapping fragments case #1 """
910
911         fragments = []
912         for _, frags_400, frags_300 in self.pkt_infos:
913             if len(frags_300) == 1:
914                 fragments.extend(frags_400)
915             else:
916                 for i, j in zip(frags_300, frags_400):
917                     fragments.extend(i)
918                     fragments.extend(j)
919
920         dropped_packet_indexes = set(
921             index for (index, _, frags) in self.pkt_infos if len(frags) > 1
922         )
923
924         self.pg_enable_capture()
925         self.src_if.add_stream(fragments)
926         self.pg_start()
927
928         packets = self.dst_if.get_capture(
929             len(self.pkt_infos) - len(dropped_packet_indexes))
930         self.verify_capture(packets, dropped_packet_indexes)
931         self.src_if.assert_nothing_captured()
932
933     def test_overlap2(self):
934         """ overlapping fragments case #2 """
935
936         fragments = []
937         for _, frags_400, frags_300 in self.pkt_infos:
938             if len(frags_400) == 1:
939                 fragments.extend(frags_400)
940             else:
941                 # care must be taken here so that there are no fragments
942                 # received by vpp after reassembly is finished, otherwise
943                 # new reassemblies will be started and packet generator will
944                 # freak out when it detects unfreed buffers
945                 zipped = zip(frags_400, frags_300)
946                 for i, j in zipped:
947                     fragments.extend(i)
948                     fragments.extend(j)
949                 fragments.pop()
950
951         dropped_packet_indexes = set(
952             index for (index, _, frags) in self.pkt_infos if len(frags) > 1
953         )
954
955         self.pg_enable_capture()
956         self.src_if.add_stream(fragments)
957         self.pg_start()
958
959         packets = self.dst_if.get_capture(
960             len(self.pkt_infos) - len(dropped_packet_indexes))
961         self.verify_capture(packets, dropped_packet_indexes)
962         self.src_if.assert_nothing_captured()
963
964     def test_timeout_inline(self):
965         """ timeout (inline) """
966
967         dropped_packet_indexes = set(
968             index for (index, frags, _) in self.pkt_infos if len(frags) > 1
969         )
970
971         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
972                                     max_reassembly_length=3,
973                                     expire_walk_interval_ms=10000, is_ip6=1)
974
975         self.pg_enable_capture()
976         self.src_if.add_stream(self.fragments_400)
977         self.pg_start()
978
979         packets = self.dst_if.get_capture(
980             len(self.pkt_infos) - len(dropped_packet_indexes))
981         self.verify_capture(packets, dropped_packet_indexes)
982         pkts = self.src_if.get_capture(
983             expected_count=len(dropped_packet_indexes))
984         for icmp in pkts:
985             self.assertIn(ICMPv6TimeExceeded, icmp)
986             self.assertIn(IPv6ExtHdrFragment, icmp)
987             self.assertIn(icmp[IPv6ExtHdrFragment].id, dropped_packet_indexes)
988             dropped_packet_indexes.remove(icmp[IPv6ExtHdrFragment].id)
989
990     def test_timeout_cleanup(self):
991         """ timeout (cleanup) """
992
993         # whole packets + fragmented packets sans last fragment
994         fragments = [
995             x for (_, frags_400, _) in self.pkt_infos
996             for x in frags_400[:-1 if len(frags_400) > 1 else None]
997         ]
998
999         # last fragments for fragmented packets
1000         fragments2 = [frags_400[-1]
1001                       for (_, frags_400, _) in self.pkt_infos
1002                       if len(frags_400) > 1]
1003
1004         dropped_packet_indexes = set(
1005             index for (index, frags_400, _) in self.pkt_infos
1006             if len(frags_400) > 1)
1007
1008         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
1009                                     max_reassembly_length=1000,
1010                                     expire_walk_interval_ms=50)
1011
1012         self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
1013                                     max_reassembly_length=1000,
1014                                     expire_walk_interval_ms=50, is_ip6=1)
1015
1016         self.pg_enable_capture()
1017         self.src_if.add_stream(fragments)
1018         self.pg_start()
1019
1020         self.sleep(.25, "wait before sending rest of fragments")
1021
1022         self.src_if.add_stream(fragments2)
1023         self.pg_start()
1024
1025         packets = self.dst_if.get_capture(
1026             len(self.pkt_infos) - len(dropped_packet_indexes))
1027         self.verify_capture(packets, dropped_packet_indexes)
1028         pkts = self.src_if.get_capture(
1029             expected_count=len(dropped_packet_indexes))
1030         for icmp in pkts:
1031             self.assertIn(ICMPv6TimeExceeded, icmp)
1032             self.assertIn(IPv6ExtHdrFragment, icmp)
1033             self.assertIn(icmp[IPv6ExtHdrFragment].id, dropped_packet_indexes)
1034             dropped_packet_indexes.remove(icmp[IPv6ExtHdrFragment].id)
1035
1036     def test_disabled(self):
1037         """ reassembly disabled """
1038
1039         dropped_packet_indexes = set(
1040             index for (index, frags_400, _) in self.pkt_infos
1041             if len(frags_400) > 1)
1042
1043         self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=0,
1044                                     max_reassembly_length=3,
1045                                     expire_walk_interval_ms=10000, is_ip6=1)
1046
1047         self.pg_enable_capture()
1048         self.src_if.add_stream(self.fragments_400)
1049         self.pg_start()
1050
1051         packets = self.dst_if.get_capture(
1052             len(self.pkt_infos) - len(dropped_packet_indexes))
1053         self.verify_capture(packets, dropped_packet_indexes)
1054         self.src_if.assert_nothing_captured()
1055
1056     def test_missing_upper(self):
1057         """ missing upper layer """
1058         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1059              IPv6(src=self.src_if.remote_ip6,
1060                   dst=self.src_if.local_ip6) /
1061              UDP(sport=1234, dport=5678) /
1062              Raw())
1063         self.extend_packet(p, 1000, self.padding)
1064         fragments = fragment_rfc8200(p, 1, 500)
1065         bad_fragment = p.__class__(scapy.compat.raw(fragments[1]))
1066         bad_fragment[IPv6ExtHdrFragment].nh = 59
1067         bad_fragment[IPv6ExtHdrFragment].offset = 0
1068         self.pg_enable_capture()
1069         self.src_if.add_stream([bad_fragment])
1070         self.pg_start()
1071         pkts = self.src_if.get_capture(expected_count=1)
1072         icmp = pkts[0]
1073         self.assertIn(ICMPv6ParamProblem, icmp)
1074         self.assert_equal(icmp[ICMPv6ParamProblem].code, 3, "ICMP code")
1075
1076     def test_invalid_frag_size(self):
1077         """ fragment size not a multiple of 8 """
1078         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1079              IPv6(src=self.src_if.remote_ip6,
1080                   dst=self.src_if.local_ip6) /
1081              UDP(sport=1234, dport=5678) /
1082              Raw())
1083         self.extend_packet(p, 1000, self.padding)
1084         fragments = fragment_rfc8200(p, 1, 500)
1085         bad_fragment = fragments[0]
1086         self.extend_packet(bad_fragment, len(bad_fragment) + 5)
1087         self.pg_enable_capture()
1088         self.src_if.add_stream([bad_fragment])
1089         self.pg_start()
1090         pkts = self.src_if.get_capture(expected_count=1)
1091         icmp = pkts[0]
1092         self.assertIn(ICMPv6ParamProblem, icmp)
1093         self.assert_equal(icmp[ICMPv6ParamProblem].code, 0, "ICMP code")
1094
1095     def test_invalid_packet_size(self):
1096         """ total packet size > 65535 """
1097         p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1098              IPv6(src=self.src_if.remote_ip6,
1099                   dst=self.src_if.local_ip6) /
1100              UDP(sport=1234, dport=5678) /
1101              Raw())
1102         self.extend_packet(p, 1000, self.padding)
1103         fragments = fragment_rfc8200(p, 1, 500)
1104         bad_fragment = fragments[1]
1105         bad_fragment[IPv6ExtHdrFragment].offset = 65500
1106         self.pg_enable_capture()
1107         self.src_if.add_stream([bad_fragment])
1108         self.pg_start()
1109         pkts = self.src_if.get_capture(expected_count=1)
1110         icmp = pkts[0]
1111         self.assertIn(ICMPv6ParamProblem, icmp)
1112         self.assert_equal(icmp[ICMPv6ParamProblem].code, 0, "ICMP code")
1113
1114
class TestIPv6MWReassembly(VppTestCase):
    """ IPv6 Reassembly (multiple workers) """
    worker_config = "workers %d" % worker_count

    @classmethod
    def setUpClass(cls):
        super(TestIPv6MWReassembly, cls).setUpClass()

        # worker_count sending interfaces plus one destination interface
        cls.create_pg_interfaces(range(worker_count+1))
        cls.src_if = cls.pg0
        cls.send_ifs = cls.pg_interfaces[:-1]
        cls.dst_if = cls.pg_interfaces[-1]

        # setup all interfaces
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

        # packets sizes reduced here because we are generating packets without
        # Ethernet headers, which are added later (diff fragments go via
        # different interfaces)
        cls.packet_sizes = [64-len(Ether()), 512-len(Ether()),
                            1518-len(Ether()), 9018-len(Ether())]
        cls.padding = " abcdefghijklmn"
        cls.create_stream(cls.packet_sizes)
        cls.create_fragments()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6MWReassembly, cls).tearDownClass()

    def setUp(self):
        """ Test setup - force timeout on existing reassemblies """
        super(TestIPv6MWReassembly, self).setUp()
        for intf in self.send_ifs:
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=intf.sw_if_index, enable_ip6=True)
        # zero timeout with a short expire walk interval first, then wait
        # before restoring a long timeout for the test itself
        self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=10, is_ip6=1)
        self.sleep(.25)
        self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=1000, is_ip6=1)

    def tearDown(self):
        super(TestIPv6MWReassembly, self).tearDown()

    def show_commands_at_teardown(self):
        self.logger.debug(self.vapi.ppcli("show ip6-reassembly details"))
        self.logger.debug(self.vapi.ppcli("show buffers"))

    @classmethod
    def create_stream(cls, packet_sizes, packet_count=test_packet_count):
        """Create input packet stream

        The packets are created without an Ethernet header and stored in
        the ``data`` attribute of their packet info.

        :param list packet_sizes: Required packet sizes; each size is used
            for two consecutive packets before advancing to the next one.
        :param int packet_count: Number of packets to create.
        """
        for i in range(packet_count):
            info = cls.create_packet_info(cls.src_if, cls.src_if)
            payload = cls.info_to_payload(info)
            p = (IPv6(src=cls.src_if.remote_ip6,
                      dst=cls.dst_if.remote_ip6) /
                 UDP(sport=1234, dport=5678) /
                 Raw(payload))
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            cls.extend_packet(p, size, cls.padding)
            info.data = p

    @classmethod
    def create_fragments(cls):
        """Fragment every created packet with fragment size 400.

        Populates ``cls.pkt_infos`` with ``(index, fragments_400)`` tuples
        and ``cls.fragments_400`` with the flattened list of fragments.
        """
        infos = cls._packet_infos
        cls.pkt_infos = []
        for index, info in six.iteritems(infos):
            fragments_400 = fragment_rfc8200(info.data, index, 400)
            cls.pkt_infos.append((index, fragments_400))
        cls.fragments_400 = [
            x for (_, frags) in cls.pkt_infos for x in frags]
        cls.logger.debug("Fragmented %s packets into %s 400-byte fragments" %
                         (len(infos), len(cls.fragments_400)))

    def verify_capture(self, capture, dropped_packet_indexes=None):
        """Verify captured packet stream.

        Every captured packet must carry a packet index which is known, was
        not seen before and is not expected to be dropped, and it must match
        the saved packet in IPv6 source/destination address and UDP payload.
        Every created packet must be either captured or listed as dropped.

        :param list capture: Captured packet stream.
        :param dropped_packet_indexes: Collection of packet indexes which
            are expected to be dropped; ``None`` means no packet is expected
            to be dropped.
        """
        # avoid a shared mutable default argument
        if dropped_packet_indexes is None:
            dropped_packet_indexes = ()
        seen = set()
        for packet in capture:
            try:
                self.logger.debug(ppp("Got packet:", packet))
                ip = packet[IPv6]
                udp = packet[UDP]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                self.assertNotIn(
                    packet_index, dropped_packet_indexes,
                    ppp("Packet received, but should be dropped:", packet))
                if packet_index in seen:
                    raise Exception(ppp("Duplicate packet received", packet))
                seen.add(packet_index)
                self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
                info = self._packet_infos[packet_index]
                self.assertIsNotNone(info)
                self.assertEqual(packet_index, info.index)
                saved_packet = info.data
                self.assertEqual(ip.src, saved_packet[IPv6].src)
                self.assertEqual(ip.dst, saved_packet[IPv6].dst)
                self.assertEqual(udp.payload, saved_packet[UDP].payload)
            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for index in self._packet_infos:
            self.assertTrue(index in seen or index in dropped_packet_indexes,
                            "Packet with packet_index %d not received" % index)

    def send_packets(self, packets):
        """Send per-worker packet lists and start the packet generator.

        :param list packets: One list of packets (without Ethernet header)
            per worker; the list at position N is sent via the N-th sending
            interface with ``worker=N``.  Empty lists are skipped.
        """
        for counter in range(worker_count):
            if not packets[counter]:
                continue
            send_if = self.send_ifs[counter]
            # build the frames eagerly so that every frame is bound to this
            # iteration's send_if rather than looked up lazily later on
            frames = [
                Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
                for x in packets[counter]]
            send_if.add_stream(frames, worker=counter)
        self.pg_start()

    def test_worker_conflict(self):
        """ 1st and FO=0 fragments on different workers """

        # in first wave we send fragments which don't start at offset 0
        # then we send fragments with offset 0 on a different thread
        # then the rest of packets on a random thread
        first_packets = [[] for _ in range(worker_count)]
        second_packets = [[] for _ in range(worker_count)]
        rest_of_packets = [[] for _ in range(worker_count)]
        for (_, p) in self.pkt_infos:
            wi = randrange(worker_count)
            second_packets[wi].append(p[0])
            if len(p) <= 1:
                continue
            # pick a worker different from the one handling the FO=0 fragment
            wi2 = wi
            while wi2 == wi:
                wi2 = randrange(worker_count)
            first_packets[wi2].append(p[1])
            wi3 = randrange(worker_count)
            rest_of_packets[wi3].extend(p[2:])

        # the second pass runs it all again with the same distribution
        for _ in range(2):
            self.pg_enable_capture()
            self.send_packets(first_packets)
            self.send_packets(second_packets)
            self.send_packets(rest_of_packets)

            packets = self.dst_if.get_capture(len(self.pkt_infos))
            self.verify_capture(packets)
            for send_if in self.send_ifs:
                send_if.assert_nothing_captured()
1286
1287
class TestIPv4ReassemblyLocalNode(VppTestCase):
    """ IPv4 Reassembly for packets coming to ip4-local node """

    @classmethod
    def setUpClass(cls):
        super(TestIPv4ReassemblyLocalNode, cls).setUpClass()

        # a single interface acts as both source and destination
        cls.create_pg_interfaces([0])
        cls.src_dst_if = cls.pg0

        for intf in cls.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

        cls.padding = " abcdefghijklmn"
        cls.create_stream()
        cls.create_fragments()

    @classmethod
    def tearDownClass(cls):
        parent = super(TestIPv4ReassemblyLocalNode, cls)
        parent.tearDownClass()

    def setUp(self):
        """ Test setup - force timeout on existing reassemblies """
        super(TestIPv4ReassemblyLocalNode, self).setUp()
        # zero timeout with a short expire walk interval first, then wait
        # before restoring a long timeout for the test itself
        self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=10)
        self.sleep(.25)
        self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=10000)

    def tearDown(self):
        parent = super(TestIPv4ReassemblyLocalNode, self)
        parent.tearDown()

    def show_commands_at_teardown(self):
        for command in ("show ip4-reassembly details", "show buffers"):
            self.logger.debug(self.vapi.ppcli(command))

    @classmethod
    def create_stream(cls, packet_count=test_packet_count):
        """Create the ICMP echo requests used as test input.

        Every packet is extended to size 1518 and stored in the ``data``
        attribute of the packet info created for it.

        :param int packet_count: Number of packets to create.
        """
        for _ in range(packet_count):
            info = cls.create_packet_info(cls.src_dst_if, cls.src_dst_if)
            payload = cls.info_to_payload(info)
            ether = Ether(dst=cls.src_dst_if.local_mac,
                          src=cls.src_dst_if.remote_mac)
            ip4 = IP(id=info.index, src=cls.src_dst_if.remote_ip4,
                     dst=cls.src_dst_if.local_ip4)
            echo = ICMP(type='echo-request', id=1234)
            pkt = ether / ip4 / echo / Raw(payload)
            cls.extend_packet(pkt, 1518, cls.padding)
            info.data = pkt

    @classmethod
    def create_fragments(cls):
        """Fragment every created packet with fragment size 300.

        Populates ``cls.pkt_infos`` with ``(index, fragments_300)`` tuples
        and ``cls.fragments_300`` with the flattened list of fragments.
        """
        infos = cls._packet_infos
        cls.pkt_infos = [
            (index, fragment_rfc791(info.data, 300))
            for index, info in six.iteritems(infos)]

        all_300 = []
        for _, frags in cls.pkt_infos:
            all_300.extend(frags)
        cls.fragments_300 = all_300

        cls.logger.debug("Fragmented %s packets into %s 300-byte fragments" %
                         (len(infos), len(all_300)))

    def verify_capture(self, capture):
        """Verify captured packet stream.

        Every captured packet must be an echo reply to one of the saved
        echo requests (addresses swapped, same ICMP id and payload) and no
        packet index may be seen twice.  Every created packet must be
        answered.

        :param list capture: Captured packet stream.
        """
        seen = set()
        for packet in capture:
            try:
                self.logger.debug(ppp("Got packet:", packet))
                ip = packet[IP]
                icmp = packet[ICMP]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                if packet_index in seen:
                    raise Exception(ppp("Duplicate packet received", packet))
                seen.add(packet_index)
                self.assertEqual(payload_info.dst, self.src_dst_if.sw_if_index)
                info = self._packet_infos[packet_index]
                self.assertIsNotNone(info)
                self.assertEqual(packet_index, info.index)
                request = info.data
                # the reply travels in the opposite direction
                self.assertEqual(ip.src, request[IP].dst)
                self.assertEqual(ip.dst, request[IP].src)
                self.assertEqual(icmp.type, 0)  # echo reply
                self.assertEqual(icmp.id, request[ICMP].id)
                self.assertEqual(icmp.payload, request[ICMP].payload)
            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for index in self._packet_infos:
            self.assertIn(index, seen,
                          "Packet with packet_index %d not received" % index)

    def test_reassembly(self):
        """ basic reassembly """

        # the second pass runs it all again to verify correctness
        for _ in range(2):
            self.pg_enable_capture()
            self.src_dst_if.add_stream(self.fragments_300)
            self.pg_start()

            replies = self.src_dst_if.get_capture(len(self.pkt_infos))
            self.verify_capture(replies)
1413
1414
1415 class TestFIFReassembly(VppTestCase):
1416     """ Fragments in fragments reassembly """
1417
1418     @classmethod
1419     def setUpClass(cls):
1420         super(TestFIFReassembly, cls).setUpClass()
1421
1422         cls.create_pg_interfaces([0, 1])
1423         cls.src_if = cls.pg0
1424         cls.dst_if = cls.pg1
1425         for i in cls.pg_interfaces:
1426             i.admin_up()
1427             i.config_ip4()
1428             i.resolve_arp()
1429             i.config_ip6()
1430             i.resolve_ndp()
1431
1432         cls.packet_sizes = [64, 512, 1518, 9018]
1433         cls.padding = " abcdefghijklmn"
1434
1435     @classmethod
1436     def tearDownClass(cls):
1437         super(TestFIFReassembly, cls).tearDownClass()
1438
1439     def setUp(self):
1440         """ Test setup - force timeout on existing reassemblies """
1441         super(TestFIFReassembly, self).setUp()
1442         self.vapi.ip_reassembly_enable_disable(
1443             sw_if_index=self.src_if.sw_if_index, enable_ip4=True,
1444             enable_ip6=True)
1445         self.vapi.ip_reassembly_enable_disable(
1446             sw_if_index=self.dst_if.sw_if_index, enable_ip4=True,
1447             enable_ip6=True)
1448         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
1449                                     max_reassembly_length=1000,
1450                                     expire_walk_interval_ms=10)
1451         self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
1452                                     max_reassembly_length=1000,
1453                                     expire_walk_interval_ms=10, is_ip6=1)
1454         self.sleep(.25)
1455         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
1456                                     max_reassembly_length=1000,
1457                                     expire_walk_interval_ms=10000)
1458         self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
1459                                     max_reassembly_length=1000,
1460                                     expire_walk_interval_ms=10000, is_ip6=1)
1461
1462     def tearDown(self):
1463         super(TestFIFReassembly, self).tearDown()
1464
1465     def show_commands_at_teardown(self):
1466         self.logger.debug(self.vapi.ppcli("show ip4-reassembly details"))
1467         self.logger.debug(self.vapi.ppcli("show ip6-reassembly details"))
1468         self.logger.debug(self.vapi.ppcli("show buffers"))
1469
1470     def verify_capture(self, capture, ip_class, dropped_packet_indexes=[]):
1471         """Verify captured packet stream.
1472
1473         :param list capture: Captured packet stream.
1474         """
1475         info = None
1476         seen = set()
1477         for packet in capture:
1478             try:
1479                 self.logger.debug(ppp("Got packet:", packet))
1480                 ip = packet[ip_class]
1481                 udp = packet[UDP]
1482                 payload_info = self.payload_to_info(packet[Raw])
1483                 packet_index = payload_info.index
1484                 self.assertTrue(
1485                     packet_index not in dropped_packet_indexes,
1486                     ppp("Packet received, but should be dropped:", packet))
1487                 if packet_index in seen:
1488                     raise Exception(ppp("Duplicate packet received", packet))
1489                 seen.add(packet_index)
1490                 self.assertEqual(payload_info.dst, self.dst_if.sw_if_index)
1491                 info = self._packet_infos[packet_index]
1492                 self.assertTrue(info is not None)
1493                 self.assertEqual(packet_index, info.index)
1494                 saved_packet = info.data
1495                 self.assertEqual(ip.src, saved_packet[ip_class].src)
1496                 self.assertEqual(ip.dst, saved_packet[ip_class].dst)
1497                 self.assertEqual(udp.payload, saved_packet[UDP].payload)
1498             except Exception:
1499                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1500                 raise
1501         for index in self._packet_infos:
1502             self.assertTrue(index in seen or index in dropped_packet_indexes,
1503                             "Packet with packet_index %d not received" % index)
1504
1505     def test_fif4(self):
1506         """ Fragments in fragments (4o4) """
1507
1508         # TODO this should be ideally in setUpClass, but then we hit a bug
1509         # with VppIpRoute incorrectly reporting it's present when it's not
1510         # so we need to manually remove the vpp config, thus we cannot have
1511         # it shared for multiple test cases
1512         self.tun_ip4 = "1.1.1.2"
1513
1514         self.gre4 = VppGreInterface(self, self.src_if.local_ip4, self.tun_ip4)
1515         self.gre4.add_vpp_config()
1516         self.gre4.admin_up()
1517         self.gre4.config_ip4()
1518
1519         self.vapi.ip_reassembly_enable_disable(
1520             sw_if_index=self.gre4.sw_if_index, enable_ip4=True)
1521
1522         self.route4 = VppIpRoute(self, self.tun_ip4, 32,
1523                                  [VppRoutePath(self.src_if.remote_ip4,
1524                                                self.src_if.sw_if_index)])
1525         self.route4.add_vpp_config()
1526
1527         self.reset_packet_infos()
1528         for i in range(test_packet_count):
1529             info = self.create_packet_info(self.src_if, self.dst_if)
1530             payload = self.info_to_payload(info)
1531             # Ethernet header here is only for size calculation, thus it
1532             # doesn't matter how it's initialized. This is to ensure that
1533             # reassembled packet is not > 9000 bytes, so that it's not dropped
1534             p = (Ether() /
1535                  IP(id=i, src=self.src_if.remote_ip4,
1536                     dst=self.dst_if.remote_ip4) /
1537                  UDP(sport=1234, dport=5678) /
1538                  Raw(payload))
1539             size = self.packet_sizes[(i // 2) % len(self.packet_sizes)]
1540             self.extend_packet(p, size, self.padding)
1541             info.data = p[IP]  # use only IP part, without ethernet header
1542
1543         fragments = [x for _, p in six.iteritems(self._packet_infos)
1544                      for x in fragment_rfc791(p.data, 400)]
1545
1546         encapped_fragments = \
1547             [Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1548              IP(src=self.tun_ip4, dst=self.src_if.local_ip4) /
1549                 GRE() /
1550                 p
1551                 for p in fragments]
1552
1553         fragmented_encapped_fragments = \
1554             [x for p in encapped_fragments
1555              for x in fragment_rfc791(p, 200)]
1556
1557         self.src_if.add_stream(fragmented_encapped_fragments)
1558
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561
1562         self.src_if.assert_nothing_captured()
1563         packets = self.dst_if.get_capture(len(self._packet_infos))
1564         self.verify_capture(packets, IP)
1565
1566         # TODO remove gre vpp config by hand until VppIpRoute gets fixed
1567         # so that it's query_vpp_config() works as it should
1568         self.gre4.remove_vpp_config()
1569         self.logger.debug(self.vapi.ppcli("show interface"))
1570
1571     def test_fif6(self):
1572         """ Fragments in fragments (6o6) """
1573         # TODO this should be ideally in setUpClass, but then we hit a bug
1574         # with VppIpRoute incorrectly reporting it's present when it's not
1575         # so we need to manually remove the vpp config, thus we cannot have
1576         # it shared for multiple test cases
1577         self.tun_ip6 = "1002::1"
1578
1579         self.gre6 = VppGreInterface(self, self.src_if.local_ip6, self.tun_ip6)
1580         self.gre6.add_vpp_config()
1581         self.gre6.admin_up()
1582         self.gre6.config_ip6()
1583
1584         self.vapi.ip_reassembly_enable_disable(
1585             sw_if_index=self.gre6.sw_if_index, enable_ip6=True)
1586
1587         self.route6 = VppIpRoute(self, self.tun_ip6, 128,
1588                                  [VppRoutePath(
1589                                      self.src_if.remote_ip6,
1590                                      self.src_if.sw_if_index)])
1591         self.route6.add_vpp_config()
1592
1593         self.reset_packet_infos()
1594         for i in range(test_packet_count):
1595             info = self.create_packet_info(self.src_if, self.dst_if)
1596             payload = self.info_to_payload(info)
1597             # Ethernet header here is only for size calculation, thus it
1598             # doesn't matter how it's initialized. This is to ensure that
1599             # reassembled packet is not > 9000 bytes, so that it's not dropped
1600             p = (Ether() /
1601                  IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
1602                  UDP(sport=1234, dport=5678) /
1603                  Raw(payload))
1604             size = self.packet_sizes[(i // 2) % len(self.packet_sizes)]
1605             self.extend_packet(p, size, self.padding)
1606             info.data = p[IPv6]  # use only IPv6 part, without ethernet header
1607
1608         fragments = [x for _, i in six.iteritems(self._packet_infos)
1609                      for x in fragment_rfc8200(
1610                          i.data, i.index, 400)]
1611
1612         encapped_fragments = \
1613             [Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
1614              IPv6(src=self.tun_ip6, dst=self.src_if.local_ip6) /
1615                 GRE() /
1616                 p
1617                 for p in fragments]
1618
1619         fragmented_encapped_fragments = \
1620             [x for p in encapped_fragments for x in (
1621                 fragment_rfc8200(
1622                     p,
1623                     2 * len(self._packet_infos) + p[IPv6ExtHdrFragment].id,
1624                     200)
1625                 if IPv6ExtHdrFragment in p else [p]
1626             )
1627             ]
1628
1629         self.src_if.add_stream(fragmented_encapped_fragments)
1630
1631         self.pg_enable_capture(self.pg_interfaces)
1632         self.pg_start()
1633
1634         self.src_if.assert_nothing_captured()
1635         packets = self.dst_if.get_capture(len(self._packet_infos))
1636         self.verify_capture(packets, IPv6)
1637
1638         # TODO remove gre vpp config by hand until VppIpRoute gets fixed
1639         # so that it's query_vpp_config() works as it should
1640         self.gre6.remove_vpp_config()
1641
1642
# Run the tests of this module through VppTestRunner when the file is
# executed directly as a script
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)