tests: Use errno value rather than a specific int
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 from collections import namedtuple
8 import hashlib
9 import ipaddress
10 import reprlib
11 import time
12 import unittest
13 from random import randint, shuffle, getrandbits
14 from socket import AF_INET, AF_INET6
15
16 import scapy.compat
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
21
22 from config import config
23 from bfd import (
24     VppBFDAuthKey,
25     BFD,
26     BFDAuthType,
27     VppBFDUDPSession,
28     BFDDiagCode,
29     BFDState,
30     BFD_vpp_echo,
31 )
32 from framework import VppTestCase
33 from asfframework import (
34     tag_fixme_vpp_workers,
35     tag_fixme_debian11,
36     tag_run_solo,
37     VppTestRunner,
38 )
39 from util import ppp
40 from vpp_ip import DpoProto
41 from vpp_ip_route import VppIpRoute, VppRoutePath
42 from vpp_lo_interface import VppLoInterface
43 from vpp_papi_provider import UnexpectedApiReturnValueError, CliFailedCommandError
44 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
45 from vpp_gre_interface import VppGreInterface
46
47 USEC_IN_SEC = 1000000
48
49
50 class AuthKeyFactory(object):
51     """Factory class for creating auth keys with unique conf key ID"""
52
53     def __init__(self):
54         self._conf_key_ids = {}
55
56     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
57         """create a random key with unique conf key id"""
58         conf_key_id = randint(0, 0xFFFFFFFF)
59         while conf_key_id in self._conf_key_ids:
60             conf_key_id = randint(0, 0xFFFFFFFF)
61         self._conf_key_ids[conf_key_id] = 1
62         key = scapy.compat.raw(
63             bytearray([randint(0, 255) for _ in range(randint(1, 20))])
64         )
65         return VppBFDAuthKey(
66             test=test, auth_type=auth_type, conf_key_id=conf_key_id, key=key
67         )
68
69
70 class BFDAPITestCase(VppTestCase):
71     """Bidirectional Forwarding Detection (BFD) - API"""
72
73     pg0 = None
74     pg1 = None
75
76     @classmethod
77     def setUpClass(cls):
78         super(BFDAPITestCase, cls).setUpClass()
79         cls.vapi.cli("set log class bfd level debug")
80         try:
81             cls.create_pg_interfaces(range(2))
82             for i in cls.pg_interfaces:
83                 i.config_ip4()
84                 i.config_ip6()
85                 i.resolve_arp()
86
87         except Exception:
88             super(BFDAPITestCase, cls).tearDownClass()
89             raise
90
91     @classmethod
92     def tearDownClass(cls):
93         super(BFDAPITestCase, cls).tearDownClass()
94
95     def setUp(self):
96         super(BFDAPITestCase, self).setUp()
97         self.factory = AuthKeyFactory()
98
99     def test_add_bfd(self):
100         """create a BFD session"""
101         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
102         session.add_vpp_config()
103         self.logger.debug("Session state is %s", session.state)
104         session.remove_vpp_config()
105         session.add_vpp_config()
106         self.logger.debug("Session state is %s", session.state)
107         session.remove_vpp_config()
108
109     def test_double_add(self):
110         """create the same BFD session twice (negative case)"""
111         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
112         session.add_vpp_config()
113
114         with self.vapi.assert_negative_api_retval():
115             session.add_vpp_config()
116
117         session.remove_vpp_config()
118
119     def test_add_bfd6(self):
120         """create IPv6 BFD session"""
121         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
122         session.add_vpp_config()
123         self.logger.debug("Session state is %s", session.state)
124         session.remove_vpp_config()
125         session.add_vpp_config()
126         self.logger.debug("Session state is %s", session.state)
127         session.remove_vpp_config()
128
129     def test_mod_bfd(self):
130         """modify BFD session parameters"""
131         session = VppBFDUDPSession(
132             self,
133             self.pg0,
134             self.pg0.remote_ip4,
135             desired_min_tx=50000,
136             required_min_rx=10000,
137             detect_mult=1,
138         )
139         session.add_vpp_config()
140         s = session.get_bfd_udp_session_dump_entry()
141         self.assert_equal(
142             session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
143         )
144         self.assert_equal(
145             session.required_min_rx, s.required_min_rx, "required min receive interval"
146         )
147         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
148         session.modify_parameters(
149             desired_min_tx=session.desired_min_tx * 2,
150             required_min_rx=session.required_min_rx * 2,
151             detect_mult=session.detect_mult * 2,
152         )
153         s = session.get_bfd_udp_session_dump_entry()
154         self.assert_equal(
155             session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
156         )
157         self.assert_equal(
158             session.required_min_rx, s.required_min_rx, "required min receive interval"
159         )
160         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
161
162     def test_upd_bfd(self):
163         """Create/Modify w/ Update BFD session parameters"""
164         session = VppBFDUDPSession(
165             self,
166             self.pg0,
167             self.pg0.remote_ip4,
168             desired_min_tx=50000,
169             required_min_rx=10000,
170             detect_mult=1,
171         )
172         session.upd_vpp_config()
173         s = session.get_bfd_udp_session_dump_entry()
174         self.assert_equal(
175             session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
176         )
177         self.assert_equal(
178             session.required_min_rx, s.required_min_rx, "required min receive interval"
179         )
180
181         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
182         session.upd_vpp_config(
183             desired_min_tx=session.desired_min_tx * 2,
184             required_min_rx=session.required_min_rx * 2,
185             detect_mult=session.detect_mult * 2,
186         )
187         s = session.get_bfd_udp_session_dump_entry()
188         self.assert_equal(
189             session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
190         )
191         self.assert_equal(
192             session.required_min_rx, s.required_min_rx, "required min receive interval"
193         )
194         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
195
196     def test_add_sha1_keys(self):
197         """add SHA1 keys"""
198         key_count = 10
199         keys = [self.factory.create_random_key(self) for i in range(0, key_count)]
200         for key in keys:
201             self.assertFalse(key.query_vpp_config())
202         for key in keys:
203             key.add_vpp_config()
204         for key in keys:
205             self.assertTrue(key.query_vpp_config())
206         # remove randomly
207         indexes = list(range(key_count))
208         shuffle(indexes)
209         removed = []
210         for i in indexes:
211             key = keys[i]
212             key.remove_vpp_config()
213             removed.append(i)
214             for j in range(key_count):
215                 key = keys[j]
216                 if j in removed:
217                     self.assertFalse(key.query_vpp_config())
218                 else:
219                     self.assertTrue(key.query_vpp_config())
220         # should be removed now
221         for key in keys:
222             self.assertFalse(key.query_vpp_config())
223         # add back and remove again
224         for key in keys:
225             key.add_vpp_config()
226         for key in keys:
227             self.assertTrue(key.query_vpp_config())
228         for key in keys:
229             key.remove_vpp_config()
230         for key in keys:
231             self.assertFalse(key.query_vpp_config())
232
233     def test_add_bfd_sha1(self):
234         """create a BFD session (SHA1)"""
235         key = self.factory.create_random_key(self)
236         key.add_vpp_config()
237         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
238         session.add_vpp_config()
239         self.logger.debug("Session state is %s", session.state)
240         session.remove_vpp_config()
241         session.add_vpp_config()
242         self.logger.debug("Session state is %s", session.state)
243         session.remove_vpp_config()
244
245     def test_double_add_sha1(self):
246         """create the same BFD session twice (negative case) (SHA1)"""
247         key = self.factory.create_random_key(self)
248         key.add_vpp_config()
249         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
250         session.add_vpp_config()
251         with self.assertRaises(Exception):
252             session.add_vpp_config()
253
254     def test_add_auth_nonexistent_key(self):
255         """create BFD session using non-existent SHA1 (negative case)"""
256         session = VppBFDUDPSession(
257             self,
258             self.pg0,
259             self.pg0.remote_ip4,
260             sha1_key=self.factory.create_random_key(self),
261         )
262         with self.assertRaises(Exception):
263             session.add_vpp_config()
264
265     def test_shared_sha1_key(self):
266         """single SHA1 key shared by multiple BFD sessions"""
267         key = self.factory.create_random_key(self)
268         key.add_vpp_config()
269         sessions = [
270             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key),
271             VppBFDUDPSession(
272                 self, self.pg0, self.pg0.remote_ip6, sha1_key=key, af=AF_INET6
273             ),
274             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4, sha1_key=key),
275             VppBFDUDPSession(
276                 self, self.pg1, self.pg1.remote_ip6, sha1_key=key, af=AF_INET6
277             ),
278         ]
279         for s in sessions:
280             s.add_vpp_config()
281         removed = 0
282         for s in sessions:
283             e = key.get_bfd_auth_keys_dump_entry()
284             self.assert_equal(
285                 e.use_count, len(sessions) - removed, "Use count for shared key"
286             )
287             s.remove_vpp_config()
288             removed += 1
289         e = key.get_bfd_auth_keys_dump_entry()
290         self.assert_equal(
291             e.use_count, len(sessions) - removed, "Use count for shared key"
292         )
293
294     def test_activate_auth(self):
295         """activate SHA1 authentication"""
296         key = self.factory.create_random_key(self)
297         key.add_vpp_config()
298         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
299         session.add_vpp_config()
300         session.activate_auth(key)
301
302     def test_deactivate_auth(self):
303         """deactivate SHA1 authentication"""
304         key = self.factory.create_random_key(self)
305         key.add_vpp_config()
306         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
307         session.add_vpp_config()
308         session.activate_auth(key)
309         session.deactivate_auth()
310
311     def test_change_key(self):
312         """change SHA1 key"""
313         key1 = self.factory.create_random_key(self)
314         key2 = self.factory.create_random_key(self)
315         while key2.conf_key_id == key1.conf_key_id:
316             key2 = self.factory.create_random_key(self)
317         key1.add_vpp_config()
318         key2.add_vpp_config()
319         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key1)
320         session.add_vpp_config()
321         session.activate_auth(key2)
322
323     def test_set_del_udp_echo_source(self):
324         """set/del udp echo source"""
325         self.create_loopback_interfaces(1)
326         self.loopback0 = self.lo_interfaces[0]
327         self.loopback0.admin_up()
328         echo_source = self.vapi.bfd_udp_get_echo_source()
329         self.assertFalse(echo_source.is_set)
330         self.assertFalse(echo_source.have_usable_ip4)
331         self.assertFalse(echo_source.have_usable_ip6)
332
333         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
334         echo_source = self.vapi.bfd_udp_get_echo_source()
335         self.assertTrue(echo_source.is_set)
336         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
337         self.assertFalse(echo_source.have_usable_ip4)
338         self.assertFalse(echo_source.have_usable_ip6)
339
340         self.loopback0.config_ip4()
341         echo_ip4 = ipaddress.IPv4Address(
342             int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
343         ).packed
344         echo_source = self.vapi.bfd_udp_get_echo_source()
345         self.assertTrue(echo_source.is_set)
346         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
347         self.assertTrue(echo_source.have_usable_ip4)
348         self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
349         self.assertFalse(echo_source.have_usable_ip6)
350
351         self.loopback0.config_ip6()
352         echo_ip6 = ipaddress.IPv6Address(
353             int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
354         ).packed
355
356         echo_source = self.vapi.bfd_udp_get_echo_source()
357         self.assertTrue(echo_source.is_set)
358         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
359         self.assertTrue(echo_source.have_usable_ip4)
360         self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
361         self.assertTrue(echo_source.have_usable_ip6)
362         self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
363
364         self.vapi.bfd_udp_del_echo_source()
365         echo_source = self.vapi.bfd_udp_get_echo_source()
366         self.assertFalse(echo_source.is_set)
367         self.assertFalse(echo_source.have_usable_ip4)
368         self.assertFalse(echo_source.have_usable_ip6)
369
370
371 class BFDTestSession(object):
372     """BFD session as seen from test framework side"""
373
374     def __init__(
375         self,
376         test,
377         interface,
378         af,
379         detect_mult=3,
380         sha1_key=None,
381         bfd_key_id=None,
382         our_seq_number=None,
383         tunnel_header=None,
384         phy_interface=None,
385     ):
386         self.test = test
387         self.af = af
388         self.sha1_key = sha1_key
389         self.bfd_key_id = bfd_key_id
390         self.interface = interface
391         if phy_interface:
392             self.phy_interface = phy_interface
393         else:
394             self.phy_interface = self.interface
395         self.udp_sport = randint(49152, 65535)
396         if our_seq_number is None:
397             self.our_seq_number = randint(0, 40000000)
398         else:
399             self.our_seq_number = our_seq_number
400         self.vpp_seq_number = None
401         self.my_discriminator = 0
402         self.desired_min_tx = 300000
403         self.required_min_rx = 300000
404         self.required_min_echo_rx = None
405         self.detect_mult = detect_mult
406         self.diag = BFDDiagCode.no_diagnostic
407         self.your_discriminator = None
408         self.state = BFDState.down
409         self.auth_type = BFDAuthType.no_auth
410         self.tunnel_header = tunnel_header
411         self.tx_packets = 0
412         self.rx_packets = 0
413         self.tx_packets_echo = 0
414         self.rx_packets_echo = 0
415
416     def inc_seq_num(self):
417         """increment sequence number, wrapping if needed"""
418         if self.our_seq_number == 0xFFFFFFFF:
419             self.our_seq_number = 0
420         else:
421             self.our_seq_number += 1
422
423     def update(
424         self,
425         my_discriminator=None,
426         your_discriminator=None,
427         desired_min_tx=None,
428         required_min_rx=None,
429         required_min_echo_rx=None,
430         detect_mult=None,
431         diag=None,
432         state=None,
433         auth_type=None,
434     ):
435         """update BFD parameters associated with session"""
436         if my_discriminator is not None:
437             self.my_discriminator = my_discriminator
438         if your_discriminator is not None:
439             self.your_discriminator = your_discriminator
440         if required_min_rx is not None:
441             self.required_min_rx = required_min_rx
442         if required_min_echo_rx is not None:
443             self.required_min_echo_rx = required_min_echo_rx
444         if desired_min_tx is not None:
445             self.desired_min_tx = desired_min_tx
446         if detect_mult is not None:
447             self.detect_mult = detect_mult
448         if diag is not None:
449             self.diag = diag
450         if state is not None:
451             self.state = state
452         if auth_type is not None:
453             self.auth_type = auth_type
454
455     def fill_packet_fields(self, packet):
456         """set packet fields with known values in packet"""
457         bfd = packet[BFD]
458         if self.my_discriminator:
459             self.test.logger.debug(
460                 "BFD: setting packet.my_discriminator=%s", self.my_discriminator
461             )
462             bfd.my_discriminator = self.my_discriminator
463         if self.your_discriminator:
464             self.test.logger.debug(
465                 "BFD: setting packet.your_discriminator=%s", self.your_discriminator
466             )
467             bfd.your_discriminator = self.your_discriminator
468         if self.required_min_rx:
469             self.test.logger.debug(
470                 "BFD: setting packet.required_min_rx_interval=%s", self.required_min_rx
471             )
472             bfd.required_min_rx_interval = self.required_min_rx
473         if self.required_min_echo_rx:
474             self.test.logger.debug(
475                 "BFD: setting packet.required_min_echo_rx=%s", self.required_min_echo_rx
476             )
477             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
478         if self.desired_min_tx:
479             self.test.logger.debug(
480                 "BFD: setting packet.desired_min_tx_interval=%s", self.desired_min_tx
481             )
482             bfd.desired_min_tx_interval = self.desired_min_tx
483         if self.detect_mult:
484             self.test.logger.debug(
485                 "BFD: setting packet.detect_mult=%s", self.detect_mult
486             )
487             bfd.detect_mult = self.detect_mult
488         if self.diag:
489             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
490             bfd.diag = self.diag
491         if self.state:
492             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
493             bfd.state = self.state
494         if self.auth_type:
495             # this is used by a negative test-case
496             self.test.logger.debug("BFD: setting packet.auth_type=%s", self.auth_type)
497             bfd.auth_type = self.auth_type
498
499     def create_packet(self):
500         """create a BFD packet, reflecting the current state of session"""
501         if self.sha1_key:
502             bfd = BFD(flags="A")
503             bfd.auth_type = self.sha1_key.auth_type
504             bfd.auth_len = BFD.sha1_auth_len
505             bfd.auth_key_id = self.bfd_key_id
506             bfd.auth_seq_num = self.our_seq_number
507             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
508         else:
509             bfd = BFD()
510         packet = Ether(
511             src=self.phy_interface.remote_mac, dst=self.phy_interface.local_mac
512         )
513         if self.tunnel_header:
514             packet = packet / self.tunnel_header
515         if self.af == AF_INET6:
516             packet = (
517                 packet
518                 / IPv6(
519                     src=self.interface.remote_ip6,
520                     dst=self.interface.local_ip6,
521                     hlim=255,
522                 )
523                 / UDP(sport=self.udp_sport, dport=BFD.udp_dport)
524                 / bfd
525             )
526         else:
527             packet = (
528                 packet
529                 / IP(
530                     src=self.interface.remote_ip4, dst=self.interface.local_ip4, ttl=255
531                 )
532                 / UDP(sport=self.udp_sport, dport=BFD.udp_dport)
533                 / bfd
534             )
535         self.test.logger.debug("BFD: Creating packet")
536         self.fill_packet_fields(packet)
537         if self.sha1_key:
538             hash_material = (
539                 scapy.compat.raw(packet[BFD])[:32]
540                 + self.sha1_key.key
541                 + b"\0" * (20 - len(self.sha1_key.key))
542             )
543             self.test.logger.debug(
544                 "BFD: Calculated SHA1 hash: %s"
545                 % hashlib.sha1(hash_material).hexdigest()
546             )
547             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
548         return packet
549
550     def send_packet(self, packet=None, interface=None):
551         """send packet on interface, creating the packet if needed"""
552         if packet is None:
553             packet = self.create_packet()
554         if interface is None:
555             interface = self.phy_interface
556         self.test.logger.debug(ppp("Sending packet:", packet))
557         interface.add_stream(packet)
558         self.tx_packets += 1
559         self.test.pg_start()
560
561     def verify_sha1_auth(self, packet):
562         """Verify correctness of authentication in BFD layer."""
563         bfd = packet[BFD]
564         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
565         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
566         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
567         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
568         if self.vpp_seq_number is None:
569             self.vpp_seq_number = bfd.auth_seq_num
570             self.test.logger.debug(
571                 "Received initial sequence number: %s" % self.vpp_seq_number
572             )
573         else:
574             recvd_seq_num = bfd.auth_seq_num
575             self.test.logger.debug(
576                 "Received followup sequence number: %s" % recvd_seq_num
577             )
578             if self.vpp_seq_number < 0xFFFFFFFF:
579                 if self.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
580                     self.test.assert_equal(
581                         recvd_seq_num, self.vpp_seq_number + 1, "BFD sequence number"
582                     )
583                 else:
584                     self.test.assert_in_range(
585                         recvd_seq_num,
586                         self.vpp_seq_number,
587                         self.vpp_seq_number + 1,
588                         "BFD sequence number",
589                     )
590             else:
591                 if self.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
592                     self.test.assert_equal(recvd_seq_num, 0, "BFD sequence number")
593                 else:
594                     self.test.assertIn(
595                         recvd_seq_num,
596                         (self.vpp_seq_number, 0),
597                         "BFD sequence number not one of "
598                         "(%s, 0)" % self.vpp_seq_number,
599                     )
600             self.vpp_seq_number = recvd_seq_num
601         # last 20 bytes represent the hash - so replace them with the key,
602         # pad the result with zeros and hash the result
603         hash_material = (
604             bfd.original[:-20]
605             + self.sha1_key.key
606             + b"\0" * (20 - len(self.sha1_key.key))
607         )
608         expected_hash = hashlib.sha1(hash_material).hexdigest()
609         self.test.assert_equal(
610             binascii.hexlify(bfd.auth_key_hash), expected_hash.encode(), "Auth key hash"
611         )
612
613     def verify_bfd(self, packet):
614         """Verify correctness of BFD layer."""
615         bfd = packet[BFD]
616         self.test.assert_equal(bfd.version, 1, "BFD version")
617         self.test.assert_equal(
618             bfd.your_discriminator, self.my_discriminator, "BFD - your discriminator"
619         )
620         if self.sha1_key:
621             self.verify_sha1_auth(packet)
622
623
624 def bfd_session_up(test):
625     """Bring BFD session up"""
626     test.logger.info("BFD: Waiting for slow hello")
627     p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
628     old_offset = None
629     if hasattr(test, "vpp_clock_offset"):
630         old_offset = test.vpp_clock_offset
631     test.vpp_clock_offset = time.time() - float(p.time)
632     test.logger.debug("BFD: Calculated vpp clock offset: %s", test.vpp_clock_offset)
633     if old_offset:
634         test.assertAlmostEqual(
635             old_offset,
636             test.vpp_clock_offset,
637             delta=0.5,
638             msg="vpp clock offset not stable (new: %s, old: %s)"
639             % (test.vpp_clock_offset, old_offset),
640         )
641     test.logger.info("BFD: Sending Init")
642     test.test_session.update(
643         my_discriminator=randint(0, 40000000),
644         your_discriminator=p[BFD].my_discriminator,
645         state=BFDState.init,
646     )
647     if (
648         test.test_session.sha1_key
649         and test.test_session.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
650     ):
651         test.test_session.inc_seq_num()
652     test.test_session.send_packet()
653     test.logger.info("BFD: Waiting for event")
654     e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
655     verify_event(test, e, expected_state=BFDState.up)
656     test.logger.info("BFD: Session is Up")
657     test.test_session.update(state=BFDState.up)
658     if (
659         test.test_session.sha1_key
660         and test.test_session.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
661     ):
662         test.test_session.inc_seq_num()
663     test.test_session.send_packet()
664     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
665
666
667 def bfd_session_down(test):
668     """Bring BFD session down"""
669     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
670     test.test_session.update(state=BFDState.down)
671     if (
672         test.test_session.sha1_key
673         and test.test_session.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
674     ):
675         test.test_session.inc_seq_num()
676     test.test_session.send_packet()
677     test.logger.info("BFD: Waiting for event")
678     e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
679     verify_event(test, e, expected_state=BFDState.down)
680     test.logger.info("BFD: Session is Down")
681     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
682
683
684 def verify_bfd_session_config(test, session, state=None):
685     dump = session.get_bfd_udp_session_dump_entry()
686     test.assertIsNotNone(dump)
687     # since dump is not none, we have verified that sw_if_index and addresses
688     # are valid (in get_bfd_udp_session_dump_entry)
689     if state:
690         test.assert_equal(dump.state, state, "session state")
691     test.assert_equal(
692         dump.required_min_rx, session.required_min_rx, "required min rx interval"
693     )
694     test.assert_equal(
695         dump.desired_min_tx, session.desired_min_tx, "desired min tx interval"
696     )
697     test.assert_equal(dump.detect_mult, session.detect_mult, "detect multiplier")
698     if session.sha1_key is None:
699         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
700     else:
701         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
702         test.assert_equal(dump.bfd_key_id, session.bfd_key_id, "bfd key id")
703         test.assert_equal(
704             dump.conf_key_id, session.sha1_key.conf_key_id, "config key id"
705         )
706
707
708 def verify_ip(test, packet):
709     """Verify correctness of IP layer."""
710     if test.vpp_session.af == AF_INET6:
711         ip = packet[IPv6]
712         local_ip = test.vpp_session.interface.local_ip6
713         remote_ip = test.vpp_session.interface.remote_ip6
714         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
715     else:
716         ip = packet[IP]
717         local_ip = test.vpp_session.interface.local_ip4
718         remote_ip = test.vpp_session.interface.remote_ip4
719         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
720     test.assert_equal(ip.src, local_ip, "IP source address")
721     test.assert_equal(ip.dst, remote_ip, "IP destination address")
722
723
724 def verify_udp(test, packet):
725     """Verify correctness of UDP layer."""
726     udp = packet[UDP]
727     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
728     test.assert_in_range(
729         udp.sport, BFD.udp_sport_min, BFD.udp_sport_max, "UDP source port"
730     )
731
732
733 def verify_event(test, event, expected_state):
734     """Verify correctness of event values."""
735     e = event
736     test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
737     test.assert_equal(
738         e.sw_if_index, test.vpp_session.interface.sw_if_index, "BFD interface index"
739     )
740
741     test.assert_equal(
742         str(e.local_addr), test.vpp_session.local_addr, "Local IPv6 address"
743     )
744     test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr, "Peer IPv6 address")
745     test.assert_equal(e.state, expected_state, BFDState)
746
747
748 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
749     """wait for BFD packet and verify its correctness
750
751     :param timeout: how long to wait
752     :param pcap_time_min: ignore packets with pcap timestamp lower than this
753
754     :returns: tuple (packet, time spent waiting for packet)
755     """
756     test.logger.info("BFD: Waiting for BFD packet")
757     deadline = time.time() + timeout
758     counter = 0
759     while True:
760         counter += 1
761         # sanity check
762         test.assert_in_range(counter, 0, 100, "number of packets ignored")
763         time_left = deadline - time.time()
764         if time_left < 0:
765             raise CaptureTimeoutError("Packet did not arrive within timeout")
766         p = test.pg0.wait_for_packet(timeout=time_left)
767         test.test_session.rx_packets += 1
768         test.logger.debug(ppp("BFD: Got packet:", p))
769         if pcap_time_min is not None and p.time < pcap_time_min:
770             test.logger.debug(
771                 ppp(
772                     "BFD: ignoring packet (pcap time %s < "
773                     "pcap time min %s):" % (p.time, pcap_time_min),
774                     p,
775                 )
776             )
777         else:
778             break
779     test.logger.debug(test.vapi.ppcli("show trace"))
780     if is_tunnel:
781         # strip an IP layer and move to the next
782         p = p[IP].payload
783
784     bfd = p[BFD]
785     if bfd is None:
786         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
787     if bfd.payload:
788         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
789     verify_ip(test, p)
790     verify_udp(test, p)
791     test.test_session.verify_bfd(p)
792     return p
793
794
795 BFDStats = namedtuple("BFDStats", "rx rx_echo tx tx_echo")
796
797
798 def bfd_grab_stats_snapshot(test, bs_idx=0, thread_index=None):
799     s = test.statistics
800     ti = thread_index
801     if ti is None:
802         rx = s["/bfd/rx-session-counters"][:, bs_idx].sum_packets()
803         rx_echo = s["/bfd/rx-session-echo-counters"][:, bs_idx].sum_packets()
804         tx = s["/bfd/tx-session-counters"][:, bs_idx].sum_packets()
805         tx_echo = s["/bfd/tx-session-echo-counters"][:, bs_idx].sum_packets()
806     else:
807         rx = s["/bfd/rx-session-counters"][ti, bs_idx].sum_packets()
808         rx_echo = s["/bfd/rx-session-echo-counters"][ti, bs_idx].sum_packets()
809         tx = s["/bfd/tx-session-counters"][ti, bs_idx].sum_packets()
810         tx_echo = s["/bfd/tx-session-echo-counters"][ti, bs_idx].sum_packets()
811     return BFDStats(rx, rx_echo, tx, tx_echo)
812
813
814 def bfd_stats_diff(stats_before, stats_after):
815     rx = stats_after.rx - stats_before.rx
816     rx_echo = stats_after.rx_echo - stats_before.rx_echo
817     tx = stats_after.tx - stats_before.tx
818     tx_echo = stats_after.tx_echo - stats_before.tx_echo
819     return BFDStats(rx, rx_echo, tx, tx_echo)
820
821
822 @tag_run_solo
823 @tag_fixme_debian11
824 class BFD4TestCase(VppTestCase):
825     """Bidirectional Forwarding Detection (BFD)"""
826
827     pg0 = None
828     vpp_clock_offset = None
829     vpp_session = None
830     test_session = None
831
832     @classmethod
833     def setUpClass(cls):
834         super(BFD4TestCase, cls).setUpClass()
835         cls.vapi.cli("set log class bfd level debug")
836         try:
837             cls.create_pg_interfaces([0])
838             cls.create_loopback_interfaces(1)
839             cls.loopback0 = cls.lo_interfaces[0]
840             cls.loopback0.config_ip4()
841             cls.loopback0.admin_up()
842             cls.pg0.config_ip4()
843             cls.pg0.configure_ipv4_neighbors()
844             cls.pg0.admin_up()
845             cls.pg0.resolve_arp()
846
847         except Exception:
848             super(BFD4TestCase, cls).tearDownClass()
849             raise
850
851     @classmethod
852     def tearDownClass(cls):
853         super(BFD4TestCase, cls).tearDownClass()
854
855     def setUp(self):
856         super(BFD4TestCase, self).setUp()
857         self.factory = AuthKeyFactory()
858         self.vapi.want_bfd_events()
859         self.pg0.enable_capture()
860         try:
861             self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
862             self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
863             self.vapi.cli("trace add bfd-process 500")
864             self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
865             self.vpp_session.add_vpp_config()
866             self.vpp_session.admin_up()
867             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
868         except BaseException:
869             self.vapi.want_bfd_events(enable_disable=0)
870             raise
871
872     def tearDown(self):
873         if not self.vpp_dead:
874             self.vapi.want_bfd_events(enable_disable=0)
875         self.vapi.collect_events()  # clear the event queue
876         super(BFD4TestCase, self).tearDown()
877
878     def test_session_up(self):
879         """bring BFD session up"""
880         bfd_session_up(self)
881         bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
882         bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
883         self.assert_equal(bfd_udp4_sessions - self.bfd_udp4_sessions, 1)
884         self.assert_equal(bfd_udp6_sessions, self.bfd_udp6_sessions)
885
886     def test_session_up_by_ip(self):
887         """bring BFD session up - first frame looked up by address pair"""
888         self.logger.info("BFD: Sending Slow control frame")
889         self.test_session.update(my_discriminator=randint(0, 40000000))
890         self.test_session.send_packet()
891         self.pg0.enable_capture()
892         p = self.pg0.wait_for_packet(1)
893         self.assert_equal(
894             p[BFD].your_discriminator,
895             self.test_session.my_discriminator,
896             "BFD - your discriminator",
897         )
898         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
899         self.test_session.update(
900             your_discriminator=p[BFD].my_discriminator, state=BFDState.up
901         )
902         self.logger.info("BFD: Waiting for event")
903         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
904         verify_event(self, e, expected_state=BFDState.init)
905         self.logger.info("BFD: Sending Up")
906         self.test_session.send_packet()
907         self.logger.info("BFD: Waiting for event")
908         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
909         verify_event(self, e, expected_state=BFDState.up)
910         self.logger.info("BFD: Session is Up")
911         self.test_session.update(state=BFDState.up)
912         self.test_session.send_packet()
913         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
914
915     def test_session_down(self):
916         """bring BFD session down"""
917         bfd_session_up(self)
918         bfd_session_down(self)
919
920     def test_hold_up(self):
921         """hold BFD session up"""
922         bfd_session_up(self)
923         for dummy in range(self.test_session.detect_mult * 2):
924             wait_for_bfd_packet(self)
925             self.test_session.send_packet()
926         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
927
928     def test_slow_timer(self):
929         """verify slow periodic control frames while session down"""
930         packet_count = 3
931         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
932         prev_packet = wait_for_bfd_packet(self, 2)
933         for dummy in range(packet_count):
934             next_packet = wait_for_bfd_packet(self, 2)
935             time_diff = next_packet.time - prev_packet.time
936             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
937             # to work around timing issues
938             self.assert_in_range(time_diff, 0.70, 1.05, "time between slow packets")
939             prev_packet = next_packet
940
941     def test_zero_remote_min_rx(self):
942         """no packets when zero remote required min rx interval"""
943         bfd_session_up(self)
944         self.test_session.update(required_min_rx=0)
945         self.test_session.send_packet()
946         for dummy in range(self.test_session.detect_mult):
947             self.sleep(
948                 self.vpp_session.required_min_rx / USEC_IN_SEC,
949                 "sleep before transmitting bfd packet",
950             )
951             self.test_session.send_packet()
952             try:
953                 p = wait_for_bfd_packet(self, timeout=0)
954                 self.logger.error(ppp("Received unexpected packet:", p))
955             except CaptureTimeoutError:
956                 pass
957         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
958         self.test_session.update(required_min_rx=300000)
959         for dummy in range(3):
960             self.test_session.send_packet()
961             wait_for_bfd_packet(
962                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC
963             )
964         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
965
966     def test_conn_down(self):
967         """verify session goes down after inactivity"""
968         bfd_session_up(self)
969         detection_time = (
970             self.test_session.detect_mult
971             * self.vpp_session.required_min_rx
972             / USEC_IN_SEC
973         )
974         self.sleep(detection_time, "waiting for BFD session time-out")
975         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
976         verify_event(self, e, expected_state=BFDState.down)
977
978     def test_peer_discr_reset_sess_down(self):
979         """peer discriminator reset after session goes down"""
980         bfd_session_up(self)
981         detection_time = (
982             self.test_session.detect_mult
983             * self.vpp_session.required_min_rx
984             / USEC_IN_SEC
985         )
986         self.sleep(detection_time, "waiting for BFD session time-out")
987         self.test_session.my_discriminator = 0
988         wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
989
990     def test_large_required_min_rx(self):
991         """large remote required min rx interval"""
992         bfd_session_up(self)
993         p = wait_for_bfd_packet(self)
994         interval = 3000000
995         self.test_session.update(required_min_rx=interval)
996         self.test_session.send_packet()
997         time_mark = time.time()
998         count = 0
999         # busy wait here, trying to collect a packet or event, vpp is not
1000         # allowed to send packets and the session will timeout first - so the
1001         # Up->Down event must arrive before any packets do
1002         while time.time() < time_mark + interval / USEC_IN_SEC:
1003             try:
1004                 p = wait_for_bfd_packet(self, timeout=0)
1005                 # if vpp managed to send a packet before we did the session
1006                 # session update, then that's fine, ignore it
1007                 if p.time < time_mark - self.vpp_clock_offset:
1008                     continue
1009                 self.logger.error(ppp("Received unexpected packet:", p))
1010                 count += 1
1011             except CaptureTimeoutError:
1012                 pass
1013             events = self.vapi.collect_events()
1014             if len(events) > 0:
1015                 verify_event(self, events[0], BFDState.down)
1016                 break
1017         self.assert_equal(count, 0, "number of packets received")
1018
1019     def test_immediate_remote_min_rx_reduction(self):
1020         """immediately honor remote required min rx reduction"""
1021         self.vpp_session.remove_vpp_config()
1022         self.vpp_session = VppBFDUDPSession(
1023             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000
1024         )
1025         self.pg0.enable_capture()
1026         self.vpp_session.add_vpp_config()
1027         self.test_session.update(desired_min_tx=1000000, required_min_rx=1000000)
1028         bfd_session_up(self)
1029         reference_packet = wait_for_bfd_packet(self)
1030         time_mark = time.time()
1031         interval = 300000
1032         self.test_session.update(required_min_rx=interval)
1033         self.test_session.send_packet()
1034         extra_time = time.time() - time_mark
1035         p = wait_for_bfd_packet(self)
1036         # first packet is allowed to be late by time we spent doing the update
1037         # calculated in extra_time
1038         self.assert_in_range(
1039             p.time - reference_packet.time,
1040             0.95 * 0.75 * interval / USEC_IN_SEC,
1041             1.05 * interval / USEC_IN_SEC + extra_time,
1042             "time between BFD packets",
1043         )
1044         reference_packet = p
1045         for dummy in range(3):
1046             p = wait_for_bfd_packet(self)
1047             diff = p.time - reference_packet.time
1048             self.assert_in_range(
1049                 diff,
1050                 0.95 * 0.75 * interval / USEC_IN_SEC,
1051                 1.05 * interval / USEC_IN_SEC,
1052                 "time between BFD packets",
1053             )
1054             reference_packet = p
1055
1056     def test_modify_req_min_rx_double(self):
1057         """modify session - double required min rx"""
1058         bfd_session_up(self)
1059         p = wait_for_bfd_packet(self)
1060         self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
1061         self.test_session.send_packet()
1062         # double required min rx
1063         self.vpp_session.modify_parameters(
1064             required_min_rx=2 * self.vpp_session.required_min_rx
1065         )
1066         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1067         # poll bit needs to be set
1068         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
1069         # finish poll sequence with final packet
1070         final = self.test_session.create_packet()
1071         final[BFD].flags = "F"
1072         timeout = (
1073             self.test_session.detect_mult
1074             * max(self.test_session.desired_min_tx, self.vpp_session.required_min_rx)
1075             / USEC_IN_SEC
1076         )
1077         self.test_session.send_packet(final)
1078         time_mark = time.time()
1079         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
1080         verify_event(self, e, expected_state=BFDState.down)
1081         time_to_event = time.time() - time_mark
1082         self.assert_in_range(
1083             time_to_event, 0.9 * timeout, 1.1 * timeout, "session timeout"
1084         )
1085
1086     def test_modify_req_min_rx_halve(self):
1087         """modify session - halve required min rx"""
1088         self.vpp_session.modify_parameters(
1089             required_min_rx=2 * self.vpp_session.required_min_rx
1090         )
1091         bfd_session_up(self)
1092         p = wait_for_bfd_packet(self)
1093         self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
1094         self.test_session.send_packet()
1095         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1096         # halve required min rx
1097         old_required_min_rx = self.vpp_session.required_min_rx
1098         self.vpp_session.modify_parameters(
1099             required_min_rx=self.vpp_session.required_min_rx // 2
1100         )
1101         # now we wait 0.8*3*old-req-min-rx and the session should still be up
1102         self.sleep(
1103             0.8 * self.vpp_session.detect_mult * old_required_min_rx / USEC_IN_SEC,
1104             "wait before finishing poll sequence",
1105         )
1106         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
1107         p = wait_for_bfd_packet(self)
1108         # poll bit needs to be set
1109         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
1110         # finish poll sequence with final packet
1111         final = self.test_session.create_packet()
1112         final[BFD].flags = "F"
1113         self.test_session.send_packet(final)
1114         # now the session should time out under new conditions
1115         detection_time = (
1116             self.test_session.detect_mult
1117             * self.vpp_session.required_min_rx
1118             / USEC_IN_SEC
1119         )
1120         before = time.time()
1121         e = self.vapi.wait_for_event(2 * detection_time, "bfd_udp_session_event")
1122         after = time.time()
1123         self.assert_in_range(
1124             after - before,
1125             0.9 * detection_time,
1126             1.1 * detection_time,
1127             "time before bfd session goes down",
1128         )
1129         verify_event(self, e, expected_state=BFDState.down)
1130
1131     def test_modify_detect_mult(self):
1132         """modify detect multiplier"""
1133         bfd_session_up(self)
1134         p = wait_for_bfd_packet(self)
1135         self.vpp_session.modify_parameters(detect_mult=1)
1136         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1137         self.assert_equal(
1138             self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
1139         )
1140         # poll bit must not be set
1141         self.assertNotIn(
1142             "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1143         )
1144         self.vpp_session.modify_parameters(detect_mult=10)
1145         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1146         self.assert_equal(
1147             self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
1148         )
1149         # poll bit must not be set
1150         self.assertNotIn(
1151             "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1152         )
1153
1154     def test_queued_poll(self):
1155         """test poll sequence queueing"""
1156         bfd_session_up(self)
1157         p = wait_for_bfd_packet(self)
1158         self.vpp_session.modify_parameters(
1159             required_min_rx=2 * self.vpp_session.required_min_rx
1160         )
1161         p = wait_for_bfd_packet(self)
1162         poll_sequence_start = time.time()
1163         poll_sequence_length_min = 0.5
1164         send_final_after = time.time() + poll_sequence_length_min
1165         # poll bit needs to be set
1166         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
1167         self.assert_equal(
1168             p[BFD].required_min_rx_interval,
1169             self.vpp_session.required_min_rx,
1170             "BFD required min rx interval",
1171         )
1172         self.vpp_session.modify_parameters(
1173             required_min_rx=2 * self.vpp_session.required_min_rx
1174         )
1175         # 2nd poll sequence should be queued now
1176         # don't send the reply back yet, wait for some time to emulate
1177         # longer round-trip time
1178         packet_count = 0
1179         while time.time() < send_final_after:
1180             self.test_session.send_packet()
1181             p = wait_for_bfd_packet(self)
1182             self.assert_equal(
1183                 len(self.vapi.collect_events()), 0, "number of bfd events"
1184             )
1185             self.assert_equal(
1186                 p[BFD].required_min_rx_interval,
1187                 self.vpp_session.required_min_rx,
1188                 "BFD required min rx interval",
1189             )
1190             packet_count += 1
1191             # poll bit must be set
1192             self.assertIn(
1193                 "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1194             )
1195         final = self.test_session.create_packet()
1196         final[BFD].flags = "F"
1197         self.test_session.send_packet(final)
1198         # finish 1st with final
1199         poll_sequence_length = time.time() - poll_sequence_start
1200         # vpp must wait for some time before starting new poll sequence
1201         poll_no_2_started = False
1202         for dummy in range(2 * packet_count):
1203             p = wait_for_bfd_packet(self)
1204             self.assert_equal(
1205                 len(self.vapi.collect_events()), 0, "number of bfd events"
1206             )
1207             if "P" in p.sprintf("%BFD.flags%"):
1208                 poll_no_2_started = True
1209                 if time.time() < poll_sequence_start + poll_sequence_length:
1210                     raise Exception("VPP started 2nd poll sequence too soon")
1211                 final = self.test_session.create_packet()
1212                 final[BFD].flags = "F"
1213                 self.test_session.send_packet(final)
1214                 break
1215             else:
1216                 self.test_session.send_packet()
1217         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1218         # finish 2nd with final
1219         final = self.test_session.create_packet()
1220         final[BFD].flags = "F"
1221         self.test_session.send_packet(final)
1222         p = wait_for_bfd_packet(self)
1223         # poll bit must not be set
1224         self.assertNotIn("P", p.sprintf("%BFD.flags%"), "Poll bit set in BFD packet")
1225
1226     # returning inconsistent results requiring retries in per-patch tests
1227     @unittest.skipUnless(config.extended, "part of extended tests")
1228     def test_poll_response(self):
1229         """test correct response to control frame with poll bit set"""
1230         bfd_session_up(self)
1231         poll = self.test_session.create_packet()
1232         poll[BFD].flags = "P"
1233         self.test_session.send_packet(poll)
1234         final = wait_for_bfd_packet(
1235             self, pcap_time_min=time.time() - self.vpp_clock_offset
1236         )
1237         self.assertIn("F", final.sprintf("%BFD.flags%"))
1238
1239     def test_no_periodic_if_remote_demand(self):
1240         """no periodic frames outside poll sequence if remote demand set"""
1241         bfd_session_up(self)
1242         demand = self.test_session.create_packet()
1243         demand[BFD].flags = "D"
1244         self.test_session.send_packet(demand)
1245         transmit_time = (
1246             0.9
1247             * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
1248             / USEC_IN_SEC
1249         )
1250         count = 0
1251         for dummy in range(self.test_session.detect_mult * 2):
1252             self.sleep(transmit_time)
1253             self.test_session.send_packet(demand)
1254             try:
1255                 p = wait_for_bfd_packet(self, timeout=0)
1256                 self.logger.error(ppp("Received unexpected packet:", p))
1257                 count += 1
1258             except CaptureTimeoutError:
1259                 pass
1260         events = self.vapi.collect_events()
1261         for e in events:
1262             self.logger.error("Received unexpected event: %s", e)
1263         self.assert_equal(count, 0, "number of packets received")
1264         self.assert_equal(len(events), 0, "number of events received")
1265
1266     def test_echo_looped_back(self):
1267         """echo packets looped back"""
1268         bfd_session_up(self)
1269         stats_before = bfd_grab_stats_snapshot(self)
1270         self.pg0.enable_capture()
1271         echo_packet_count = 10
1272         # random source port low enough to increment a few times..
1273         udp_sport_tx = randint(1, 50000)
1274         udp_sport_rx = udp_sport_tx
1275         echo_packet = (
1276             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1277             / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
1278             / UDP(dport=BFD.udp_dport_echo)
1279             / Raw("this should be looped back")
1280         )
1281         for dummy in range(echo_packet_count):
1282             self.sleep(0.01, "delay between echo packets")
1283             echo_packet[UDP].sport = udp_sport_tx
1284             udp_sport_tx += 1
1285             self.logger.debug(ppp("Sending packet:", echo_packet))
1286             self.pg0.add_stream(echo_packet)
1287             self.pg_start()
1288             self.logger.debug(self.vapi.ppcli("show trace"))
1289         counter = 0
1290         bfd_control_packets_rx = 0
1291         while counter < echo_packet_count:
1292             p = self.pg0.wait_for_packet(1)
1293             self.logger.debug(ppp("Got packet:", p))
1294             ether = p[Ether]
1295             self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
1296             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1297             ip = p[IP]
1298             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1299             udp = p[UDP]
1300             if udp.dport == BFD.udp_dport:
1301                 bfd_control_packets_rx += 1
1302                 continue
1303             self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
1304             self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
1305             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1306             udp_sport_rx += 1
1307             # need to compare the hex payload here, otherwise BFD_vpp_echo
1308             # gets in way
1309             self.assertEqual(
1310                 scapy.compat.raw(p[UDP].payload),
1311                 scapy.compat.raw(echo_packet[UDP].payload),
1312                 "Received packet is not the echo packet sent",
1313             )
1314             counter += 1
1315         self.assert_equal(
1316             udp_sport_tx,
1317             udp_sport_rx,
1318             "UDP source port (== ECHO packet identifier for test purposes)",
1319         )
1320         stats_after = bfd_grab_stats_snapshot(self)
1321         diff = bfd_stats_diff(stats_before, stats_after)
1322         self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
1323         self.assertEqual(
1324             0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
1325         )
1326         self.assertEqual(
1327             0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
1328         )
1329
1330     def test_echo(self):
1331         """echo function"""
1332         stats_before = bfd_grab_stats_snapshot(self)
1333         bfd_session_up(self)
1334         self.test_session.update(required_min_echo_rx=150000)
1335         self.test_session.send_packet()
1336         detection_time = (
1337             self.test_session.detect_mult
1338             * self.vpp_session.required_min_rx
1339             / USEC_IN_SEC
1340         )
1341         # echo shouldn't work without echo source set
1342         for dummy in range(10):
1343             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1344             self.sleep(sleep, "delay before sending bfd packet")
1345             self.test_session.send_packet()
1346         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1347         self.assert_equal(
1348             p[BFD].required_min_rx_interval,
1349             self.vpp_session.required_min_rx,
1350             "BFD required min rx interval",
1351         )
1352         self.test_session.send_packet()
1353         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1354         echo_seen = False
1355         # should be turned on - loopback echo packets
1356         for dummy in range(3):
1357             loop_until = time.time() + 0.75 * detection_time
1358             while time.time() < loop_until:
1359                 p = self.pg0.wait_for_packet(1)
1360                 self.logger.debug(ppp("Got packet:", p))
1361                 if p[UDP].dport == BFD.udp_dport_echo:
1362                     self.assert_equal(p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1363                     self.assertNotEqual(
1364                         p[IP].src,
1365                         self.loopback0.local_ip4,
1366                         "BFD ECHO src IP equal to loopback IP",
1367                     )
1368                     self.logger.debug(ppp("Looping back packet:", p))
1369                     self.assert_equal(
1370                         p[Ether].dst,
1371                         self.pg0.remote_mac,
1372                         "ECHO packet destination MAC address",
1373                     )
1374                     p[Ether].dst = self.pg0.local_mac
1375                     self.pg0.add_stream(p)
1376                     self.test_session.rx_packets_echo += 1
1377                     self.test_session.tx_packets_echo += 1
1378                     self.pg_start()
1379                     echo_seen = True
1380                 elif p.haslayer(BFD):
1381                     self.test_session.rx_packets += 1
1382                     if echo_seen:
1383                         self.assertGreaterEqual(
1384                             p[BFD].required_min_rx_interval, 1000000
1385                         )
1386                     if "P" in p.sprintf("%BFD.flags%"):
1387                         final = self.test_session.create_packet()
1388                         final[BFD].flags = "F"
1389                         self.test_session.send_packet(final)
1390                 else:
1391                     raise Exception(ppp("Received unknown packet:", p))
1392
1393                 self.assert_equal(
1394                     len(self.vapi.collect_events()), 0, "number of bfd events"
1395                 )
1396             self.test_session.send_packet()
1397         self.assertTrue(echo_seen, "No echo packets received")
1398
1399         stats_after = bfd_grab_stats_snapshot(self)
1400         diff = bfd_stats_diff(stats_before, stats_after)
1401         # our rx is vpp tx and vice versa, also tolerate one packet off
1402         self.assert_in_range(
1403             self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
1404         )
1405         self.assert_in_range(
1406             self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
1407         )
1408         self.assert_in_range(
1409             self.test_session.tx_packets_echo,
1410             diff.rx_echo - 1,
1411             diff.rx_echo + 1,
1412             "RX echo counter",
1413         )
1414         self.assert_in_range(
1415             self.test_session.rx_packets_echo,
1416             diff.tx_echo - 1,
1417             diff.tx_echo + 1,
1418             "TX echo counter",
1419         )
1420
1421     def test_echo_fail(self):
1422         """session goes down if echo function fails"""
1423         bfd_session_up(self)
1424         self.test_session.update(required_min_echo_rx=150000)
1425         self.test_session.send_packet()
1426         detection_time = (
1427             self.test_session.detect_mult
1428             * self.vpp_session.required_min_rx
1429             / USEC_IN_SEC
1430         )
1431         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1432         # echo function should be used now, but we will drop the echo packets
1433         verified_diag = False
1434         for dummy in range(3):
1435             loop_until = time.time() + 0.75 * detection_time
1436             while time.time() < loop_until:
1437                 p = self.pg0.wait_for_packet(1)
1438                 self.logger.debug(ppp("Got packet:", p))
1439                 if p[UDP].dport == BFD.udp_dport_echo:
1440                     # dropped
1441                     pass
1442                 elif p.haslayer(BFD):
1443                     if "P" in p.sprintf("%BFD.flags%"):
1444                         self.assertGreaterEqual(
1445                             p[BFD].required_min_rx_interval, 1000000
1446                         )
1447                         final = self.test_session.create_packet()
1448                         final[BFD].flags = "F"
1449                         self.test_session.send_packet(final)
1450                     if p[BFD].state == BFDState.down:
1451                         self.assert_equal(
1452                             p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1453                         )
1454                         verified_diag = True
1455                 else:
1456                     raise Exception(ppp("Received unknown packet:", p))
1457             self.test_session.send_packet()
1458         events = self.vapi.collect_events()
1459         self.assert_equal(len(events), 1, "number of bfd events")
1460         self.assert_equal(events[0].state, BFDState.down, BFDState)
1461         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1462
1463     def test_echo_stop(self):
1464         """echo function stops if peer sets required min echo rx zero"""
1465         bfd_session_up(self)
1466         self.test_session.update(required_min_echo_rx=150000)
1467         self.test_session.send_packet()
1468         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1469         # wait for first echo packet
1470         while True:
1471             p = self.pg0.wait_for_packet(1)
1472             self.logger.debug(ppp("Got packet:", p))
1473             if p[UDP].dport == BFD.udp_dport_echo:
1474                 self.logger.debug(ppp("Looping back packet:", p))
1475                 p[Ether].dst = self.pg0.local_mac
1476                 self.pg0.add_stream(p)
1477                 self.pg_start()
1478                 break
1479             elif p.haslayer(BFD):
1480                 # ignore BFD
1481                 pass
1482             else:
1483                 raise Exception(ppp("Received unknown packet:", p))
1484         self.test_session.update(required_min_echo_rx=0)
1485         self.test_session.send_packet()
1486         # echo packets shouldn't arrive anymore
1487         for dummy in range(5):
1488             wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1489             self.test_session.send_packet()
1490             events = self.vapi.collect_events()
1491             self.assert_equal(len(events), 0, "number of bfd events")
1492
1493     def test_echo_source_removed(self):
1494         """echo function stops if echo source is removed"""
1495         bfd_session_up(self)
1496         self.test_session.update(required_min_echo_rx=150000)
1497         self.test_session.send_packet()
1498         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1499         # wait for first echo packet
1500         while True:
1501             p = self.pg0.wait_for_packet(1)
1502             self.logger.debug(ppp("Got packet:", p))
1503             if p[UDP].dport == BFD.udp_dport_echo:
1504                 self.logger.debug(ppp("Looping back packet:", p))
1505                 p[Ether].dst = self.pg0.local_mac
1506                 self.pg0.add_stream(p)
1507                 self.pg_start()
1508                 break
1509             elif p.haslayer(BFD):
1510                 # ignore BFD
1511                 pass
1512             else:
1513                 raise Exception(ppp("Received unknown packet:", p))
1514         self.vapi.bfd_udp_del_echo_source()
1515         self.test_session.send_packet()
1516         # echo packets shouldn't arrive anymore
1517         for dummy in range(5):
1518             wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1519             self.test_session.send_packet()
1520             events = self.vapi.collect_events()
1521             self.assert_equal(len(events), 0, "number of bfd events")
1522
1523     def test_stale_echo(self):
1524         """stale echo packets don't keep a session up"""
1525         bfd_session_up(self)
1526         self.test_session.update(required_min_echo_rx=150000)
1527         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1528         self.test_session.send_packet()
1529         # should be turned on - loopback echo packets
1530         echo_packet = None
1531         timeout_at = None
1532         timeout_ok = False
1533         for dummy in range(10 * self.vpp_session.detect_mult):
1534             p = self.pg0.wait_for_packet(1)
1535             if p[UDP].dport == BFD.udp_dport_echo:
1536                 if echo_packet is None:
1537                     self.logger.debug(ppp("Got first echo packet:", p))
1538                     echo_packet = p
1539                     timeout_at = (
1540                         time.time()
1541                         + self.vpp_session.detect_mult
1542                         * self.test_session.required_min_echo_rx
1543                         / USEC_IN_SEC
1544                     )
1545                 else:
1546                     self.logger.debug(ppp("Got followup echo packet:", p))
1547                 self.logger.debug(ppp("Looping back first echo packet:", p))
1548                 echo_packet[Ether].dst = self.pg0.local_mac
1549                 self.pg0.add_stream(echo_packet)
1550                 self.pg_start()
1551             elif p.haslayer(BFD):
1552                 self.logger.debug(ppp("Got packet:", p))
1553                 if "P" in p.sprintf("%BFD.flags%"):
1554                     final = self.test_session.create_packet()
1555                     final[BFD].flags = "F"
1556                     self.test_session.send_packet(final)
1557                 if p[BFD].state == BFDState.down:
1558                     self.assertIsNotNone(
1559                         timeout_at,
1560                         "Session went down before first echo packet received",
1561                     )
1562                     now = time.time()
1563                     self.assertGreaterEqual(
1564                         now,
1565                         timeout_at,
1566                         "Session timeout at %s, but is expected at %s"
1567                         % (now, timeout_at),
1568                     )
1569                     self.assert_equal(
1570                         p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1571                     )
1572                     events = self.vapi.collect_events()
1573                     self.assert_equal(len(events), 1, "number of bfd events")
1574                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1575                     timeout_ok = True
1576                     break
1577             else:
1578                 raise Exception(ppp("Received unknown packet:", p))
1579             self.test_session.send_packet()
1580         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1581
1582     def test_invalid_echo_checksum(self):
1583         """echo packets with invalid checksum don't keep a session up"""
1584         bfd_session_up(self)
1585         self.test_session.update(required_min_echo_rx=150000)
1586         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1587         self.test_session.send_packet()
1588         # should be turned on - loopback echo packets
1589         timeout_at = None
1590         timeout_ok = False
1591         for dummy in range(10 * self.vpp_session.detect_mult):
1592             p = self.pg0.wait_for_packet(1)
1593             if p[UDP].dport == BFD.udp_dport_echo:
1594                 self.logger.debug(ppp("Got echo packet:", p))
1595                 if timeout_at is None:
1596                     timeout_at = (
1597                         time.time()
1598                         + self.vpp_session.detect_mult
1599                         * self.test_session.required_min_echo_rx
1600                         / USEC_IN_SEC
1601                     )
1602                 p[BFD_vpp_echo].checksum = getrandbits(64)
1603                 p[Ether].dst = self.pg0.local_mac
1604                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1605                 self.pg0.add_stream(p)
1606                 self.pg_start()
1607             elif p.haslayer(BFD):
1608                 self.logger.debug(ppp("Got packet:", p))
1609                 if "P" in p.sprintf("%BFD.flags%"):
1610                     final = self.test_session.create_packet()
1611                     final[BFD].flags = "F"
1612                     self.test_session.send_packet(final)
1613                 if p[BFD].state == BFDState.down:
1614                     self.assertIsNotNone(
1615                         timeout_at,
1616                         "Session went down before first echo packet received",
1617                     )
1618                     now = time.time()
1619                     self.assertGreaterEqual(
1620                         now,
1621                         timeout_at,
1622                         "Session timeout at %s, but is expected at %s"
1623                         % (now, timeout_at),
1624                     )
1625                     self.assert_equal(
1626                         p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1627                     )
1628                     events = self.vapi.collect_events()
1629                     self.assert_equal(len(events), 1, "number of bfd events")
1630                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1631                     timeout_ok = True
1632                     break
1633             else:
1634                 raise Exception(ppp("Received unknown packet:", p))
1635             self.test_session.send_packet()
1636         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1637
1638     def test_admin_up_down(self):
1639         """put session admin-up and admin-down"""
1640         bfd_session_up(self)
1641         self.vpp_session.admin_down()
1642         self.pg0.enable_capture()
1643         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1644         verify_event(self, e, expected_state=BFDState.admin_down)
1645         for dummy in range(2):
1646             p = wait_for_bfd_packet(self)
1647             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1648         # try to bring session up - shouldn't be possible
1649         self.test_session.update(state=BFDState.init)
1650         self.test_session.send_packet()
1651         for dummy in range(2):
1652             p = wait_for_bfd_packet(self)
1653             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1654         self.vpp_session.admin_up()
1655         self.test_session.update(state=BFDState.down)
1656         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1657         verify_event(self, e, expected_state=BFDState.down)
1658         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1659         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1660         self.test_session.send_packet()
1661         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1662         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1663         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1664         verify_event(self, e, expected_state=BFDState.init)
1665         self.test_session.update(state=BFDState.up)
1666         self.test_session.send_packet()
1667         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1668         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1669         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1670         verify_event(self, e, expected_state=BFDState.up)
1671
1672     def test_config_change_remote_demand(self):
1673         """configuration change while peer in demand mode"""
1674         bfd_session_up(self)
1675         demand = self.test_session.create_packet()
1676         demand[BFD].flags = "D"
1677         self.test_session.send_packet(demand)
1678         self.vpp_session.modify_parameters(
1679             required_min_rx=2 * self.vpp_session.required_min_rx
1680         )
1681         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1682         # poll bit must be set
1683         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1684         # terminate poll sequence
1685         final = self.test_session.create_packet()
1686         final[BFD].flags = "D+F"
1687         self.test_session.send_packet(final)
1688         # vpp should be quiet now again
1689         transmit_time = (
1690             0.9
1691             * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
1692             / USEC_IN_SEC
1693         )
1694         count = 0
1695         for dummy in range(self.test_session.detect_mult * 2):
1696             self.sleep(transmit_time)
1697             self.test_session.send_packet(demand)
1698             try:
1699                 p = wait_for_bfd_packet(self, timeout=0)
1700                 self.logger.error(ppp("Received unexpected packet:", p))
1701                 count += 1
1702             except CaptureTimeoutError:
1703                 pass
1704         events = self.vapi.collect_events()
1705         for e in events:
1706             self.logger.error("Received unexpected event: %s", e)
1707         self.assert_equal(count, 0, "number of packets received")
1708         self.assert_equal(len(events), 0, "number of events received")
1709
1710     def test_intf_deleted(self):
1711         """interface with bfd session deleted"""
1712         intf = VppLoInterface(self)
1713         intf.config_ip4()
1714         intf.admin_up()
1715         sw_if_index = intf.sw_if_index
1716         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1717         vpp_session.add_vpp_config()
1718         vpp_session.admin_up()
1719         intf.remove_vpp_config()
1720         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1721         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1722         self.assertFalse(vpp_session.query_vpp_config())
1723
1724
1725 @tag_run_solo
1726 @tag_fixme_vpp_workers
1727 class BFD6TestCase(VppTestCase):
1728     """Bidirectional Forwarding Detection (BFD) (IPv6)"""
1729
1730     pg0 = None
1731     vpp_clock_offset = None
1732     vpp_session = None
1733     test_session = None
1734
1735     @classmethod
1736     def setUpClass(cls):
1737         super(BFD6TestCase, cls).setUpClass()
1738         cls.vapi.cli("set log class bfd level debug")
1739         try:
1740             cls.create_pg_interfaces([0])
1741             cls.pg0.config_ip6()
1742             cls.pg0.configure_ipv6_neighbors()
1743             cls.pg0.admin_up()
1744             cls.pg0.resolve_ndp()
1745             cls.create_loopback_interfaces(1)
1746             cls.loopback0 = cls.lo_interfaces[0]
1747             cls.loopback0.config_ip6()
1748             cls.loopback0.admin_up()
1749
1750         except Exception:
1751             super(BFD6TestCase, cls).tearDownClass()
1752             raise
1753
1754     @classmethod
1755     def tearDownClass(cls):
1756         super(BFD6TestCase, cls).tearDownClass()
1757
1758     def setUp(self):
1759         super(BFD6TestCase, self).setUp()
1760         self.factory = AuthKeyFactory()
1761         self.vapi.want_bfd_events()
1762         self.pg0.enable_capture()
1763         try:
1764             self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
1765             self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
1766             self.vpp_session = VppBFDUDPSession(
1767                 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
1768             )
1769             self.vpp_session.add_vpp_config()
1770             self.vpp_session.admin_up()
1771             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1772             self.logger.debug(self.vapi.cli("show adj nbr"))
1773         except BaseException:
1774             self.vapi.want_bfd_events(enable_disable=0)
1775             raise
1776
1777     def tearDown(self):
1778         if not self.vpp_dead:
1779             self.vapi.want_bfd_events(enable_disable=0)
1780         self.vapi.collect_events()  # clear the event queue
1781         super(BFD6TestCase, self).tearDown()
1782
1783     def test_session_up(self):
1784         """bring BFD session up"""
1785         bfd_session_up(self)
1786         bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
1787         bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
1788         self.assert_equal(bfd_udp4_sessions, self.bfd_udp4_sessions)
1789         self.assert_equal(bfd_udp6_sessions - self.bfd_udp6_sessions, 1)
1790
1791     def test_session_up_by_ip(self):
1792         """bring BFD session up - first frame looked up by address pair"""
1793         self.logger.info("BFD: Sending Slow control frame")
1794         self.test_session.update(my_discriminator=randint(0, 40000000))
1795         self.test_session.send_packet()
1796         self.pg0.enable_capture()
1797         p = self.pg0.wait_for_packet(1)
1798         self.assert_equal(
1799             p[BFD].your_discriminator,
1800             self.test_session.my_discriminator,
1801             "BFD - your discriminator",
1802         )
1803         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1804         self.test_session.update(
1805             your_discriminator=p[BFD].my_discriminator, state=BFDState.up
1806         )
1807         self.logger.info("BFD: Waiting for event")
1808         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1809         verify_event(self, e, expected_state=BFDState.init)
1810         self.logger.info("BFD: Sending Up")
1811         self.test_session.send_packet()
1812         self.logger.info("BFD: Waiting for event")
1813         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1814         verify_event(self, e, expected_state=BFDState.up)
1815         self.logger.info("BFD: Session is Up")
1816         self.test_session.update(state=BFDState.up)
1817         self.test_session.send_packet()
1818         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1819
1820     def test_hold_up(self):
1821         """hold BFD session up"""
1822         bfd_session_up(self)
1823         for dummy in range(self.test_session.detect_mult * 2):
1824             wait_for_bfd_packet(self)
1825             self.test_session.send_packet()
1826         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
1827         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1828
1829     def test_echo_looped_back(self):
1830         """echo packets looped back"""
1831         bfd_session_up(self)
1832         stats_before = bfd_grab_stats_snapshot(self)
1833         self.pg0.enable_capture()
1834         echo_packet_count = 10
1835         # random source port low enough to increment a few times..
1836         udp_sport_tx = randint(1, 50000)
1837         udp_sport_rx = udp_sport_tx
1838         echo_packet = (
1839             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1840             / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6)
1841             / UDP(dport=BFD.udp_dport_echo)
1842             / Raw("this should be looped back")
1843         )
1844         for dummy in range(echo_packet_count):
1845             self.sleep(0.01, "delay between echo packets")
1846             echo_packet[UDP].sport = udp_sport_tx
1847             udp_sport_tx += 1
1848             self.logger.debug(ppp("Sending packet:", echo_packet))
1849             self.pg0.add_stream(echo_packet)
1850             self.pg_start()
1851         counter = 0
1852         bfd_control_packets_rx = 0
1853         while counter < echo_packet_count:
1854             p = self.pg0.wait_for_packet(1)
1855             self.logger.debug(ppp("Got packet:", p))
1856             ether = p[Ether]
1857             self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
1858             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1859             ip = p[IPv6]
1860             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1861             udp = p[UDP]
1862             if udp.dport == BFD.udp_dport:
1863                 bfd_control_packets_rx += 1
1864                 continue
1865             self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
1866             self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
1867             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1868             udp_sport_rx += 1
1869             # need to compare the hex payload here, otherwise BFD_vpp_echo
1870             # gets in way
1871             self.assertEqual(
1872                 scapy.compat.raw(p[UDP].payload),
1873                 scapy.compat.raw(echo_packet[UDP].payload),
1874                 "Received packet is not the echo packet sent",
1875             )
1876             counter += 1
1877         self.assert_equal(
1878             udp_sport_tx,
1879             udp_sport_rx,
1880             "UDP source port (== ECHO packet identifier for test purposes)",
1881         )
1882         stats_after = bfd_grab_stats_snapshot(self)
1883         diff = bfd_stats_diff(stats_before, stats_after)
1884         self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
1885         self.assertEqual(
1886             0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
1887         )
1888         self.assertEqual(
1889             0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
1890         )
1891
1892     def test_echo(self):
1893         """echo function"""
1894         stats_before = bfd_grab_stats_snapshot(self)
1895         bfd_session_up(self)
1896         self.test_session.update(required_min_echo_rx=150000)
1897         self.test_session.send_packet()
1898         detection_time = (
1899             self.test_session.detect_mult
1900             * self.vpp_session.required_min_rx
1901             / USEC_IN_SEC
1902         )
1903         # echo shouldn't work without echo source set
1904         for dummy in range(10):
1905             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1906             self.sleep(sleep, "delay before sending bfd packet")
1907             self.test_session.send_packet()
1908         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1909         self.assert_equal(
1910             p[BFD].required_min_rx_interval,
1911             self.vpp_session.required_min_rx,
1912             "BFD required min rx interval",
1913         )
1914         self.test_session.send_packet()
1915         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1916         echo_seen = False
1917         # should be turned on - loopback echo packets
1918         for dummy in range(3):
1919             loop_until = time.time() + 0.75 * detection_time
1920             while time.time() < loop_until:
1921                 p = self.pg0.wait_for_packet(1)
1922                 self.logger.debug(ppp("Got packet:", p))
1923                 if p[UDP].dport == BFD.udp_dport_echo:
1924                     self.assert_equal(
1925                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP"
1926                     )
1927                     self.assertNotEqual(
1928                         p[IPv6].src,
1929                         self.loopback0.local_ip6,
1930                         "BFD ECHO src IP equal to loopback IP",
1931                     )
1932                     self.logger.debug(ppp("Looping back packet:", p))
1933                     self.assert_equal(
1934                         p[Ether].dst,
1935                         self.pg0.remote_mac,
1936                         "ECHO packet destination MAC address",
1937                     )
1938                     self.test_session.rx_packets_echo += 1
1939                     self.test_session.tx_packets_echo += 1
1940                     p[Ether].dst = self.pg0.local_mac
1941                     self.pg0.add_stream(p)
1942                     self.pg_start()
1943                     echo_seen = True
1944                 elif p.haslayer(BFD):
1945                     self.test_session.rx_packets += 1
1946                     if echo_seen:
1947                         self.assertGreaterEqual(
1948                             p[BFD].required_min_rx_interval, 1000000
1949                         )
1950                     if "P" in p.sprintf("%BFD.flags%"):
1951                         final = self.test_session.create_packet()
1952                         final[BFD].flags = "F"
1953                         self.test_session.send_packet(final)
1954                 else:
1955                     raise Exception(ppp("Received unknown packet:", p))
1956
1957                 self.assert_equal(
1958                     len(self.vapi.collect_events()), 0, "number of bfd events"
1959                 )
1960             self.test_session.send_packet()
1961         self.assertTrue(echo_seen, "No echo packets received")
1962
1963         stats_after = bfd_grab_stats_snapshot(self)
1964         diff = bfd_stats_diff(stats_before, stats_after)
1965         # our rx is vpp tx and vice versa, also tolerate one packet off
1966         self.assert_in_range(
1967             self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
1968         )
1969         self.assert_in_range(
1970             self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
1971         )
1972         self.assert_in_range(
1973             self.test_session.tx_packets_echo,
1974             diff.rx_echo - 1,
1975             diff.rx_echo + 1,
1976             "RX echo counter",
1977         )
1978         self.assert_in_range(
1979             self.test_session.rx_packets_echo,
1980             diff.tx_echo - 1,
1981             diff.tx_echo + 1,
1982             "TX echo counter",
1983         )
1984
1985     def test_intf_deleted(self):
1986         """interface with bfd session deleted"""
1987         intf = VppLoInterface(self)
1988         intf.config_ip6()
1989         intf.admin_up()
1990         sw_if_index = intf.sw_if_index
1991         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip6, af=AF_INET6)
1992         vpp_session.add_vpp_config()
1993         vpp_session.admin_up()
1994         intf.remove_vpp_config()
1995         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1996         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1997         self.assertFalse(vpp_session.query_vpp_config())
1998
1999
2000 @tag_run_solo
2001 class BFDFIBTestCase(VppTestCase):
2002     """BFD-FIB interactions (IPv6)"""
2003
2004     vpp_session = None
2005     test_session = None
2006
2007     @classmethod
2008     def setUpClass(cls):
2009         super(BFDFIBTestCase, cls).setUpClass()
2010
2011     @classmethod
2012     def tearDownClass(cls):
2013         super(BFDFIBTestCase, cls).tearDownClass()
2014
2015     def setUp(self):
2016         super(BFDFIBTestCase, self).setUp()
2017         self.create_pg_interfaces(range(1))
2018
2019         self.vapi.want_bfd_events()
2020         self.pg0.enable_capture()
2021
2022         for i in self.pg_interfaces:
2023             i.admin_up()
2024             i.config_ip6()
2025             i.configure_ipv6_neighbors()
2026
2027     def tearDown(self):
2028         if not self.vpp_dead:
2029             self.vapi.want_bfd_events(enable_disable=False)
2030
2031         super(BFDFIBTestCase, self).tearDown()
2032
2033     @staticmethod
2034     def pkt_is_not_data_traffic(p):
2035         """not data traffic implies BFD or the usual IPv6 ND/RA"""
2036         if p.haslayer(BFD) or is_ipv6_misc(p):
2037             return True
2038         return False
2039
2040     def test_session_with_fib(self):
2041         """BFD-FIB interactions"""
2042
2043         # packets to match against both of the routes
2044         p = [
2045             (
2046                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2047                 / IPv6(src="3001::1", dst="2001::1")
2048                 / UDP(sport=1234, dport=1234)
2049                 / Raw(b"\xa5" * 100)
2050             ),
2051             (
2052                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2053                 / IPv6(src="3001::1", dst="2002::1")
2054                 / UDP(sport=1234, dport=1234)
2055                 / Raw(b"\xa5" * 100)
2056             ),
2057         ]
2058
2059         # A recursive and a non-recursive route via a next-hop that
2060         # will have a BFD session
2061         ip_2001_s_64 = VppIpRoute(
2062             self,
2063             "2001::",
2064             64,
2065             [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
2066         )
2067         ip_2002_s_64 = VppIpRoute(
2068             self, "2002::", 64, [VppRoutePath(self.pg0.remote_ip6, 0xFFFFFFFF)]
2069         )
2070         ip_2001_s_64.add_vpp_config()
2071         ip_2002_s_64.add_vpp_config()
2072
2073         # bring the session up now the routes are present
2074         self.vpp_session = VppBFDUDPSession(
2075             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
2076         )
2077         self.vpp_session.add_vpp_config()
2078         self.vpp_session.admin_up()
2079         self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
2080
2081         # session is up - traffic passes
2082         bfd_session_up(self)
2083
2084         self.pg0.add_stream(p)
2085         self.pg_start()
2086         for packet in p:
2087             captured = self.pg0.wait_for_packet(
2088                 1, filter_out_fn=self.pkt_is_not_data_traffic
2089             )
2090             self.assertEqual(captured[IPv6].dst, packet[IPv6].dst)
2091
2092         # session is up - traffic is dropped
2093         bfd_session_down(self)
2094
2095         self.pg0.add_stream(p)
2096         self.pg_start()
2097         with self.assertRaises(CaptureTimeoutError):
2098             self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
2099
2100         # session is up - traffic passes
2101         bfd_session_up(self)
2102
2103         self.pg0.add_stream(p)
2104         self.pg_start()
2105         for packet in p:
2106             captured = self.pg0.wait_for_packet(
2107                 1, filter_out_fn=self.pkt_is_not_data_traffic
2108             )
2109             self.assertEqual(captured[IPv6].dst, packet[IPv6].dst)
2110
2111
2112 @unittest.skipUnless(config.extended, "part of extended tests")
2113 class BFDTunTestCase(VppTestCase):
2114     """BFD over GRE tunnel"""
2115
2116     vpp_session = None
2117     test_session = None
2118
2119     @classmethod
2120     def setUpClass(cls):
2121         super(BFDTunTestCase, cls).setUpClass()
2122
2123     @classmethod
2124     def tearDownClass(cls):
2125         super(BFDTunTestCase, cls).tearDownClass()
2126
2127     def setUp(self):
2128         super(BFDTunTestCase, self).setUp()
2129         self.create_pg_interfaces(range(1))
2130
2131         self.vapi.want_bfd_events()
2132         self.pg0.enable_capture()
2133
2134         for i in self.pg_interfaces:
2135             i.admin_up()
2136             i.config_ip4()
2137             i.resolve_arp()
2138
2139     def tearDown(self):
2140         if not self.vpp_dead:
2141             self.vapi.want_bfd_events(enable_disable=0)
2142
2143         super(BFDTunTestCase, self).tearDown()
2144
2145     @staticmethod
2146     def pkt_is_not_data_traffic(p):
2147         """not data traffic implies BFD or the usual IPv6 ND/RA"""
2148         if p.haslayer(BFD) or is_ipv6_misc(p):
2149             return True
2150         return False
2151
2152     def test_bfd_o_gre(self):
2153         """BFD-o-GRE"""
2154
2155         # A GRE interface over which to run a BFD session
2156         gre_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
2157         gre_if.add_vpp_config()
2158         gre_if.admin_up()
2159         gre_if.config_ip4()
2160
2161         # bring the session up now the routes are present
2162         self.vpp_session = VppBFDUDPSession(
2163             self, gre_if, gre_if.remote_ip4, is_tunnel=True
2164         )
2165         self.vpp_session.add_vpp_config()
2166         self.vpp_session.admin_up()
2167
2168         self.test_session = BFDTestSession(
2169             self,
2170             gre_if,
2171             AF_INET,
2172             tunnel_header=(IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / GRE()),
2173             phy_interface=self.pg0,
2174         )
2175
2176         # packets to match against both of the routes
2177         p = [
2178             (
2179                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2180                 / IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4)
2181                 / UDP(sport=1234, dport=1234)
2182                 / Raw(b"\xa5" * 100)
2183             )
2184         ]
2185
2186         # session is up - traffic passes
2187         bfd_session_up(self)
2188
2189         self.send_and_expect(self.pg0, p, self.pg0)
2190
2191         # bring session down
2192         bfd_session_down(self)
2193
2194
2195 @tag_run_solo
2196 class BFDSHA1TestCase(VppTestCase):
2197     """Bidirectional Forwarding Detection (BFD) (SHA1 auth)"""
2198
2199     pg0 = None
2200     vpp_clock_offset = None
2201     vpp_session = None
2202     test_session = None
2203
2204     @classmethod
2205     def setUpClass(cls):
2206         super(BFDSHA1TestCase, cls).setUpClass()
2207         cls.vapi.cli("set log class bfd level debug")
2208         try:
2209             cls.create_pg_interfaces([0])
2210             cls.pg0.config_ip4()
2211             cls.pg0.admin_up()
2212             cls.pg0.resolve_arp()
2213
2214         except Exception:
2215             super(BFDSHA1TestCase, cls).tearDownClass()
2216             raise
2217
2218     @classmethod
2219     def tearDownClass(cls):
2220         super(BFDSHA1TestCase, cls).tearDownClass()
2221
2222     def setUp(self):
2223         super(BFDSHA1TestCase, self).setUp()
2224         self.factory = AuthKeyFactory()
2225         self.vapi.want_bfd_events()
2226         self.pg0.enable_capture()
2227
2228     def tearDown(self):
2229         if not self.vpp_dead:
2230             self.vapi.want_bfd_events(enable_disable=False)
2231         self.vapi.collect_events()  # clear the event queue
2232         super(BFDSHA1TestCase, self).tearDown()
2233
2234     def test_session_up(self):
2235         """bring BFD session up"""
2236         key = self.factory.create_random_key(self)
2237         key.add_vpp_config()
2238         self.vpp_session = VppBFDUDPSession(
2239             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2240         )
2241         self.vpp_session.add_vpp_config()
2242         self.vpp_session.admin_up()
2243         self.test_session = BFDTestSession(
2244             self,
2245             self.pg0,
2246             AF_INET,
2247             sha1_key=key,
2248             bfd_key_id=self.vpp_session.bfd_key_id,
2249         )
2250         bfd_session_up(self)
2251
2252     def test_hold_up(self):
2253         """hold BFD session up"""
2254         key = self.factory.create_random_key(self)
2255         key.add_vpp_config()
2256         self.vpp_session = VppBFDUDPSession(
2257             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2258         )
2259         self.vpp_session.add_vpp_config()
2260         self.vpp_session.admin_up()
2261         self.test_session = BFDTestSession(
2262             self,
2263             self.pg0,
2264             AF_INET,
2265             sha1_key=key,
2266             bfd_key_id=self.vpp_session.bfd_key_id,
2267         )
2268         bfd_session_up(self)
2269         for dummy in range(self.test_session.detect_mult * 2):
2270             wait_for_bfd_packet(self)
2271             self.test_session.send_packet()
2272         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2273
2274     def test_hold_up_meticulous(self):
2275         """hold BFD session up - meticulous auth"""
2276         key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
2277         key.add_vpp_config()
2278         self.vpp_session = VppBFDUDPSession(
2279             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2280         )
2281         self.vpp_session.add_vpp_config()
2282         self.vpp_session.admin_up()
2283         # specify sequence number so that it wraps
2284         self.test_session = BFDTestSession(
2285             self,
2286             self.pg0,
2287             AF_INET,
2288             sha1_key=key,
2289             bfd_key_id=self.vpp_session.bfd_key_id,
2290             our_seq_number=0xFFFFFFFF - 4,
2291         )
2292         bfd_session_up(self)
2293         for dummy in range(30):
2294             wait_for_bfd_packet(self)
2295             self.test_session.inc_seq_num()
2296             self.test_session.send_packet()
2297         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2298
2299     def test_send_bad_seq_number(self):
2300         """session is not kept alive by msgs with bad sequence numbers"""
2301         key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
2302         key.add_vpp_config()
2303         self.vpp_session = VppBFDUDPSession(
2304             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2305         )
2306         self.vpp_session.add_vpp_config()
2307         self.test_session = BFDTestSession(
2308             self,
2309             self.pg0,
2310             AF_INET,
2311             sha1_key=key,
2312             bfd_key_id=self.vpp_session.bfd_key_id,
2313         )
2314         bfd_session_up(self)
2315         detection_time = (
2316             self.test_session.detect_mult
2317             * self.vpp_session.required_min_rx
2318             / USEC_IN_SEC
2319         )
2320         send_until = time.time() + 2 * detection_time
2321         while time.time() < send_until:
2322             self.test_session.send_packet()
2323             self.sleep(
2324                 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2325                 "time between bfd packets",
2326             )
2327         e = self.vapi.collect_events()
2328         # session should be down now, because the sequence numbers weren't
2329         # updated
2330         self.assert_equal(len(e), 1, "number of bfd events")
2331         verify_event(self, e[0], expected_state=BFDState.down)
2332
2333     def execute_rogue_session_scenario(
2334         self,
2335         vpp_bfd_udp_session,
2336         legitimate_test_session,
2337         rogue_test_session,
2338         rogue_bfd_values=None,
2339     ):
2340         """execute a rogue session interaction scenario
2341
2342         1. create vpp session, add config
2343         2. bring the legitimate session up
2344         3. copy the bfd values from legitimate session to rogue session
2345         4. apply rogue_bfd_values to rogue session
2346         5. set rogue session state to down
2347         6. send message to take the session down from the rogue session
2348         7. assert that the legitimate session is unaffected
2349         """
2350
2351         self.vpp_session = vpp_bfd_udp_session
2352         self.vpp_session.add_vpp_config()
2353         self.test_session = legitimate_test_session
2354         # bring vpp session up
2355         bfd_session_up(self)
2356         # send packet from rogue session
2357         rogue_test_session.update(
2358             my_discriminator=self.test_session.my_discriminator,
2359             your_discriminator=self.test_session.your_discriminator,
2360             desired_min_tx=self.test_session.desired_min_tx,
2361             required_min_rx=self.test_session.required_min_rx,
2362             detect_mult=self.test_session.detect_mult,
2363             diag=self.test_session.diag,
2364             state=self.test_session.state,
2365             auth_type=self.test_session.auth_type,
2366         )
2367         if rogue_bfd_values:
2368             rogue_test_session.update(**rogue_bfd_values)
2369         rogue_test_session.update(state=BFDState.down)
2370         rogue_test_session.send_packet()
2371         wait_for_bfd_packet(self)
2372         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2373
2374     def test_mismatch_auth(self):
2375         """session is not brought down by unauthenticated msg"""
2376         key = self.factory.create_random_key(self)
2377         key.add_vpp_config()
2378         vpp_session = VppBFDUDPSession(
2379             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2380         )
2381         legitimate_test_session = BFDTestSession(
2382             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
2383         )
2384         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2385         self.execute_rogue_session_scenario(
2386             vpp_session, legitimate_test_session, rogue_test_session
2387         )
2388
2389     def test_mismatch_bfd_key_id(self):
2390         """session is not brought down by msg with non-existent key-id"""
2391         key = self.factory.create_random_key(self)
2392         key.add_vpp_config()
2393         vpp_session = VppBFDUDPSession(
2394             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2395         )
2396         # pick a different random bfd key id
2397         x = randint(0, 255)
2398         while x == vpp_session.bfd_key_id:
2399             x = randint(0, 255)
2400         legitimate_test_session = BFDTestSession(
2401             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
2402         )
2403         rogue_test_session = BFDTestSession(
2404             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x
2405         )
2406         self.execute_rogue_session_scenario(
2407             vpp_session, legitimate_test_session, rogue_test_session
2408         )
2409
2410     def test_mismatched_auth_type(self):
2411         """session is not brought down by msg with wrong auth type"""
2412         key = self.factory.create_random_key(self)
2413         key.add_vpp_config()
2414         vpp_session = VppBFDUDPSession(
2415             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2416         )
2417         legitimate_test_session = BFDTestSession(
2418             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
2419         )
2420         rogue_test_session = BFDTestSession(
2421             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
2422         )
2423         self.execute_rogue_session_scenario(
2424             vpp_session,
2425             legitimate_test_session,
2426             rogue_test_session,
2427             {"auth_type": BFDAuthType.keyed_md5},
2428         )
2429
2430     def test_restart(self):
2431         """simulate remote peer restart and resynchronization"""
2432         key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
2433         key.add_vpp_config()
2434         self.vpp_session = VppBFDUDPSession(
2435             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2436         )
2437         self.vpp_session.add_vpp_config()
2438         self.test_session = BFDTestSession(
2439             self,
2440             self.pg0,
2441             AF_INET,
2442             sha1_key=key,
2443             bfd_key_id=self.vpp_session.bfd_key_id,
2444             our_seq_number=0,
2445         )
2446         bfd_session_up(self)
2447         # don't send any packets for 2*detection_time
2448         detection_time = (
2449             self.test_session.detect_mult
2450             * self.vpp_session.required_min_rx
2451             / USEC_IN_SEC
2452         )
2453         self.sleep(2 * detection_time, "simulating peer restart")
2454         events = self.vapi.collect_events()
2455         self.assert_equal(len(events), 1, "number of bfd events")
2456         verify_event(self, events[0], expected_state=BFDState.down)
2457         self.test_session.update(state=BFDState.down)
2458         # reset sequence number
2459         self.test_session.our_seq_number = 0
2460         self.test_session.vpp_seq_number = None
2461         # now throw away any pending packets
2462         self.pg0.enable_capture()
2463         self.test_session.my_discriminator = 0
2464         bfd_session_up(self)
2465
2466
2467 @tag_run_solo
2468 class BFDAuthOnOffTestCase(VppTestCase):
2469     """Bidirectional Forwarding Detection (BFD) (changing auth)"""
2470
2471     pg0 = None
2472     vpp_session = None
2473     test_session = None
2474
2475     @classmethod
2476     def setUpClass(cls):
2477         super(BFDAuthOnOffTestCase, cls).setUpClass()
2478         cls.vapi.cli("set log class bfd level debug")
2479         try:
2480             cls.create_pg_interfaces([0])
2481             cls.pg0.config_ip4()
2482             cls.pg0.admin_up()
2483             cls.pg0.resolve_arp()
2484
2485         except Exception:
2486             super(BFDAuthOnOffTestCase, cls).tearDownClass()
2487             raise
2488
2489     @classmethod
2490     def tearDownClass(cls):
2491         super(BFDAuthOnOffTestCase, cls).tearDownClass()
2492
2493     def setUp(self):
2494         super(BFDAuthOnOffTestCase, self).setUp()
2495         self.factory = AuthKeyFactory()
2496         self.vapi.want_bfd_events()
2497         self.pg0.enable_capture()
2498
2499     def tearDown(self):
2500         if not self.vpp_dead:
2501             self.vapi.want_bfd_events(enable_disable=False)
2502         self.vapi.collect_events()  # clear the event queue
2503         super(BFDAuthOnOffTestCase, self).tearDown()
2504
2505     def test_auth_on_immediate(self):
2506         """turn auth on without disturbing session state (immediate)"""
2507         key = self.factory.create_random_key(self)
2508         key.add_vpp_config()
2509         self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2510         self.vpp_session.add_vpp_config()
2511         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2512         bfd_session_up(self)
2513         for dummy in range(self.test_session.detect_mult * 2):
2514             p = wait_for_bfd_packet(self)
2515             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2516             self.test_session.send_packet()
2517         self.vpp_session.activate_auth(key)
2518         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2519         self.test_session.sha1_key = key
2520         for dummy in range(self.test_session.detect_mult * 2):
2521             p = wait_for_bfd_packet(self)
2522             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2523             self.test_session.send_packet()
2524         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2525         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
2526
2527     def test_auth_off_immediate(self):
2528         """turn auth off without disturbing session state (immediate)"""
2529         key = self.factory.create_random_key(self)
2530         key.add_vpp_config()
2531         self.vpp_session = VppBFDUDPSession(
2532             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2533         )
2534         self.vpp_session.add_vpp_config()
2535         self.test_session = BFDTestSession(
2536             self,
2537             self.pg0,
2538             AF_INET,
2539             sha1_key=key,
2540             bfd_key_id=self.vpp_session.bfd_key_id,
2541         )
2542         bfd_session_up(self)
2543         # self.vapi.want_bfd_events(enable_disable=0)
2544         for dummy in range(self.test_session.detect_mult * 2):
2545             p = wait_for_bfd_packet(self)
2546             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2547             self.test_session.inc_seq_num()
2548             self.test_session.send_packet()
2549         self.vpp_session.deactivate_auth()
2550         self.test_session.bfd_key_id = None
2551         self.test_session.sha1_key = None
2552         for dummy in range(self.test_session.detect_mult * 2):
2553             p = wait_for_bfd_packet(self)
2554             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2555             self.test_session.inc_seq_num()
2556             self.test_session.send_packet()
2557         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2558         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
2559
2560     def test_auth_change_key_immediate(self):
2561         """change auth key without disturbing session state (immediate)"""
2562         key1 = self.factory.create_random_key(self)
2563         key1.add_vpp_config()
2564         key2 = self.factory.create_random_key(self)
2565         key2.add_vpp_config()
2566         self.vpp_session = VppBFDUDPSession(
2567             self, self.pg0, self.pg0.remote_ip4, sha1_key=key1
2568         )
2569         self.vpp_session.add_vpp_config()
2570         self.test_session = BFDTestSession(
2571             self,
2572             self.pg0,
2573             AF_INET,
2574             sha1_key=key1,
2575             bfd_key_id=self.vpp_session.bfd_key_id,
2576         )
2577         bfd_session_up(self)
2578         for dummy in range(self.test_session.detect_mult * 2):
2579             p = wait_for_bfd_packet(self)
2580             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2581             self.test_session.send_packet()
2582         self.vpp_session.activate_auth(key2)
2583         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2584         self.test_session.sha1_key = key2
2585         for dummy in range(self.test_session.detect_mult * 2):
2586             p = wait_for_bfd_packet(self)
2587             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2588             self.test_session.send_packet()
2589         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2590         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
2591
2592     def test_auth_on_delayed(self):
2593         """turn auth on without disturbing session state (delayed)"""
2594         key = self.factory.create_random_key(self)
2595         key.add_vpp_config()
2596         self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2597         self.vpp_session.add_vpp_config()
2598         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2599         bfd_session_up(self)
2600         for dummy in range(self.test_session.detect_mult * 2):
2601             wait_for_bfd_packet(self)
2602             self.test_session.send_packet()
2603         self.vpp_session.activate_auth(key, delayed=True)
2604         for dummy in range(self.test_session.detect_mult * 2):
2605             p = wait_for_bfd_packet(self)
2606             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2607             self.test_session.send_packet()
2608         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2609         self.test_session.sha1_key = key
2610         self.test_session.send_packet()
2611         for dummy in range(self.test_session.detect_mult * 2):
2612             p = wait_for_bfd_packet(self)
2613             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2614             self.test_session.send_packet()
2615         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2616         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
2617
2618     def test_auth_off_delayed(self):
2619         """turn auth off without disturbing session state (delayed)"""
2620         key = self.factory.create_random_key(self)
2621         key.add_vpp_config()
2622         self.vpp_session = VppBFDUDPSession(
2623             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2624         )
2625         self.vpp_session.add_vpp_config()
2626         self.test_session = BFDTestSession(
2627             self,
2628             self.pg0,
2629             AF_INET,
2630             sha1_key=key,
2631             bfd_key_id=self.vpp_session.bfd_key_id,
2632         )
2633         bfd_session_up(self)
2634         for dummy in range(self.test_session.detect_mult * 2):
2635             p = wait_for_bfd_packet(self)
2636             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2637             self.test_session.send_packet()
2638         self.vpp_session.deactivate_auth(delayed=True)
2639         for dummy in range(self.test_session.detect_mult * 2):
2640             p = wait_for_bfd_packet(self)
2641             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2642             self.test_session.send_packet()
2643         self.test_session.bfd_key_id = None
2644         self.test_session.sha1_key = None
2645         self.test_session.send_packet()
2646         for dummy in range(self.test_session.detect_mult * 2):
2647             p = wait_for_bfd_packet(self)
2648             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2649             self.test_session.send_packet()
2650         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2651         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
2652
2653     def test_auth_change_key_delayed(self):
2654         """change auth key without disturbing session state (delayed)"""
2655         key1 = self.factory.create_random_key(self)
2656         key1.add_vpp_config()
2657         key2 = self.factory.create_random_key(self)
2658         key2.add_vpp_config()
2659         self.vpp_session = VppBFDUDPSession(
2660             self, self.pg0, self.pg0.remote_ip4, sha1_key=key1
2661         )
2662         self.vpp_session.add_vpp_config()
2663         self.vpp_session.admin_up()
2664         self.test_session = BFDTestSession(
2665             self,
2666             self.pg0,
2667             AF_INET,
2668             sha1_key=key1,
2669             bfd_key_id=self.vpp_session.bfd_key_id,
2670         )
2671         bfd_session_up(self)
2672         for dummy in range(self.test_session.detect_mult * 2):
2673             p = wait_for_bfd_packet(self)
2674             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2675             self.test_session.send_packet()
2676         self.vpp_session.activate_auth(key2, delayed=True)
2677         for dummy in range(self.test_session.detect_mult * 2):
2678             p = wait_for_bfd_packet(self)
2679             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2680             self.test_session.send_packet()
2681         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2682         self.test_session.sha1_key = key2
2683         self.test_session.send_packet()
2684         for dummy in range(self.test_session.detect_mult * 2):
2685             p = wait_for_bfd_packet(self)
2686             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2687             self.test_session.send_packet()
2688         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2689         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
2690
2691
2692 @tag_run_solo
2693 class BFDCLITestCase(VppTestCase):
2694     """Bidirectional Forwarding Detection (BFD) (CLI)"""
2695
2696     pg0 = None
2697
2698     @classmethod
2699     def setUpClass(cls):
2700         super(BFDCLITestCase, cls).setUpClass()
2701         cls.vapi.cli("set log class bfd level debug")
2702         try:
2703             cls.create_pg_interfaces((0,))
2704             cls.pg0.config_ip4()
2705             cls.pg0.config_ip6()
2706             cls.pg0.resolve_arp()
2707             cls.pg0.resolve_ndp()
2708
2709         except Exception:
2710             super(BFDCLITestCase, cls).tearDownClass()
2711             raise
2712
2713     @classmethod
2714     def tearDownClass(cls):
2715         super(BFDCLITestCase, cls).tearDownClass()
2716
2717     def setUp(self):
2718         super(BFDCLITestCase, self).setUp()
2719         self.factory = AuthKeyFactory()
2720         self.pg0.enable_capture()
2721
2722     def tearDown(self):
2723         try:
2724             self.vapi.want_bfd_events(enable_disable=False)
2725         except UnexpectedApiReturnValueError:
2726             # some tests aren't subscribed, so this is not an issue
2727             pass
2728         self.vapi.collect_events()  # clear the event queue
2729         super(BFDCLITestCase, self).tearDown()
2730
2731     def cli_verify_no_response(self, cli):
2732         """execute a CLI, asserting that the response is empty"""
2733         self.assert_equal(self.vapi.cli(cli), "", "CLI command response")
2734
2735     def cli_verify_response(self, cli, expected):
2736         """execute a CLI, asserting that the response matches expectation"""
2737         try:
2738             reply = self.vapi.cli(cli)
2739         except CliFailedCommandError as cli_error:
2740             reply = str(cli_error)
2741         self.assert_equal(reply.strip(), expected, "CLI command response")
2742
2743     def test_show(self):
2744         """show commands"""
2745         k1 = self.factory.create_random_key(self)
2746         k1.add_vpp_config()
2747         k2 = self.factory.create_random_key(
2748             self, auth_type=BFDAuthType.meticulous_keyed_sha1
2749         )
2750         k2.add_vpp_config()
2751         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2752         s1.add_vpp_config()
2753         s2 = VppBFDUDPSession(
2754             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k2
2755         )
2756         s2.add_vpp_config()
2757         self.logger.info(self.vapi.ppcli("show bfd keys"))
2758         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2759         self.logger.info(self.vapi.ppcli("show bfd"))
2760
2761     def test_set_del_sha1_key(self):
2762         """set/delete SHA1 auth key"""
2763         k = self.factory.create_random_key(self)
2764         self.registry.register(k, self.logger)
2765         self.cli_verify_no_response(
2766             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2767             % (
2768                 k.conf_key_id,
2769                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
2770             )
2771         )
2772         self.assertTrue(k.query_vpp_config())
2773         self.vpp_session = VppBFDUDPSession(
2774             self, self.pg0, self.pg0.remote_ip4, sha1_key=k
2775         )
2776         self.vpp_session.add_vpp_config()
2777         self.test_session = BFDTestSession(
2778             self, self.pg0, AF_INET, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
2779         )
2780         self.vapi.want_bfd_events()
2781         bfd_session_up(self)
2782         bfd_session_down(self)
2783         # try to replace the secret for the key - should fail because the key
2784         # is in-use
2785         k2 = self.factory.create_random_key(self)
2786         self.cli_verify_response(
2787             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2788             % (
2789                 k.conf_key_id,
2790                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
2791             ),
2792             "bfd key set: `bfd_auth_set_key' API call failed, "
2793             "rv=-103:BFD object in use",
2794         )
2795         # manipulating the session using old secret should still work
2796         bfd_session_up(self)
2797         bfd_session_down(self)
2798         self.vpp_session.remove_vpp_config()
2799         self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
2800         self.assertFalse(k.query_vpp_config())
2801
2802     def test_set_del_meticulous_sha1_key(self):
2803         """set/delete meticulous SHA1 auth key"""
2804         k = self.factory.create_random_key(
2805             self, auth_type=BFDAuthType.meticulous_keyed_sha1
2806         )
2807         self.registry.register(k, self.logger)
2808         self.cli_verify_no_response(
2809             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s"
2810             % (
2811                 k.conf_key_id,
2812                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
2813             )
2814         )
2815         self.assertTrue(k.query_vpp_config())
2816         self.vpp_session = VppBFDUDPSession(
2817             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k
2818         )
2819         self.vpp_session.add_vpp_config()
2820         self.vpp_session.admin_up()
2821         self.test_session = BFDTestSession(
2822             self, self.pg0, AF_INET6, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
2823         )
2824         self.vapi.want_bfd_events()
2825         bfd_session_up(self)
2826         bfd_session_down(self)
2827         # try to replace the secret for the key - should fail because the key
2828         # is in-use
2829         k2 = self.factory.create_random_key(self)
2830         self.cli_verify_response(
2831             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2832             % (
2833                 k.conf_key_id,
2834                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
2835             ),
2836             "bfd key set: `bfd_auth_set_key' API call failed, "
2837             "rv=-103:BFD object in use",
2838         )
2839         # manipulating the session using old secret should still work
2840         bfd_session_up(self)
2841         bfd_session_down(self)
2842         self.vpp_session.remove_vpp_config()
2843         self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
2844         self.assertFalse(k.query_vpp_config())
2845
2846     def test_add_mod_del_bfd_udp(self):
2847         """create/modify/delete IPv4 BFD UDP session"""
2848         vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2849         self.registry.register(vpp_session, self.logger)
2850         cli_add_cmd = (
2851             "bfd udp session add interface %s local-addr %s "
2852             "peer-addr %s desired-min-tx %s required-min-rx %s "
2853             "detect-mult %s"
2854             % (
2855                 self.pg0.name,
2856                 self.pg0.local_ip4,
2857                 self.pg0.remote_ip4,
2858                 vpp_session.desired_min_tx,
2859                 vpp_session.required_min_rx,
2860                 vpp_session.detect_mult,
2861             )
2862         )
2863         self.cli_verify_no_response(cli_add_cmd)
2864         # 2nd add should fail
2865         self.cli_verify_response(
2866             cli_add_cmd,
2867             "bfd udp session add: `bfd_add_add_session' API call"
2868             " failed, rv=-101:Duplicate BFD object",
2869         )
2870         verify_bfd_session_config(self, vpp_session)
2871         mod_session = VppBFDUDPSession(
2872             self,
2873             self.pg0,
2874             self.pg0.remote_ip4,
2875             required_min_rx=2 * vpp_session.required_min_rx,
2876             desired_min_tx=3 * vpp_session.desired_min_tx,
2877             detect_mult=4 * vpp_session.detect_mult,
2878         )
2879         self.cli_verify_no_response(
2880             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2881             "desired-min-tx %s required-min-rx %s detect-mult %s"
2882             % (
2883                 self.pg0.name,
2884                 self.pg0.local_ip4,
2885                 self.pg0.remote_ip4,
2886                 mod_session.desired_min_tx,
2887                 mod_session.required_min_rx,
2888                 mod_session.detect_mult,
2889             )
2890         )
2891         verify_bfd_session_config(self, mod_session)
2892         cli_del_cmd = (
2893             "bfd udp session del interface %s local-addr %s "
2894             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2895         )
2896         self.cli_verify_no_response(cli_del_cmd)
2897         # 2nd del is expected to fail
2898         self.cli_verify_response(
2899             cli_del_cmd,
2900             "bfd udp session del: `bfd_udp_del_session' API call"
2901             " failed, rv=-102:No such BFD object",
2902         )
2903         self.assertFalse(vpp_session.query_vpp_config())
2904
2905     def test_add_mod_del_bfd_udp6(self):
2906         """create/modify/delete IPv6 BFD UDP session"""
2907         vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2908         self.registry.register(vpp_session, self.logger)
2909         cli_add_cmd = (
2910             "bfd udp session add interface %s local-addr %s "
2911             "peer-addr %s desired-min-tx %s required-min-rx %s "
2912             "detect-mult %s"
2913             % (
2914                 self.pg0.name,
2915                 self.pg0.local_ip6,
2916                 self.pg0.remote_ip6,
2917                 vpp_session.desired_min_tx,
2918                 vpp_session.required_min_rx,
2919                 vpp_session.detect_mult,
2920             )
2921         )
2922         self.cli_verify_no_response(cli_add_cmd)
2923         # 2nd add should fail
2924         self.cli_verify_response(
2925             cli_add_cmd,
2926             "bfd udp session add: `bfd_add_add_session' API call"
2927             " failed, rv=-101:Duplicate BFD object",
2928         )
2929         verify_bfd_session_config(self, vpp_session)
2930         mod_session = VppBFDUDPSession(
2931             self,
2932             self.pg0,
2933             self.pg0.remote_ip6,
2934             af=AF_INET6,
2935             required_min_rx=2 * vpp_session.required_min_rx,
2936             desired_min_tx=3 * vpp_session.desired_min_tx,
2937             detect_mult=4 * vpp_session.detect_mult,
2938         )
2939         self.cli_verify_no_response(
2940             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2941             "desired-min-tx %s required-min-rx %s detect-mult %s"
2942             % (
2943                 self.pg0.name,
2944                 self.pg0.local_ip6,
2945                 self.pg0.remote_ip6,
2946                 mod_session.desired_min_tx,
2947                 mod_session.required_min_rx,
2948                 mod_session.detect_mult,
2949             )
2950         )
2951         verify_bfd_session_config(self, mod_session)
2952         cli_del_cmd = (
2953             "bfd udp session del interface %s local-addr %s "
2954             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
2955         )
2956         self.cli_verify_no_response(cli_del_cmd)
2957         # 2nd del is expected to fail
2958         self.cli_verify_response(
2959             cli_del_cmd,
2960             "bfd udp session del: `bfd_udp_del_session' API call"
2961             " failed, rv=-102:No such BFD object",
2962         )
2963         self.assertFalse(vpp_session.query_vpp_config())
2964
2965     def test_add_mod_del_bfd_udp_auth(self):
2966         """create/modify/delete IPv4 BFD UDP session (authenticated)"""
2967         key = self.factory.create_random_key(self)
2968         key.add_vpp_config()
2969         vpp_session = VppBFDUDPSession(
2970             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2971         )
2972         self.registry.register(vpp_session, self.logger)
2973         cli_add_cmd = (
2974             "bfd udp session add interface %s local-addr %s "
2975             "peer-addr %s desired-min-tx %s required-min-rx %s "
2976             "detect-mult %s conf-key-id %s bfd-key-id %s"
2977             % (
2978                 self.pg0.name,
2979                 self.pg0.local_ip4,
2980                 self.pg0.remote_ip4,
2981                 vpp_session.desired_min_tx,
2982                 vpp_session.required_min_rx,
2983                 vpp_session.detect_mult,
2984                 key.conf_key_id,
2985                 vpp_session.bfd_key_id,
2986             )
2987         )
2988         self.cli_verify_no_response(cli_add_cmd)
2989         # 2nd add should fail
2990         self.cli_verify_response(
2991             cli_add_cmd,
2992             "bfd udp session add: `bfd_add_add_session' API call"
2993             " failed, rv=-101:Duplicate BFD object",
2994         )
2995         verify_bfd_session_config(self, vpp_session)
2996         mod_session = VppBFDUDPSession(
2997             self,
2998             self.pg0,
2999             self.pg0.remote_ip4,
3000             sha1_key=key,
3001             bfd_key_id=vpp_session.bfd_key_id,
3002             required_min_rx=2 * vpp_session.required_min_rx,
3003             desired_min_tx=3 * vpp_session.desired_min_tx,
3004             detect_mult=4 * vpp_session.detect_mult,
3005         )
3006         self.cli_verify_no_response(
3007             "bfd udp session mod interface %s local-addr %s peer-addr %s "
3008             "desired-min-tx %s required-min-rx %s detect-mult %s"
3009             % (
3010                 self.pg0.name,
3011                 self.pg0.local_ip4,
3012                 self.pg0.remote_ip4,
3013                 mod_session.desired_min_tx,
3014                 mod_session.required_min_rx,
3015                 mod_session.detect_mult,
3016             )
3017         )
3018         verify_bfd_session_config(self, mod_session)
3019         cli_del_cmd = (
3020             "bfd udp session del interface %s local-addr %s "
3021             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3022         )
3023         self.cli_verify_no_response(cli_del_cmd)
3024         # 2nd del is expected to fail
3025         self.cli_verify_response(
3026             cli_del_cmd,
3027             "bfd udp session del: `bfd_udp_del_session' API call"
3028             " failed, rv=-102:No such BFD object",
3029         )
3030         self.assertFalse(vpp_session.query_vpp_config())
3031
3032     def test_add_mod_del_bfd_udp6_auth(self):
3033         """create/modify/delete IPv6 BFD UDP session (authenticated)"""
3034         key = self.factory.create_random_key(
3035             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3036         )
3037         key.add_vpp_config()
3038         vpp_session = VppBFDUDPSession(
3039             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key
3040         )
3041         self.registry.register(vpp_session, self.logger)
3042         cli_add_cmd = (
3043             "bfd udp session add interface %s local-addr %s "
3044             "peer-addr %s desired-min-tx %s required-min-rx %s "
3045             "detect-mult %s conf-key-id %s bfd-key-id %s"
3046             % (
3047                 self.pg0.name,
3048                 self.pg0.local_ip6,
3049                 self.pg0.remote_ip6,
3050                 vpp_session.desired_min_tx,
3051                 vpp_session.required_min_rx,
3052                 vpp_session.detect_mult,
3053                 key.conf_key_id,
3054                 vpp_session.bfd_key_id,
3055             )
3056         )
3057         self.cli_verify_no_response(cli_add_cmd)
3058         # 2nd add should fail
3059         self.cli_verify_response(
3060             cli_add_cmd,
3061             "bfd udp session add: `bfd_add_add_session' API call"
3062             " failed, rv=-101:Duplicate BFD object",
3063         )
3064         verify_bfd_session_config(self, vpp_session)
3065         mod_session = VppBFDUDPSession(
3066             self,
3067             self.pg0,
3068             self.pg0.remote_ip6,
3069             af=AF_INET6,
3070             sha1_key=key,
3071             bfd_key_id=vpp_session.bfd_key_id,
3072             required_min_rx=2 * vpp_session.required_min_rx,
3073             desired_min_tx=3 * vpp_session.desired_min_tx,
3074             detect_mult=4 * vpp_session.detect_mult,
3075         )
3076         self.cli_verify_no_response(
3077             "bfd udp session mod interface %s local-addr %s peer-addr %s "
3078             "desired-min-tx %s required-min-rx %s detect-mult %s"
3079             % (
3080                 self.pg0.name,
3081                 self.pg0.local_ip6,
3082                 self.pg0.remote_ip6,
3083                 mod_session.desired_min_tx,
3084                 mod_session.required_min_rx,
3085                 mod_session.detect_mult,
3086             )
3087         )
3088         verify_bfd_session_config(self, mod_session)
3089         cli_del_cmd = (
3090             "bfd udp session del interface %s local-addr %s "
3091             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
3092         )
3093         self.cli_verify_no_response(cli_del_cmd)
3094         # 2nd del is expected to fail
3095         self.cli_verify_response(
3096             cli_del_cmd,
3097             "bfd udp session del: `bfd_udp_del_session' API call"
3098             " failed, rv=-102:No such BFD object",
3099         )
3100         self.assertFalse(vpp_session.query_vpp_config())
3101
3102     def test_auth_on_off(self):
3103         """turn authentication on and off"""
3104         key = self.factory.create_random_key(
3105             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3106         )
3107         key.add_vpp_config()
3108         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3109         auth_session = VppBFDUDPSession(
3110             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
3111         )
3112         session.add_vpp_config()
3113         cli_activate = (
3114             "bfd udp session auth activate interface %s local-addr %s "
3115             "peer-addr %s conf-key-id %s bfd-key-id %s"
3116             % (
3117                 self.pg0.name,
3118                 self.pg0.local_ip4,
3119                 self.pg0.remote_ip4,
3120                 key.conf_key_id,
3121                 auth_session.bfd_key_id,
3122             )
3123         )
3124         self.cli_verify_no_response(cli_activate)
3125         verify_bfd_session_config(self, auth_session)
3126         self.cli_verify_no_response(cli_activate)
3127         verify_bfd_session_config(self, auth_session)
3128         cli_deactivate = (
3129             "bfd udp session auth deactivate interface %s local-addr %s "
3130             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3131         )
3132         self.cli_verify_no_response(cli_deactivate)
3133         verify_bfd_session_config(self, session)
3134         self.cli_verify_no_response(cli_deactivate)
3135         verify_bfd_session_config(self, session)
3136
3137     def test_auth_on_off_delayed(self):
3138         """turn authentication on and off (delayed)"""
3139         key = self.factory.create_random_key(
3140             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3141         )
3142         key.add_vpp_config()
3143         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3144         auth_session = VppBFDUDPSession(
3145             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
3146         )
3147         session.add_vpp_config()
3148         cli_activate = (
3149             "bfd udp session auth activate interface %s local-addr %s "
3150             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"
3151             % (
3152                 self.pg0.name,
3153                 self.pg0.local_ip4,
3154                 self.pg0.remote_ip4,
3155                 key.conf_key_id,
3156                 auth_session.bfd_key_id,
3157             )
3158         )
3159         self.cli_verify_no_response(cli_activate)
3160         verify_bfd_session_config(self, auth_session)
3161         self.cli_verify_no_response(cli_activate)
3162         verify_bfd_session_config(self, auth_session)
3163         cli_deactivate = (
3164             "bfd udp session auth deactivate interface %s local-addr %s "
3165             "peer-addr %s delayed yes"
3166             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3167         )
3168         self.cli_verify_no_response(cli_deactivate)
3169         verify_bfd_session_config(self, session)
3170         self.cli_verify_no_response(cli_deactivate)
3171         verify_bfd_session_config(self, session)
3172
3173     def test_admin_up_down(self):
3174         """put session admin-up and admin-down"""
3175         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3176         session.add_vpp_config()
3177         cli_down = (
3178             "bfd udp session set-flags admin down interface %s local-addr %s "
3179             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3180         )
3181         cli_up = (
3182             "bfd udp session set-flags admin up interface %s local-addr %s "
3183             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3184         )
3185         self.cli_verify_no_response(cli_down)
3186         verify_bfd_session_config(self, session, state=BFDState.admin_down)
3187         self.cli_verify_no_response(cli_up)
3188         verify_bfd_session_config(self, session, state=BFDState.down)
3189
3190     def test_set_del_udp_echo_source(self):
3191         """set/del udp echo source"""
3192         self.create_loopback_interfaces(1)
3193         self.loopback0 = self.lo_interfaces[0]
3194         self.loopback0.admin_up()
3195         self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3196         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
3197         self.cli_verify_no_response(cli_set)
3198         self.cli_verify_response(
3199             "show bfd echo-source",
3200             "UDP echo source is: %s\n"
3201             "IPv4 address usable as echo source: none\n"
3202             "IPv6 address usable as echo source: none" % self.loopback0.name,
3203         )
3204         self.loopback0.config_ip4()
3205         echo_ip4 = str(
3206             ipaddress.IPv4Address(
3207                 int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
3208             )
3209         )
3210         self.cli_verify_response(
3211             "show bfd echo-source",
3212             "UDP echo source is: %s\n"
3213             "IPv4 address usable as echo source: %s\n"
3214             "IPv6 address usable as echo source: none"
3215             % (self.loopback0.name, echo_ip4),
3216         )
3217         echo_ip6 = str(
3218             ipaddress.IPv6Address(
3219                 int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
3220             )
3221         )
3222         self.loopback0.config_ip6()
3223         self.cli_verify_response(
3224             "show bfd echo-source",
3225             "UDP echo source is: %s\n"
3226             "IPv4 address usable as echo source: %s\n"
3227             "IPv6 address usable as echo source: %s"
3228             % (self.loopback0.name, echo_ip4, echo_ip6),
3229         )
3230         cli_del = "bfd udp echo-source del"
3231         self.cli_verify_no_response(cli_del)
3232         self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3233
3234
3235 if __name__ == "__main__":
3236     unittest.main(testRunner=VppTestRunner)