vlib: add vlib_log_is_enabled
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 from collections import namedtuple
8 import hashlib
9 import ipaddress
10 import reprlib
11 import time
12 import unittest
13 from random import randint, shuffle, getrandbits
14 from socket import AF_INET, AF_INET6, inet_ntop
15 from struct import pack, unpack
16
17 import scapy.compat
18 from scapy.layers.inet import UDP, IP
19 from scapy.layers.inet6 import IPv6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22
23 from config import config
24 from bfd import (
25     VppBFDAuthKey,
26     BFD,
27     BFDAuthType,
28     VppBFDUDPSession,
29     BFDDiagCode,
30     BFDState,
31     BFD_vpp_echo,
32 )
33 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
34 from framework import is_distro_ubuntu2204, is_distro_debian11
35 from framework import VppTestCase, VppTestRunner
36 from framework import tag_run_solo
37 from util import ppp
38 from vpp_ip import DpoProto
39 from vpp_ip_route import VppIpRoute, VppRoutePath
40 from vpp_lo_interface import VppLoInterface
41 from vpp_papi_provider import UnexpectedApiReturnValueError, CliFailedCommandError
42 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
43 from vpp_gre_interface import VppGreInterface
44 from vpp_papi import VppEnum
45
# Number of microseconds in one second
USEC_IN_SEC = 1000000
47
48
class AuthKeyFactory(object):
    """Hands out random auth keys whose conf key IDs never repeat"""

    def __init__(self):
        # conf key IDs already issued by this factory (used as a set)
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """Build a VppBFDAuthKey with random content and an unused conf key ID

        :param test: test case handed over to VppBFDAuthKey
        :param auth_type: authentication type of the new key
        :returns: VppBFDAuthKey holding 1 to 20 random key bytes
        """
        # redraw until we hit an ID this factory has not issued yet
        while True:
            fresh_id = randint(0, 0xFFFFFFFF)
            if fresh_id not in self._conf_key_ids:
                break
        self._conf_key_ids[fresh_id] = 1
        key_length = randint(1, 20)
        key_bytes = bytearray(randint(0, 255) for _ in range(key_length))
        return VppBFDAuthKey(
            test=test,
            auth_type=auth_type,
            conf_key_id=fresh_id,
            key=scapy.compat.raw(key_bytes),
        )
67
68
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # The class and test-method docstrings are intentionally left verbatim:
    # unittest reports the first docstring line as the test description.

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        # Turn on BFD debug logging, then create two pg interfaces with
        # IPv4 and IPv6 configured and ARP resolved.
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for pg_if in cls.pg_interfaces:
                pg_if.config_ip4()
                pg_if.config_ip6()
                pg_if.resolve_arp()

        except Exception:
            # roll back the base-class setup before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDAPITestCase, self).setUp()
        # each test gets its own factory of unique auth keys
        self.factory = AuthKeyFactory()

    def _cycle_session_config(self, session):
        # Add the session to VPP and remove it again, twice in a row,
        # logging the session state after each add.
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _check_session_params(self, session):
        # Compare the timing parameters held by the session object with the
        # entry VPP returns in its session dump.
        entry = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(
            session.desired_min_tx,
            entry.desired_min_tx,
            "desired min transmit interval",
        )
        self.assert_equal(
            session.required_min_rx,
            entry.required_min_rx,
            "required min receive interval",
        )
        self.assert_equal(session.detect_mult, entry.detect_mult, "detect mult")

    def test_add_bfd(self):
        """create a BFD session"""
        self._cycle_session_config(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        )

    def test_double_add(self):
        """create the same BFD session twice (negative case)"""
        bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        bfd_session.add_vpp_config()

        # the second add of an identical session must be refused
        with self.vapi.assert_negative_api_retval():
            bfd_session.add_vpp_config()

        bfd_session.remove_vpp_config()

    def test_add_bfd6(self):
        """create IPv6 BFD session"""
        self._cycle_session_config(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        )

    def test_mod_bfd(self):
        """modify BFD session parameters"""
        bfd_session = VppBFDUDPSession(
            self,
            self.pg0,
            self.pg0.remote_ip4,
            desired_min_tx=50000,
            required_min_rx=10000,
            detect_mult=1,
        )
        bfd_session.add_vpp_config()
        self._check_session_params(bfd_session)
        # double every parameter and make sure VPP reports the new values
        bfd_session.modify_parameters(
            desired_min_tx=bfd_session.desired_min_tx * 2,
            required_min_rx=bfd_session.required_min_rx * 2,
            detect_mult=bfd_session.detect_mult * 2,
        )
        self._check_session_params(bfd_session)

    def test_upd_bfd(self):
        """Create/Modify w/ Update BFD session parameters"""
        bfd_session = VppBFDUDPSession(
            self,
            self.pg0,
            self.pg0.remote_ip4,
            desired_min_tx=50000,
            required_min_rx=10000,
            detect_mult=1,
        )
        # the first update call is issued for a session not configured before
        bfd_session.upd_vpp_config()
        self._check_session_params(bfd_session)
        bfd_session.upd_vpp_config(
            desired_min_tx=bfd_session.desired_min_tx * 2,
            required_min_rx=bfd_session.required_min_rx * 2,
            detect_mult=bfd_session.detect_mult * 2,
        )
        self._check_session_params(bfd_session)

    def test_add_sha1_keys(self):
        """add SHA1 keys"""
        key_count = 10
        keys = [self.factory.create_random_key(self) for _ in range(key_count)]
        for auth_key in keys:
            self.assertFalse(auth_key.query_vpp_config())
        for auth_key in keys:
            auth_key.add_vpp_config()
        for auth_key in keys:
            self.assertTrue(auth_key.query_vpp_config())
        # remove randomly
        removal_order = list(range(key_count))
        shuffle(removal_order)
        gone = set()
        for victim in removal_order:
            keys[victim].remove_vpp_config()
            gone.add(victim)
            # after every removal, exactly the removed keys must be absent
            for position, auth_key in enumerate(keys):
                present = auth_key.query_vpp_config()
                if position in gone:
                    self.assertFalse(present)
                else:
                    self.assertTrue(present)
        # should be removed now
        for auth_key in keys:
            self.assertFalse(auth_key.query_vpp_config())
        # add back and remove again
        for auth_key in keys:
            auth_key.add_vpp_config()
        for auth_key in keys:
            self.assertTrue(auth_key.query_vpp_config())
        for auth_key in keys:
            auth_key.remove_vpp_config()
        for auth_key in keys:
            self.assertFalse(auth_key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """create a BFD session (SHA1)"""
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self._cycle_session_config(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        )

    def test_double_add_sha1(self):
        """create the same BFD session twice (negative case) (SHA1)"""
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        bfd_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key
        )
        bfd_session.add_vpp_config()
        with self.assertRaises(Exception):
            bfd_session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """create BFD session using non-existent SHA1 (negative case)"""
        # the key is created locally but never added to VPP
        unknown_key = self.factory.create_random_key(self)
        bfd_session = VppBFDUDPSession(
            self,
            self.pg0,
            self.pg0.remote_ip4,
            sha1_key=unknown_key,
        )
        with self.assertRaises(Exception):
            bfd_session.add_vpp_config()

    def test_shared_sha1_key(self):
        """single SHA1 key shared by multiple BFD sessions"""
        shared_key = self.factory.create_random_key(self)
        shared_key.add_vpp_config()
        # one IPv4 and one IPv6 session per pg interface, all on one key
        sessions = []
        for pg_if in (self.pg0, self.pg1):
            sessions.append(
                VppBFDUDPSession(self, pg_if, pg_if.remote_ip4, sha1_key=shared_key)
            )
            sessions.append(
                VppBFDUDPSession(
                    self, pg_if, pg_if.remote_ip6, sha1_key=shared_key, af=AF_INET6
                )
            )
        for bfd_session in sessions:
            bfd_session.add_vpp_config()
        # the use count must drop by one with every session removed
        for already_removed, bfd_session in enumerate(sessions):
            entry = shared_key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(
                entry.use_count,
                len(sessions) - already_removed,
                "Use count for shared key",
            )
            bfd_session.remove_vpp_config()
        entry = shared_key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(entry.use_count, 0, "Use count for shared key")

    def test_activate_auth(self):
        """activate SHA1 authentication"""
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        bfd_session.add_vpp_config()
        bfd_session.activate_auth(auth_key)

    def test_deactivate_auth(self):
        """deactivate SHA1 authentication"""
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        bfd_session.add_vpp_config()
        bfd_session.activate_auth(auth_key)
        bfd_session.deactivate_auth()

    def test_change_key(self):
        """change SHA1 key"""
        first_key = self.factory.create_random_key(self)
        second_key = self.factory.create_random_key(self)
        while second_key.conf_key_id == first_key.conf_key_id:
            second_key = self.factory.create_random_key(self)
        first_key.add_vpp_config()
        second_key.add_vpp_config()
        bfd_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=first_key
        )
        bfd_session.add_vpp_config()
        bfd_session.activate_auth(second_key)

    def test_set_del_udp_echo_source(self):
        """set/del udp echo source"""

        def check_echo_source(is_set, ip4=None, ip6=None):
            # Query VPP and verify the reported echo source; an address of
            # None means "no usable address of that family expected".
            source = self.vapi.bfd_udp_get_echo_source()
            if is_set:
                self.assertTrue(source.is_set)
                self.assertEqual(source.sw_if_index, self.loopback0.sw_if_index)
            else:
                self.assertFalse(source.is_set)
            if ip4 is None:
                self.assertFalse(source.have_usable_ip4)
            else:
                self.assertTrue(source.have_usable_ip4)
                self.assertEqual(source.ip4_addr.packed, ip4)
            if ip6 is None:
                self.assertFalse(source.have_usable_ip6)
            else:
                self.assertTrue(source.have_usable_ip6)
                self.assertEqual(source.ip6_addr.packed, ip6)

        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        check_echo_source(is_set=False)

        # source interface set, but it has no addresses yet
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        check_echo_source(is_set=True)

        # expected echo address is the interface address with the lowest
        # bit flipped
        self.loopback0.config_ip4()
        echo_ip4 = ipaddress.IPv4Address(
            int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
        ).packed
        check_echo_source(is_set=True, ip4=echo_ip4)

        self.loopback0.config_ip6()
        echo_ip6 = ipaddress.IPv6Address(
            int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
        ).packed

        check_echo_source(is_set=True, ip4=echo_ip4, ip6=echo_ip6)

        self.vapi.bfd_udp_del_echo_source()
        check_echo_source(is_set=False)
368
369
class BFDTestSession(object):
    """The test framework's end of a BFD session (the peer VPP talks to)"""

    def __init__(
        self,
        test,
        interface,
        af,
        detect_mult=3,
        sha1_key=None,
        bfd_key_id=None,
        our_seq_number=None,
        tunnel_header=None,
        phy_interface=None,
    ):
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        # packets leave through phy_interface; by default that is the same
        # interface the session addresses are taken from
        self.phy_interface = phy_interface if phy_interface else self.interface
        self.udp_sport = randint(49152, 65535)
        # start from a random sequence number unless the caller pinned one
        self.our_seq_number = (
            randint(0, 40000000) if our_seq_number is None else our_seq_number
        )
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header
        # packet counters
        self.tx_packets = 0
        self.rx_packets = 0
        self.tx_packets_echo = 0
        self.rx_packets_echo = 0

    def inc_seq_num(self):
        """advance our sequence number by one, rolling over after 0xFFFFFFFF"""
        at_maximum = self.our_seq_number == 0xFFFFFFFF
        self.our_seq_number = 0 if at_maximum else self.our_seq_number + 1

    def update(
        self,
        my_discriminator=None,
        your_discriminator=None,
        desired_min_tx=None,
        required_min_rx=None,
        required_min_echo_rx=None,
        detect_mult=None,
        diag=None,
        state=None,
        auth_type=None,
    ):
        """store new session parameters; arguments left as None are ignored"""
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attribute, new_value in requested:
            if new_value is not None:
                setattr(self, attribute, new_value)

    def fill_packet_fields(self, packet):
        """copy every truthy session value into the BFD layer of packet"""
        bfd = packet[BFD]
        # (session attribute, BFD packet field, debug log format)
        field_map = (
            (
                "my_discriminator",
                "my_discriminator",
                "BFD: setting packet.my_discriminator=%s",
            ),
            (
                "your_discriminator",
                "your_discriminator",
                "BFD: setting packet.your_discriminator=%s",
            ),
            (
                "required_min_rx",
                "required_min_rx_interval",
                "BFD: setting packet.required_min_rx_interval=%s",
            ),
            (
                "required_min_echo_rx",
                "required_min_echo_rx_interval",
                "BFD: setting packet.required_min_echo_rx=%s",
            ),
            (
                "desired_min_tx",
                "desired_min_tx_interval",
                "BFD: setting packet.desired_min_tx_interval=%s",
            ),
            ("detect_mult", "detect_mult", "BFD: setting packet.detect_mult=%s"),
            ("diag", "diag", "BFD: setting packet.diag=%s"),
            ("state", "state", "BFD: setting packet.state=%s"),
            # auth_type is used by a negative test-case
            ("auth_type", "auth_type", "BFD: setting packet.auth_type=%s"),
        )
        for attribute, packet_field, log_format in field_map:
            value = getattr(self, attribute)
            if value:
                self.test.logger.debug(log_format, value)
                setattr(bfd, packet_field, value)

    def _padded_key(self):
        # SHA1 key bytes padded with zero bytes up to 20 bytes
        secret = self.sha1_key.key
        return secret + b"\0" * (20 - len(secret))

    def create_packet(self):
        """build a BFD packet mirroring the session's current state"""
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        packet = Ether(
            src=self.phy_interface.remote_mac, dst=self.phy_interface.local_mac
        )
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        # network layer goes from the remote (test) side towards VPP
        if self.af == AF_INET6:
            ip_layer = IPv6(
                src=self.interface.remote_ip6,
                dst=self.interface.local_ip6,
                hlim=255,
            )
        else:
            ip_layer = IP(
                src=self.interface.remote_ip4, dst=self.interface.local_ip4, ttl=255
            )
        packet = packet / ip_layer / UDP(sport=self.udp_sport, dport=BFD.udp_dport) / bfd
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer plus the
            # zero-padded key
            hash_material = scapy.compat.raw(packet[BFD])[:32] + self._padded_key()
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug(
                "BFD: Calculated SHA1 hash: %s" % sha1.hexdigest()
            )
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """transmit packet on interface, falling back to a freshly created
        packet and to the physical interface when they are not given"""
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.phy_interface
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.tx_packets += 1
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """Check the authentication section of the BFD layer in packet."""
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        previous = self.vpp_seq_number
        if previous is None:
            # first authenticated packet seen - just remember its number
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug(
                "Received initial sequence number: %s" % self.vpp_seq_number
            )
        else:
            received = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" % received)
            meticulous = (
                self.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
            )
            can_increment = previous < 0xFFFFFFFF
            if meticulous:
                # meticulous mode: number must advance by exactly one,
                # wrapping to 0 after the maximum
                expected = previous + 1 if can_increment else 0
                self.test.assert_equal(received, expected, "BFD sequence number")
            elif can_increment:
                self.test.assert_in_range(
                    received,
                    previous,
                    previous + 1,
                    "BFD sequence number",
                )
            else:
                self.test.assertIn(
                    received,
                    (previous, 0),
                    "BFD sequence number not one of (%s, 0)" % previous,
                )
            self.vpp_seq_number = received
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self._padded_key()
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        self.test.assert_equal(
            binascii.hexlify(bfd.auth_key_hash), expected_hash.encode(), "Auth key hash"
        )

    def verify_bfd(self, packet):
        """Check version, discriminator and (if keyed) auth of the BFD layer."""
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(
            bfd.your_discriminator, self.my_discriminator, "BFD - your discriminator"
        )
        if self.sha1_key:
            self.verify_sha1_auth(packet)
621
622
def bfd_session_up(test):
    """Walk the BFD session of test through Init to Up

    Waits for a BFD packet from VPP, records the offset between the local
    clock and the packet timestamp, sends Init, waits for the Up event and
    finally sends a packet in Up state.
    """

    def advance_seq_if_meticulous():
        # meticulous keyed SHA1 needs a new sequence number for every packet
        auth_key = test.test_session.sha1_key
        if auth_key and auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            test.test_session.inc_seq_num()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
    previous_offset = getattr(test, "vpp_clock_offset", None)
    test.vpp_clock_offset = time.time() - float(hello.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s", test.vpp_clock_offset)
    if previous_offset:
        # an offset measured earlier must agree with the new one
        test.assertAlmostEqual(
            previous_offset,
            test.vpp_clock_offset,
            delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)"
            % (test.vpp_clock_offset, previous_offset),
        )

    test.logger.info("BFD: Sending Init")
    test.test_session.update(
        my_discriminator=randint(0, 40000000),
        your_discriminator=hello[BFD].my_discriminator,
        state=BFDState.init,
    )
    advance_seq_if_meticulous()
    test.test_session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.up)

    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    advance_seq_if_meticulous()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
662     test.test_session.send_packet()
663     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
664
665
def bfd_session_down(test):
    """Take an Up BFD session of test down by sending a Down packet

    Asserts the VPP session is Up beforehand and Down after the event.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    auth_key = peer.sha1_key
    # meticulous keyed SHA1 needs a new sequence number for every packet
    if auth_key and auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
681
682
def verify_bfd_session_config(test, session, state=None):
    """Check that VPP's dump entry for session agrees with the session object

    :param test: test case providing the assert helpers
    :param session: object offering get_bfd_udp_session_dump_entry() and the
        attributes required_min_rx, desired_min_tx, detect_mult, sha1_key
        and bfd_key_id
    :param state: expected session state; the state is not checked when None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # Compare with None instead of relying on truthiness, so an expected
    # state whose numeric value is 0 (AdminDown in RFC 5880) is verified too.
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(
        dump.required_min_rx, session.required_min_rx, "required min rx interval"
    )
    test.assert_equal(
        dump.desired_min_tx, session.desired_min_tx, "desired min tx interval"
    )
    test.assert_equal(dump.detect_mult, session.detect_mult, "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        # authenticated session: both key identifiers must match as well
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id, "bfd key id")
        test.assert_equal(
            dump.conf_key_id, session.sha1_key.conf_key_id, "config key id"
        )
705
706
def verify_ip(test, packet):
    """Check hop limit/TTL and addresses of the packet's IP layer.

    The address family is taken from test.vpp_session; the packet is
    expected to travel from the interface's local to its remote address.
    """
    session = test.vpp_session
    if session.af == AF_INET6:
        ip = packet[IPv6]
        expected_src = session.interface.local_ip6
        expected_dst = session.interface.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        expected_src = session.interface.local_ip4
        expected_dst = session.interface.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
721
722
def verify_udp(test, packet):
    """Check the BFD destination port and source port range of the UDP layer."""
    udp_layer = packet[UDP]
    test.assert_equal(udp_layer.dport, BFD.udp_dport, "UDP destination port")
    lowest_sport, highest_sport = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(
        udp_layer.sport, lowest_sport, highest_sport, "UDP source port"
    )
730
731
def verify_event(test, event, expected_state):
    """Verify correctness of event values.

    :param test: test case; its vpp_session supplies the expected interface
        index and local/peer addresses
    :param event: BFD session event to check
    :param expected_state: session state the event must report
    """
    e = event
    test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
    test.assert_equal(
        e.sw_if_index, test.vpp_session.interface.sw_if_index, "BFD interface index"
    )

    # The session may be IPv4 or IPv6, so the labels stay address-family
    # neutral (they used to claim "IPv6" regardless of the session).
    test.assert_equal(
        str(e.local_addr), test.vpp_session.local_addr, "Local IP address"
    )
    test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr, "Peer IP address")
    test.assert_equal(e.state, expected_state, BFDState)
745
746
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """wait for BFD packet and verify its correctness

    :param test: test case; packets are captured on test.pg0 and checked
        against test.vpp_session / test.test_session
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: when set, the outer IP layer is stripped before the
        packet is verified

    :returns: the received packet (without the outer IP layer if is_tunnel)
    :raises CaptureTimeoutError: if the deadline passes before a packet is
        accepted
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.test_session.rx_packets += 1
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(
                ppp(
                    "BFD: ignoring packet (pcap time %s < "
                    "pcap time min %s):" % (p.time, pcap_time_min),
                    p,
                )
            )
        else:
            break
    test.logger.debug(test.vapi.ppcli("show trace"))
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() yields None for a missing layer, whereas p[BFD] raises
    # IndexError and would make the check below unreachable.
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
792
793
# one snapshot of the BFD packet counters: rx/tx plus their echo counterparts
BFDStats = namedtuple("BFDStats", ["rx", "rx_echo", "tx", "tx_echo"])
795
796
def bfd_grab_stats_snapshot(test, bs_idx=0, thread_index=None):
    """Read the current BFD session packet counters into a BFDStats tuple.

    :param test: test case whose ``statistics`` object is queried
    :param bs_idx: second index used when subscripting each counter
    :param thread_index: first index used when subscripting each counter;
        when None a full slice (``:``) is used instead

    :returns: BFDStats built from ``sum_packets()`` of each selected counter
    """
    stats = test.statistics
    first_index = slice(None) if thread_index is None else thread_index

    def packets(path):
        """Sum packets of the counter at ``path`` for the selected indices."""
        return stats[path][first_index, bs_idx].sum_packets()

    return BFDStats(
        rx=packets("/bfd/rx-session-counters"),
        rx_echo=packets("/bfd/rx-session-echo-counters"),
        tx=packets("/bfd/tx-session-counters"),
        tx_echo=packets("/bfd/tx-session-echo-counters"),
    )
811
812
def bfd_stats_diff(stats_before, stats_after):
    """Return a BFDStats holding ``stats_after - stats_before`` per counter.

    :param stats_before: earlier snapshot
    :param stats_after: later snapshot
    """
    return BFDStats._make(
        getattr(stats_after, name) - getattr(stats_before, name)
        for name in BFDStats._fields
    )
819
820
821 @tag_run_solo
822 @tag_fixme_ubuntu2204
823 @tag_fixme_debian11
824 class BFD4TestCase(VppTestCase):
825     """Bidirectional Forwarding Detection (BFD)"""
826
827     pg0 = None
828     vpp_clock_offset = None
829     vpp_session = None
830     test_session = None
831
832     @classmethod
833     def setUpClass(cls):
834         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
835             cls, "vpp"
836         ):
837             return
838         super(BFD4TestCase, cls).setUpClass()
839         cls.vapi.cli("set log class bfd level debug")
840         try:
841             cls.create_pg_interfaces([0])
842             cls.create_loopback_interfaces(1)
843             cls.loopback0 = cls.lo_interfaces[0]
844             cls.loopback0.config_ip4()
845             cls.loopback0.admin_up()
846             cls.pg0.config_ip4()
847             cls.pg0.configure_ipv4_neighbors()
848             cls.pg0.admin_up()
849             cls.pg0.resolve_arp()
850
851         except Exception:
852             super(BFD4TestCase, cls).tearDownClass()
853             raise
854
855     @classmethod
856     def tearDownClass(cls):
857         super(BFD4TestCase, cls).tearDownClass()
858
859     def setUp(self):
860         super(BFD4TestCase, self).setUp()
861         self.factory = AuthKeyFactory()
862         self.vapi.want_bfd_events()
863         self.pg0.enable_capture()
864         try:
865             self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
866             self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
867             self.vapi.cli("trace add bfd-process 500")
868             self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
869             self.vpp_session.add_vpp_config()
870             self.vpp_session.admin_up()
871             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
872         except BaseException:
873             self.vapi.want_bfd_events(enable_disable=0)
874             raise
875
876     def tearDown(self):
877         if not self.vpp_dead:
878             self.vapi.want_bfd_events(enable_disable=0)
879         self.vapi.collect_events()  # clear the event queue
880         super(BFD4TestCase, self).tearDown()
881
882     def test_session_up(self):
883         """bring BFD session up"""
884         bfd_session_up(self)
885         bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
886         bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
887         self.assert_equal(bfd_udp4_sessions - self.bfd_udp4_sessions, 1)
888         self.assert_equal(bfd_udp6_sessions, self.bfd_udp6_sessions)
889
890     def test_session_up_by_ip(self):
891         """bring BFD session up - first frame looked up by address pair"""
892         self.logger.info("BFD: Sending Slow control frame")
893         self.test_session.update(my_discriminator=randint(0, 40000000))
894         self.test_session.send_packet()
895         self.pg0.enable_capture()
896         p = self.pg0.wait_for_packet(1)
897         self.assert_equal(
898             p[BFD].your_discriminator,
899             self.test_session.my_discriminator,
900             "BFD - your discriminator",
901         )
902         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
903         self.test_session.update(
904             your_discriminator=p[BFD].my_discriminator, state=BFDState.up
905         )
906         self.logger.info("BFD: Waiting for event")
907         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
908         verify_event(self, e, expected_state=BFDState.init)
909         self.logger.info("BFD: Sending Up")
910         self.test_session.send_packet()
911         self.logger.info("BFD: Waiting for event")
912         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
913         verify_event(self, e, expected_state=BFDState.up)
914         self.logger.info("BFD: Session is Up")
915         self.test_session.update(state=BFDState.up)
916         self.test_session.send_packet()
917         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
918
919     def test_session_down(self):
920         """bring BFD session down"""
921         bfd_session_up(self)
922         bfd_session_down(self)
923
924     def test_hold_up(self):
925         """hold BFD session up"""
926         bfd_session_up(self)
927         for dummy in range(self.test_session.detect_mult * 2):
928             wait_for_bfd_packet(self)
929             self.test_session.send_packet()
930         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
931
932     def test_slow_timer(self):
933         """verify slow periodic control frames while session down"""
934         packet_count = 3
935         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
936         prev_packet = wait_for_bfd_packet(self, 2)
937         for dummy in range(packet_count):
938             next_packet = wait_for_bfd_packet(self, 2)
939             time_diff = next_packet.time - prev_packet.time
940             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
941             # to work around timing issues
942             self.assert_in_range(time_diff, 0.70, 1.05, "time between slow packets")
943             prev_packet = next_packet
944
945     def test_zero_remote_min_rx(self):
946         """no packets when zero remote required min rx interval"""
947         bfd_session_up(self)
948         self.test_session.update(required_min_rx=0)
949         self.test_session.send_packet()
950         for dummy in range(self.test_session.detect_mult):
951             self.sleep(
952                 self.vpp_session.required_min_rx / USEC_IN_SEC,
953                 "sleep before transmitting bfd packet",
954             )
955             self.test_session.send_packet()
956             try:
957                 p = wait_for_bfd_packet(self, timeout=0)
958                 self.logger.error(ppp("Received unexpected packet:", p))
959             except CaptureTimeoutError:
960                 pass
961         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
962         self.test_session.update(required_min_rx=300000)
963         for dummy in range(3):
964             self.test_session.send_packet()
965             wait_for_bfd_packet(
966                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC
967             )
968         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
969
970     def test_conn_down(self):
971         """verify session goes down after inactivity"""
972         bfd_session_up(self)
973         detection_time = (
974             self.test_session.detect_mult
975             * self.vpp_session.required_min_rx
976             / USEC_IN_SEC
977         )
978         self.sleep(detection_time, "waiting for BFD session time-out")
979         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
980         verify_event(self, e, expected_state=BFDState.down)
981
982     def test_peer_discr_reset_sess_down(self):
983         """peer discriminator reset after session goes down"""
984         bfd_session_up(self)
985         detection_time = (
986             self.test_session.detect_mult
987             * self.vpp_session.required_min_rx
988             / USEC_IN_SEC
989         )
990         self.sleep(detection_time, "waiting for BFD session time-out")
991         self.test_session.my_discriminator = 0
992         wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
993
994     def test_large_required_min_rx(self):
995         """large remote required min rx interval"""
996         bfd_session_up(self)
997         p = wait_for_bfd_packet(self)
998         interval = 3000000
999         self.test_session.update(required_min_rx=interval)
1000         self.test_session.send_packet()
1001         time_mark = time.time()
1002         count = 0
1003         # busy wait here, trying to collect a packet or event, vpp is not
1004         # allowed to send packets and the session will timeout first - so the
1005         # Up->Down event must arrive before any packets do
1006         while time.time() < time_mark + interval / USEC_IN_SEC:
1007             try:
1008                 p = wait_for_bfd_packet(self, timeout=0)
1009                 # if vpp managed to send a packet before we did the session
1010                 # session update, then that's fine, ignore it
1011                 if p.time < time_mark - self.vpp_clock_offset:
1012                     continue
1013                 self.logger.error(ppp("Received unexpected packet:", p))
1014                 count += 1
1015             except CaptureTimeoutError:
1016                 pass
1017             events = self.vapi.collect_events()
1018             if len(events) > 0:
1019                 verify_event(self, events[0], BFDState.down)
1020                 break
1021         self.assert_equal(count, 0, "number of packets received")
1022
1023     def test_immediate_remote_min_rx_reduction(self):
1024         """immediately honor remote required min rx reduction"""
1025         self.vpp_session.remove_vpp_config()
1026         self.vpp_session = VppBFDUDPSession(
1027             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000
1028         )
1029         self.pg0.enable_capture()
1030         self.vpp_session.add_vpp_config()
1031         self.test_session.update(desired_min_tx=1000000, required_min_rx=1000000)
1032         bfd_session_up(self)
1033         reference_packet = wait_for_bfd_packet(self)
1034         time_mark = time.time()
1035         interval = 300000
1036         self.test_session.update(required_min_rx=interval)
1037         self.test_session.send_packet()
1038         extra_time = time.time() - time_mark
1039         p = wait_for_bfd_packet(self)
1040         # first packet is allowed to be late by time we spent doing the update
1041         # calculated in extra_time
1042         self.assert_in_range(
1043             p.time - reference_packet.time,
1044             0.95 * 0.75 * interval / USEC_IN_SEC,
1045             1.05 * interval / USEC_IN_SEC + extra_time,
1046             "time between BFD packets",
1047         )
1048         reference_packet = p
1049         for dummy in range(3):
1050             p = wait_for_bfd_packet(self)
1051             diff = p.time - reference_packet.time
1052             self.assert_in_range(
1053                 diff,
1054                 0.95 * 0.75 * interval / USEC_IN_SEC,
1055                 1.05 * interval / USEC_IN_SEC,
1056                 "time between BFD packets",
1057             )
1058             reference_packet = p
1059
1060     def test_modify_req_min_rx_double(self):
1061         """modify session - double required min rx"""
1062         bfd_session_up(self)
1063         p = wait_for_bfd_packet(self)
1064         self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
1065         self.test_session.send_packet()
1066         # double required min rx
1067         self.vpp_session.modify_parameters(
1068             required_min_rx=2 * self.vpp_session.required_min_rx
1069         )
1070         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1071         # poll bit needs to be set
1072         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
1073         # finish poll sequence with final packet
1074         final = self.test_session.create_packet()
1075         final[BFD].flags = "F"
1076         timeout = (
1077             self.test_session.detect_mult
1078             * max(self.test_session.desired_min_tx, self.vpp_session.required_min_rx)
1079             / USEC_IN_SEC
1080         )
1081         self.test_session.send_packet(final)
1082         time_mark = time.time()
1083         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
1084         verify_event(self, e, expected_state=BFDState.down)
1085         time_to_event = time.time() - time_mark
1086         self.assert_in_range(
1087             time_to_event, 0.9 * timeout, 1.1 * timeout, "session timeout"
1088         )
1089
1090     def test_modify_req_min_rx_halve(self):
1091         """modify session - halve required min rx"""
1092         self.vpp_session.modify_parameters(
1093             required_min_rx=2 * self.vpp_session.required_min_rx
1094         )
1095         bfd_session_up(self)
1096         p = wait_for_bfd_packet(self)
1097         self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
1098         self.test_session.send_packet()
1099         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1100         # halve required min rx
1101         old_required_min_rx = self.vpp_session.required_min_rx
1102         self.vpp_session.modify_parameters(
1103             required_min_rx=self.vpp_session.required_min_rx // 2
1104         )
1105         # now we wait 0.8*3*old-req-min-rx and the session should still be up
1106         self.sleep(
1107             0.8 * self.vpp_session.detect_mult * old_required_min_rx / USEC_IN_SEC,
1108             "wait before finishing poll sequence",
1109         )
1110         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
1111         p = wait_for_bfd_packet(self)
1112         # poll bit needs to be set
1113         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
1114         # finish poll sequence with final packet
1115         final = self.test_session.create_packet()
1116         final[BFD].flags = "F"
1117         self.test_session.send_packet(final)
1118         # now the session should time out under new conditions
1119         detection_time = (
1120             self.test_session.detect_mult
1121             * self.vpp_session.required_min_rx
1122             / USEC_IN_SEC
1123         )
1124         before = time.time()
1125         e = self.vapi.wait_for_event(2 * detection_time, "bfd_udp_session_event")
1126         after = time.time()
1127         self.assert_in_range(
1128             after - before,
1129             0.9 * detection_time,
1130             1.1 * detection_time,
1131             "time before bfd session goes down",
1132         )
1133         verify_event(self, e, expected_state=BFDState.down)
1134
1135     def test_modify_detect_mult(self):
1136         """modify detect multiplier"""
1137         bfd_session_up(self)
1138         p = wait_for_bfd_packet(self)
1139         self.vpp_session.modify_parameters(detect_mult=1)
1140         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1141         self.assert_equal(
1142             self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
1143         )
1144         # poll bit must not be set
1145         self.assertNotIn(
1146             "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1147         )
1148         self.vpp_session.modify_parameters(detect_mult=10)
1149         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1150         self.assert_equal(
1151             self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
1152         )
1153         # poll bit must not be set
1154         self.assertNotIn(
1155             "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1156         )
1157
1158     def test_queued_poll(self):
1159         """test poll sequence queueing"""
1160         bfd_session_up(self)
1161         p = wait_for_bfd_packet(self)
1162         self.vpp_session.modify_parameters(
1163             required_min_rx=2 * self.vpp_session.required_min_rx
1164         )
1165         p = wait_for_bfd_packet(self)
1166         poll_sequence_start = time.time()
1167         poll_sequence_length_min = 0.5
1168         send_final_after = time.time() + poll_sequence_length_min
1169         # poll bit needs to be set
1170         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
1171         self.assert_equal(
1172             p[BFD].required_min_rx_interval,
1173             self.vpp_session.required_min_rx,
1174             "BFD required min rx interval",
1175         )
1176         self.vpp_session.modify_parameters(
1177             required_min_rx=2 * self.vpp_session.required_min_rx
1178         )
1179         # 2nd poll sequence should be queued now
1180         # don't send the reply back yet, wait for some time to emulate
1181         # longer round-trip time
1182         packet_count = 0
1183         while time.time() < send_final_after:
1184             self.test_session.send_packet()
1185             p = wait_for_bfd_packet(self)
1186             self.assert_equal(
1187                 len(self.vapi.collect_events()), 0, "number of bfd events"
1188             )
1189             self.assert_equal(
1190                 p[BFD].required_min_rx_interval,
1191                 self.vpp_session.required_min_rx,
1192                 "BFD required min rx interval",
1193             )
1194             packet_count += 1
1195             # poll bit must be set
1196             self.assertIn(
1197                 "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1198             )
1199         final = self.test_session.create_packet()
1200         final[BFD].flags = "F"
1201         self.test_session.send_packet(final)
1202         # finish 1st with final
1203         poll_sequence_length = time.time() - poll_sequence_start
1204         # vpp must wait for some time before starting new poll sequence
1205         poll_no_2_started = False
1206         for dummy in range(2 * packet_count):
1207             p = wait_for_bfd_packet(self)
1208             self.assert_equal(
1209                 len(self.vapi.collect_events()), 0, "number of bfd events"
1210             )
1211             if "P" in p.sprintf("%BFD.flags%"):
1212                 poll_no_2_started = True
1213                 if time.time() < poll_sequence_start + poll_sequence_length:
1214                     raise Exception("VPP started 2nd poll sequence too soon")
1215                 final = self.test_session.create_packet()
1216                 final[BFD].flags = "F"
1217                 self.test_session.send_packet(final)
1218                 break
1219             else:
1220                 self.test_session.send_packet()
1221         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1222         # finish 2nd with final
1223         final = self.test_session.create_packet()
1224         final[BFD].flags = "F"
1225         self.test_session.send_packet(final)
1226         p = wait_for_bfd_packet(self)
1227         # poll bit must not be set
1228         self.assertNotIn("P", p.sprintf("%BFD.flags%"), "Poll bit set in BFD packet")
1229
1230     # returning inconsistent results requiring retries in per-patch tests
1231     @unittest.skipUnless(config.extended, "part of extended tests")
1232     def test_poll_response(self):
1233         """test correct response to control frame with poll bit set"""
1234         bfd_session_up(self)
1235         poll = self.test_session.create_packet()
1236         poll[BFD].flags = "P"
1237         self.test_session.send_packet(poll)
1238         final = wait_for_bfd_packet(
1239             self, pcap_time_min=time.time() - self.vpp_clock_offset
1240         )
1241         self.assertIn("F", final.sprintf("%BFD.flags%"))
1242
1243     def test_no_periodic_if_remote_demand(self):
1244         """no periodic frames outside poll sequence if remote demand set"""
1245         bfd_session_up(self)
1246         demand = self.test_session.create_packet()
1247         demand[BFD].flags = "D"
1248         self.test_session.send_packet(demand)
1249         transmit_time = (
1250             0.9
1251             * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
1252             / USEC_IN_SEC
1253         )
1254         count = 0
1255         for dummy in range(self.test_session.detect_mult * 2):
1256             self.sleep(transmit_time)
1257             self.test_session.send_packet(demand)
1258             try:
1259                 p = wait_for_bfd_packet(self, timeout=0)
1260                 self.logger.error(ppp("Received unexpected packet:", p))
1261                 count += 1
1262             except CaptureTimeoutError:
1263                 pass
1264         events = self.vapi.collect_events()
1265         for e in events:
1266             self.logger.error("Received unexpected event: %s", e)
1267         self.assert_equal(count, 0, "number of packets received")
1268         self.assert_equal(len(events), 0, "number of events received")
1269
1270     def test_echo_looped_back(self):
1271         """echo packets looped back"""
1272         bfd_session_up(self)
1273         stats_before = bfd_grab_stats_snapshot(self)
1274         self.pg0.enable_capture()
1275         echo_packet_count = 10
1276         # random source port low enough to increment a few times..
1277         udp_sport_tx = randint(1, 50000)
1278         udp_sport_rx = udp_sport_tx
1279         echo_packet = (
1280             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1281             / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
1282             / UDP(dport=BFD.udp_dport_echo)
1283             / Raw("this should be looped back")
1284         )
1285         for dummy in range(echo_packet_count):
1286             self.sleep(0.01, "delay between echo packets")
1287             echo_packet[UDP].sport = udp_sport_tx
1288             udp_sport_tx += 1
1289             self.logger.debug(ppp("Sending packet:", echo_packet))
1290             self.pg0.add_stream(echo_packet)
1291             self.pg_start()
1292             self.logger.debug(self.vapi.ppcli("show trace"))
1293         counter = 0
1294         bfd_control_packets_rx = 0
1295         while counter < echo_packet_count:
1296             p = self.pg0.wait_for_packet(1)
1297             self.logger.debug(ppp("Got packet:", p))
1298             ether = p[Ether]
1299             self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
1300             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1301             ip = p[IP]
1302             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1303             udp = p[UDP]
1304             if udp.dport == BFD.udp_dport:
1305                 bfd_control_packets_rx += 1
1306                 continue
1307             self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
1308             self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
1309             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1310             udp_sport_rx += 1
1311             # need to compare the hex payload here, otherwise BFD_vpp_echo
1312             # gets in way
1313             self.assertEqual(
1314                 scapy.compat.raw(p[UDP].payload),
1315                 scapy.compat.raw(echo_packet[UDP].payload),
1316                 "Received packet is not the echo packet sent",
1317             )
1318             counter += 1
1319         self.assert_equal(
1320             udp_sport_tx,
1321             udp_sport_rx,
1322             "UDP source port (== ECHO packet identifier for test purposes)",
1323         )
1324         stats_after = bfd_grab_stats_snapshot(self)
1325         diff = bfd_stats_diff(stats_before, stats_after)
1326         self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
1327         self.assertEqual(bfd_control_packets_rx, diff.tx, "TX counter incorrect")
1328         self.assertEqual(
1329             0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
1330         )
1331         self.assertEqual(
1332             0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
1333         )
1334
1335     def test_echo(self):
1336         """echo function"""
1337         stats_before = bfd_grab_stats_snapshot(self)
1338         bfd_session_up(self)
1339         self.test_session.update(required_min_echo_rx=150000)
1340         self.test_session.send_packet()
1341         detection_time = (
1342             self.test_session.detect_mult
1343             * self.vpp_session.required_min_rx
1344             / USEC_IN_SEC
1345         )
1346         # echo shouldn't work without echo source set
1347         for dummy in range(10):
1348             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1349             self.sleep(sleep, "delay before sending bfd packet")
1350             self.test_session.send_packet()
1351         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1352         self.assert_equal(
1353             p[BFD].required_min_rx_interval,
1354             self.vpp_session.required_min_rx,
1355             "BFD required min rx interval",
1356         )
1357         self.test_session.send_packet()
1358         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1359         echo_seen = False
1360         # should be turned on - loopback echo packets
1361         for dummy in range(3):
1362             loop_until = time.time() + 0.75 * detection_time
1363             while time.time() < loop_until:
1364                 p = self.pg0.wait_for_packet(1)
1365                 self.logger.debug(ppp("Got packet:", p))
1366                 if p[UDP].dport == BFD.udp_dport_echo:
1367                     self.assert_equal(p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1368                     self.assertNotEqual(
1369                         p[IP].src,
1370                         self.loopback0.local_ip4,
1371                         "BFD ECHO src IP equal to loopback IP",
1372                     )
1373                     self.logger.debug(ppp("Looping back packet:", p))
1374                     self.assert_equal(
1375                         p[Ether].dst,
1376                         self.pg0.remote_mac,
1377                         "ECHO packet destination MAC address",
1378                     )
1379                     p[Ether].dst = self.pg0.local_mac
1380                     self.pg0.add_stream(p)
1381                     self.test_session.rx_packets_echo += 1
1382                     self.test_session.tx_packets_echo += 1
1383                     self.pg_start()
1384                     echo_seen = True
1385                 elif p.haslayer(BFD):
1386                     self.test_session.rx_packets += 1
1387                     if echo_seen:
1388                         self.assertGreaterEqual(
1389                             p[BFD].required_min_rx_interval, 1000000
1390                         )
1391                     if "P" in p.sprintf("%BFD.flags%"):
1392                         final = self.test_session.create_packet()
1393                         final[BFD].flags = "F"
1394                         self.test_session.send_packet(final)
1395                 else:
1396                     raise Exception(ppp("Received unknown packet:", p))
1397
1398                 self.assert_equal(
1399                     len(self.vapi.collect_events()), 0, "number of bfd events"
1400                 )
1401             self.test_session.send_packet()
1402         self.assertTrue(echo_seen, "No echo packets received")
1403
1404         stats_after = bfd_grab_stats_snapshot(self)
1405         diff = bfd_stats_diff(stats_before, stats_after)
1406         # our rx is vpp tx and vice versa, also tolerate one packet off
1407         self.assert_in_range(
1408             self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
1409         )
1410         self.assert_in_range(
1411             self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
1412         )
1413         self.assert_in_range(
1414             self.test_session.tx_packets_echo,
1415             diff.rx_echo - 1,
1416             diff.rx_echo + 1,
1417             "RX echo counter",
1418         )
1419         self.assert_in_range(
1420             self.test_session.rx_packets_echo,
1421             diff.tx_echo - 1,
1422             diff.tx_echo + 1,
1423             "TX echo counter",
1424         )
1425
1426     def test_echo_fail(self):
1427         """session goes down if echo function fails"""
1428         bfd_session_up(self)
1429         self.test_session.update(required_min_echo_rx=150000)
1430         self.test_session.send_packet()
1431         detection_time = (
1432             self.test_session.detect_mult
1433             * self.vpp_session.required_min_rx
1434             / USEC_IN_SEC
1435         )
1436         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1437         # echo function should be used now, but we will drop the echo packets
1438         verified_diag = False
1439         for dummy in range(3):
1440             loop_until = time.time() + 0.75 * detection_time
1441             while time.time() < loop_until:
1442                 p = self.pg0.wait_for_packet(1)
1443                 self.logger.debug(ppp("Got packet:", p))
1444                 if p[UDP].dport == BFD.udp_dport_echo:
1445                     # dropped
1446                     pass
1447                 elif p.haslayer(BFD):
1448                     if "P" in p.sprintf("%BFD.flags%"):
1449                         self.assertGreaterEqual(
1450                             p[BFD].required_min_rx_interval, 1000000
1451                         )
1452                         final = self.test_session.create_packet()
1453                         final[BFD].flags = "F"
1454                         self.test_session.send_packet(final)
1455                     if p[BFD].state == BFDState.down:
1456                         self.assert_equal(
1457                             p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1458                         )
1459                         verified_diag = True
1460                 else:
1461                     raise Exception(ppp("Received unknown packet:", p))
1462             self.test_session.send_packet()
1463         events = self.vapi.collect_events()
1464         self.assert_equal(len(events), 1, "number of bfd events")
1465         self.assert_equal(events[0].state, BFDState.down, BFDState)
1466         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1467
1468     def test_echo_stop(self):
1469         """echo function stops if peer sets required min echo rx zero"""
1470         bfd_session_up(self)
1471         self.test_session.update(required_min_echo_rx=150000)
1472         self.test_session.send_packet()
1473         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1474         # wait for first echo packet
1475         while True:
1476             p = self.pg0.wait_for_packet(1)
1477             self.logger.debug(ppp("Got packet:", p))
1478             if p[UDP].dport == BFD.udp_dport_echo:
1479                 self.logger.debug(ppp("Looping back packet:", p))
1480                 p[Ether].dst = self.pg0.local_mac
1481                 self.pg0.add_stream(p)
1482                 self.pg_start()
1483                 break
1484             elif p.haslayer(BFD):
1485                 # ignore BFD
1486                 pass
1487             else:
1488                 raise Exception(ppp("Received unknown packet:", p))
1489         self.test_session.update(required_min_echo_rx=0)
1490         self.test_session.send_packet()
1491         # echo packets shouldn't arrive anymore
1492         for dummy in range(5):
1493             wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1494             self.test_session.send_packet()
1495             events = self.vapi.collect_events()
1496             self.assert_equal(len(events), 0, "number of bfd events")
1497
1498     def test_echo_source_removed(self):
1499         """echo function stops if echo source is removed"""
1500         bfd_session_up(self)
1501         self.test_session.update(required_min_echo_rx=150000)
1502         self.test_session.send_packet()
1503         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1504         # wait for first echo packet
1505         while True:
1506             p = self.pg0.wait_for_packet(1)
1507             self.logger.debug(ppp("Got packet:", p))
1508             if p[UDP].dport == BFD.udp_dport_echo:
1509                 self.logger.debug(ppp("Looping back packet:", p))
1510                 p[Ether].dst = self.pg0.local_mac
1511                 self.pg0.add_stream(p)
1512                 self.pg_start()
1513                 break
1514             elif p.haslayer(BFD):
1515                 # ignore BFD
1516                 pass
1517             else:
1518                 raise Exception(ppp("Received unknown packet:", p))
1519         self.vapi.bfd_udp_del_echo_source()
1520         self.test_session.send_packet()
1521         # echo packets shouldn't arrive anymore
1522         for dummy in range(5):
1523             wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1524             self.test_session.send_packet()
1525             events = self.vapi.collect_events()
1526             self.assert_equal(len(events), 0, "number of bfd events")
1527
1528     def test_stale_echo(self):
1529         """stale echo packets don't keep a session up"""
1530         bfd_session_up(self)
1531         self.test_session.update(required_min_echo_rx=150000)
1532         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1533         self.test_session.send_packet()
1534         # should be turned on - loopback echo packets
1535         echo_packet = None
1536         timeout_at = None
1537         timeout_ok = False
1538         for dummy in range(10 * self.vpp_session.detect_mult):
1539             p = self.pg0.wait_for_packet(1)
1540             if p[UDP].dport == BFD.udp_dport_echo:
1541                 if echo_packet is None:
1542                     self.logger.debug(ppp("Got first echo packet:", p))
1543                     echo_packet = p
1544                     timeout_at = (
1545                         time.time()
1546                         + self.vpp_session.detect_mult
1547                         * self.test_session.required_min_echo_rx
1548                         / USEC_IN_SEC
1549                     )
1550                 else:
1551                     self.logger.debug(ppp("Got followup echo packet:", p))
1552                 self.logger.debug(ppp("Looping back first echo packet:", p))
1553                 echo_packet[Ether].dst = self.pg0.local_mac
1554                 self.pg0.add_stream(echo_packet)
1555                 self.pg_start()
1556             elif p.haslayer(BFD):
1557                 self.logger.debug(ppp("Got packet:", p))
1558                 if "P" in p.sprintf("%BFD.flags%"):
1559                     final = self.test_session.create_packet()
1560                     final[BFD].flags = "F"
1561                     self.test_session.send_packet(final)
1562                 if p[BFD].state == BFDState.down:
1563                     self.assertIsNotNone(
1564                         timeout_at,
1565                         "Session went down before first echo packet received",
1566                     )
1567                     now = time.time()
1568                     self.assertGreaterEqual(
1569                         now,
1570                         timeout_at,
1571                         "Session timeout at %s, but is expected at %s"
1572                         % (now, timeout_at),
1573                     )
1574                     self.assert_equal(
1575                         p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1576                     )
1577                     events = self.vapi.collect_events()
1578                     self.assert_equal(len(events), 1, "number of bfd events")
1579                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1580                     timeout_ok = True
1581                     break
1582             else:
1583                 raise Exception(ppp("Received unknown packet:", p))
1584             self.test_session.send_packet()
1585         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1586
1587     def test_invalid_echo_checksum(self):
1588         """echo packets with invalid checksum don't keep a session up"""
1589         bfd_session_up(self)
1590         self.test_session.update(required_min_echo_rx=150000)
1591         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1592         self.test_session.send_packet()
1593         # should be turned on - loopback echo packets
1594         timeout_at = None
1595         timeout_ok = False
1596         for dummy in range(10 * self.vpp_session.detect_mult):
1597             p = self.pg0.wait_for_packet(1)
1598             if p[UDP].dport == BFD.udp_dport_echo:
1599                 self.logger.debug(ppp("Got echo packet:", p))
1600                 if timeout_at is None:
1601                     timeout_at = (
1602                         time.time()
1603                         + self.vpp_session.detect_mult
1604                         * self.test_session.required_min_echo_rx
1605                         / USEC_IN_SEC
1606                     )
1607                 p[BFD_vpp_echo].checksum = getrandbits(64)
1608                 p[Ether].dst = self.pg0.local_mac
1609                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1610                 self.pg0.add_stream(p)
1611                 self.pg_start()
1612             elif p.haslayer(BFD):
1613                 self.logger.debug(ppp("Got packet:", p))
1614                 if "P" in p.sprintf("%BFD.flags%"):
1615                     final = self.test_session.create_packet()
1616                     final[BFD].flags = "F"
1617                     self.test_session.send_packet(final)
1618                 if p[BFD].state == BFDState.down:
1619                     self.assertIsNotNone(
1620                         timeout_at,
1621                         "Session went down before first echo packet received",
1622                     )
1623                     now = time.time()
1624                     self.assertGreaterEqual(
1625                         now,
1626                         timeout_at,
1627                         "Session timeout at %s, but is expected at %s"
1628                         % (now, timeout_at),
1629                     )
1630                     self.assert_equal(
1631                         p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1632                     )
1633                     events = self.vapi.collect_events()
1634                     self.assert_equal(len(events), 1, "number of bfd events")
1635                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1636                     timeout_ok = True
1637                     break
1638             else:
1639                 raise Exception(ppp("Received unknown packet:", p))
1640             self.test_session.send_packet()
1641         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1642
    def test_admin_up_down(self):
        """put session admin-up and admin-down"""
        # Phase 1: start from an established session and put the vpp side
        # admin-down; an admin_down event is expected.
        bfd_session_up(self)
        self.vpp_session.admin_down()
        self.pg0.enable_capture()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.admin_down)
        # the next two BFD packets from vpp must advertise admin_down
        for dummy in range(2):
            p = wait_for_bfd_packet(self)
            self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
        # try to bring session up - shouldn't be possible
        self.test_session.update(state=BFDState.init)
        self.test_session.send_packet()
        for dummy in range(2):
            p = wait_for_bfd_packet(self)
            self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
        # Phase 2: admin-up again; the session is expected to walk through
        # down -> init -> up, each step checked both in the packet sent by
        # vpp and in the reported event.
        self.vpp_session.admin_up()
        self.test_session.update(state=BFDState.down)
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.down)
        # pcap_time_min is passed as "now" minus vpp_clock_offset
        # NOTE(review): presumably this skips packets captured earlier -
        # confirm against wait_for_bfd_packet
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].state, BFDState.down, BFDState)
        # test side (state down) sends a packet -> vpp should answer in init
        self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.init)
        # test side moves to up -> vpp should follow
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].state, BFDState.up, BFDState)
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.up)
1676
1677     def test_config_change_remote_demand(self):
1678         """configuration change while peer in demand mode"""
1679         bfd_session_up(self)
1680         demand = self.test_session.create_packet()
1681         demand[BFD].flags = "D"
1682         self.test_session.send_packet(demand)
1683         self.vpp_session.modify_parameters(
1684             required_min_rx=2 * self.vpp_session.required_min_rx
1685         )
1686         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1687         # poll bit must be set
1688         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1689         # terminate poll sequence
1690         final = self.test_session.create_packet()
1691         final[BFD].flags = "D+F"
1692         self.test_session.send_packet(final)
1693         # vpp should be quiet now again
1694         transmit_time = (
1695             0.9
1696             * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
1697             / USEC_IN_SEC
1698         )
1699         count = 0
1700         for dummy in range(self.test_session.detect_mult * 2):
1701             self.sleep(transmit_time)
1702             self.test_session.send_packet(demand)
1703             try:
1704                 p = wait_for_bfd_packet(self, timeout=0)
1705                 self.logger.error(ppp("Received unexpected packet:", p))
1706                 count += 1
1707             except CaptureTimeoutError:
1708                 pass
1709         events = self.vapi.collect_events()
1710         for e in events:
1711             self.logger.error("Received unexpected event: %s", e)
1712         self.assert_equal(count, 0, "number of packets received")
1713         self.assert_equal(len(events), 0, "number of events received")
1714
1715     def test_intf_deleted(self):
1716         """interface with bfd session deleted"""
1717         intf = VppLoInterface(self)
1718         intf.config_ip4()
1719         intf.admin_up()
1720         sw_if_index = intf.sw_if_index
1721         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1722         vpp_session.add_vpp_config()
1723         vpp_session.admin_up()
1724         intf.remove_vpp_config()
1725         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1726         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1727         self.assertFalse(vpp_session.query_vpp_config())
1728
1729
@tag_run_solo
@tag_fixme_vpp_workers
@tag_fixme_ubuntu2204
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6)"""

    # Class-level placeholders. pg0 is created in setUpClass; vpp_session and
    # test_session are (re)created for every test in setUp.
    # NOTE(review): vpp_clock_offset is read by test_echo but never assigned
    # in this class - presumably set by a shared helper; confirm.
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        # Skip the whole class setup on Ubuntu 22.04 when the class has no
        # "vpp" attribute.
        # NOTE(review): "== True" is redundant if is_distro_ubuntu2204 is a
        # plain bool - confirm against the framework module.
        if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"):
            return
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            # one packet-generator interface with IPv6 configured and
            # resolved, plus one IPv6 loopback (used as echo source by
            # test_echo)
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        super(BFD6TestCase, self).setUp()
        # same Ubuntu 22.04 guard as in setUpClass
        if is_distro_ubuntu2204 == True and not hasattr(self, "vpp"):
            return
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            # session counters before this test creates its own session;
            # test_session_up compares against these
            self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
            self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
            self.vpp_session = VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
            )
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # unsubscribe from BFD events again if anything above failed
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """bring BFD session up"""
        bfd_session_up(self)
        # relative to the values sampled in setUp: the udp4 counter must be
        # unchanged, the udp6 counter must have grown by exactly one
        bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
        bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
        self.assert_equal(bfd_udp4_sessions, self.bfd_udp4_sessions)
        self.assert_equal(bfd_udp6_sessions - self.bfd_udp6_sessions, 1)

    def test_session_up_by_ip(self):
        """bring BFD session up - first frame looked up by address pair"""
        self.logger.info("BFD: Sending Slow control frame")
        # first packet carries only a random my_discriminator;
        # your_discriminator is filled in below from vpp's reply
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # vpp must echo our discriminator back and report the init state
        self.assert_equal(
            p[BFD].your_discriminator,
            self.test_session.my_discriminator,
            "BFD - your discriminator",
        )
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(
            your_discriminator=p[BFD].my_discriminator, state=BFDState.up
        )
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """hold BFD session up"""
        bfd_session_up(self)
        # answer every BFD packet from vpp with one of ours, for
        # 2 * detect_mult rounds; no event may be raised meanwhile
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """echo packets looped back"""
        bfd_session_up(self)
        stats_before = bfd_grab_stats_snapshot(self)
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # echo packet built by the test: source and destination IP are both
        # pg0's remote address, destination port is the BFD echo port
        echo_packet = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6)
            / UDP(dport=BFD.udp_dport_echo)
            / Raw("this should be looped back")
        )
        # send the packets one by one, each with the next source port so
        # that the order of the looped-back packets can be verified
        for dummy in range(echo_packet_count):
            self.sleep(0.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        counter = 0
        bfd_control_packets_rx = 0
        while counter < echo_packet_count:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            udp = p[UDP]
            if udp.dport == BFD.udp_dport:
                # BFD control packet - only counted, not an echo packet
                bfd_control_packets_rx += 1
                continue
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(
                scapy.compat.raw(p[UDP].payload),
                scapy.compat.raw(echo_packet[UDP].payload),
                "Received packet is not the echo packet sent",
            )
            counter += 1
        self.assert_equal(
            udp_sport_tx,
            udp_sport_rx,
            "UDP source port (== ECHO packet identifier for test purposes)",
        )
        # counters: no rx, tx equal to the control packets seen above, and no
        # change in either echo counter
        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
        self.assertEqual(bfd_control_packets_rx, diff.tx, "TX counter incorrect")
        self.assertEqual(
            0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
        )
        self.assertEqual(
            0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
        )

    def test_echo(self):
        """echo function"""
        stats_before = bfd_grab_stats_snapshot(self)
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        # detect_mult * required_min_rx, scaled from microseconds to seconds
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(
            p[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # echo packet: destination must be pg0's local address,
                    # source must differ from the loopback's own address
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP"
                    )
                    self.assertNotEqual(
                        p[IPv6].src,
                        self.loopback0.local_ip6,
                        "BFD ECHO src IP equal to loopback IP",
                    )
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(
                        p[Ether].dst,
                        self.pg0.remote_mac,
                        "ECHO packet destination MAC address",
                    )
                    # every echo packet is both received and sent back, so
                    # both test-side echo counters are bumped here
                    self.test_session.rx_packets_echo += 1
                    self.test_session.tx_packets_echo += 1
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    self.test_session.rx_packets += 1
                    if echo_seen:
                        # after the first echo packet, control packets must
                        # advertise an rx interval of at least 1000000
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval, 1000000
                        )
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a packet carrying the final flag
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(
                    len(self.vapi.collect_events()), 0, "number of bfd events"
                )
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # our rx is vpp tx and vice versa, also tolerate one packet off
        self.assert_in_range(
            self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
        )
        self.assert_in_range(
            self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
        )
        self.assert_in_range(
            self.test_session.tx_packets_echo,
            diff.rx_echo - 1,
            diff.rx_echo + 1,
            "RX echo counter",
        )
        self.assert_in_range(
            self.test_session.rx_packets_echo,
            diff.tx_echo - 1,
            diff.tx_echo + 1,
            "TX echo counter",
        )

    def test_intf_deleted(self):
        """interface with bfd session deleted"""
        # dedicated IPv6 loopback carrying its own BFD session
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        # delete the interface underneath the session: an event for that
        # interface is expected and the session must be gone from vpp
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
2009
2010
@tag_run_solo
class BFDFIBTestCase(VppTestCase):
    """BFD-FIB interactions (IPv6)"""

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        # bring every pg interface up with IPv6 and its neighbors configured
        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.configure_ipv6_neighbors()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _send_and_verify_forwarded(self, packets):
        # Inject the packets on pg0 and check that each one comes back out,
        # in order, with its IPv6 destination unchanged. BFD and IPv6 misc
        # packets are filtered out of the capture.
        self.pg0.add_stream(packets)
        self.pg_start()
        for sent in packets:
            received = self.pg0.wait_for_packet(
                1, filter_out_fn=self.pkt_is_not_data_traffic
            )
            self.assertEqual(received[IPv6].dst, sent[IPv6].dst)

    def test_session_with_fib(self):
        """BFD-FIB interactions"""

        # one data packet per route configured below
        data_packets = [
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IPv6(src="3001::1", dst=dst_ip)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
            for dst_ip in ("2001::1", "2002::1")
        ]

        # Two routes via the next-hop that will have a BFD session: 2001::/64
        # names pg0 as the outgoing interface (non-recursive), 2002::/64 is
        # given no interface (0xFFFFFFFF, recursive).
        route_2001 = VppIpRoute(
            self,
            "2001::",
            64,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        )
        route_2002 = VppIpRoute(
            self, "2002::", 64, [VppRoutePath(self.pg0.remote_ip6, 0xFFFFFFFF)]
        )
        route_2001.add_vpp_config()
        route_2002.add_vpp_config()

        # create the sessions now that the routes are present
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(data_packets)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(data_packets)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, filter_out_fn=self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(data_packets)
2121
2122
@unittest.skipUnless(config.extended, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """BFD over GRE tunnel"""

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        # bring every pg interface up with IPv4 configured and ARP resolved
        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """BFD-o-GRE"""

        # GRE tunnel between pg0's local and remote IPv4 addresses; the BFD
        # session runs over this tunnel interface
        tunnel = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # vpp side of the session, bound to the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, tunnel, tunnel.remote_ip4, is_tunnel=True
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side of the session: an IP/GRE header is handed over as
        # tunnel_header and pg0 as the physical interface
        outer_header = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / GRE()
        self.test_session = BFDTestSession(
            self,
            tunnel,
            AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0,
        )

        # a single data packet addressed to the tunnel's remote address
        data_packets = [
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        ]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_packets, self.pg0)

        # bring session down
        bfd_session_down(self)
2203         bfd_session_down(self)
2204
2205
@tag_run_solo
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth)"""

    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """run the framework set-up, enable BFD debug logs, configure pg0

        Once the framework set-up has succeeded, any failure runs the
        framework tear-down before the exception is re-raised.
        """
        super().setUpClass()
        try:
            # inside the try so that a failing CLI call is cleaned up as well
            cls.vapi.cli("set log class bfd level debug")
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """defer to the framework's class-level tear-down"""
        super().tearDownClass()

    def setUp(self):
        """fresh key factory, BFD event subscription and capture on pg0"""
        super().setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """unsubscribe from BFD events (if VPP is alive) and drain the queue"""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super().tearDown()

    def _bring_session_up(self, key, admin_up=False, **test_session_kwargs):
        """create the VPP session and its test peer using `key`, bring BFD up

        :param key: auth key, already added to the VPP config by the caller
        :param admin_up: call admin_up() on the VPP session after adding it
        :param test_session_kwargs: extra keyword arguments forwarded to
            BFDTestSession (e.g. our_seq_number)
        """
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            **test_session_kwargs
        )
        bfd_session_up(self)

    def _detection_time(self):
        """detect_mult * required_min_rx of the current sessions, in seconds"""
        return (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )

    def test_session_up(self):
        """bring BFD session up"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._bring_session_up(key, admin_up=True)

    def test_hold_up(self):
        """hold BFD session up"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._bring_session_up(key, admin_up=True)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """hold BFD session up - meticulous auth"""
        key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        # specify sequence number so that it wraps
        self._bring_session_up(key, admin_up=True, our_seq_number=0xFFFFFFFF - 4)
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """session is not kept alive by msgs with bad sequence numbers"""
        key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._bring_session_up(key)
        # keep sending for two detection times without ever bumping the
        # sequence number
        send_until = time.time() + 2 * self._detection_time()
        while time.time() < send_until:
            self.test_session.send_packet()
            self.sleep(
                0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                "time between bfd packets",
            )
        events = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(
        self,
        vpp_bfd_udp_session,
        legitimate_test_session,
        rogue_test_session,
        rogue_bfd_values=None,
    ):
        """execute a rogue session interaction scenario

        1. create vpp session, add config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected

        :param vpp_bfd_udp_session: VPP session, not yet added to the config
        :param legitimate_test_session: test session used to bring BFD up
        :param rogue_test_session: test session sending the rogue packet
        :param rogue_bfd_values: optional dict of keyword arguments passed to
            rogue_test_session.update() on top of the copied values
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # send packet from rogue session
        rogue_test_session.update(
            my_discriminator=self.test_session.my_discriminator,
            your_discriminator=self.test_session.your_discriminator,
            desired_min_tx=self.test_session.desired_min_tx,
            required_min_rx=self.test_session.required_min_rx,
            detect_mult=self.test_session.detect_mult,
            diag=self.test_session.diag,
            state=self.test_session.state,
            auth_type=self.test_session.auth_type,
        )
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """session is not brought down by unauthenticated msg"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        # the rogue peer uses no authentication at all
        rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session
        )

    def test_mismatch_bfd_key_id(self):
        """session is not brought down by msg with non-existent key-id"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        # pick a different random bfd key id
        rogue_key_id = randint(0, 255)
        while rogue_key_id == vpp_session.bfd_key_id:
            rogue_key_id = randint(0, 255)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=rogue_key_id
        )
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session
        )

    def test_mismatched_auth_type(self):
        """session is not brought down by msg with wrong auth type"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        # same key and key id, but the rogue packet claims keyed MD5
        self.execute_rogue_session_scenario(
            vpp_session,
            legitimate_test_session,
            rogue_test_session,
            {"auth_type": BFDAuthType.keyed_md5},
        )

    def test_restart(self):
        """simulate remote peer restart and resynchronization"""
        key = self.factory.create_random_key(self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._bring_session_up(key, our_seq_number=0)
        # don't send any packets for 2*detection_time
        self.sleep(2 * self._detection_time(), "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        self.test_session.my_discriminator = 0
        bfd_session_up(self)
2476
2477
@tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth)"""

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """run the framework set-up, enable BFD debug logs, configure pg0

        Once the framework set-up has succeeded, any failure runs the
        framework tear-down before the exception is re-raised.
        """
        super().setUpClass()
        try:
            # inside the try so that a failing CLI call is cleaned up as well
            cls.vapi.cli("set log class bfd level debug")
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """defer to the framework's class-level tear-down"""
        super().tearDownClass()

    def setUp(self):
        """fresh key factory, BFD event subscription and capture on pg0"""
        super().setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """unsubscribe from BFD events (if VPP is alive) and drain the queue"""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super().tearDown()

    def _exchange_packets(self, check_state=True, inc_seq_num=False):
        """answer 2 * detect_mult BFD packets received from VPP

        :param check_state: assert that each received packet reports state up
        :param inc_seq_num: increment the test session's sequence number
            before each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if check_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """assert the VPP session is up and no BFD event has been queued"""
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")

    def test_auth_on_immediate(self):
        """turn auth on without disturbing session state (immediate)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both ends to the key at the same time
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """turn auth off without disturbing session state (immediate)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        # drop authentication on both ends at the same time
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """change auth key without disturbing session state (immediate)"""
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key1
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets()
        # switch both ends to the second key at the same time
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """turn auth on without disturbing session state (delayed)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets(check_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test peer keeps sending unauthenticated packets for a while
        self._exchange_packets()
        # ... and only then starts using the key
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """turn auth off without disturbing session state (delayed)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test peer keeps authenticating its packets for a while
        self._exchange_packets()
        # ... and only then stops using the key
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """change auth key without disturbing session state (delayed)"""
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key1
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test peer keeps using the first key for a while
        self._exchange_packets()
        # ... and only then switches to the second key
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()
2701
2702
2703 @tag_run_solo
2704 class BFDCLITestCase(VppTestCase):
2705     """Bidirectional Forwarding Detection (BFD) (CLI)"""
2706
2707     pg0 = None
2708
2709     @classmethod
2710     def setUpClass(cls):
2711         super(BFDCLITestCase, cls).setUpClass()
2712         cls.vapi.cli("set log class bfd level debug")
2713         try:
2714             cls.create_pg_interfaces((0,))
2715             cls.pg0.config_ip4()
2716             cls.pg0.config_ip6()
2717             cls.pg0.resolve_arp()
2718             cls.pg0.resolve_ndp()
2719
2720         except Exception:
2721             super(BFDCLITestCase, cls).tearDownClass()
2722             raise
2723
2724     @classmethod
2725     def tearDownClass(cls):
2726         super(BFDCLITestCase, cls).tearDownClass()
2727
2728     def setUp(self):
2729         super(BFDCLITestCase, self).setUp()
2730         self.factory = AuthKeyFactory()
2731         self.pg0.enable_capture()
2732
2733     def tearDown(self):
2734         try:
2735             self.vapi.want_bfd_events(enable_disable=False)
2736         except UnexpectedApiReturnValueError:
2737             # some tests aren't subscribed, so this is not an issue
2738             pass
2739         self.vapi.collect_events()  # clear the event queue
2740         super(BFDCLITestCase, self).tearDown()
2741
2742     def cli_verify_no_response(self, cli):
2743         """execute a CLI, asserting that the response is empty"""
2744         self.assert_equal(self.vapi.cli(cli), "", "CLI command response")
2745
2746     def cli_verify_response(self, cli, expected):
2747         """execute a CLI, asserting that the response matches expectation"""
2748         try:
2749             reply = self.vapi.cli(cli)
2750         except CliFailedCommandError as cli_error:
2751             reply = str(cli_error)
2752         self.assert_equal(reply.strip(), expected, "CLI command response")
2753
2754     def test_show(self):
2755         """show commands"""
2756         k1 = self.factory.create_random_key(self)
2757         k1.add_vpp_config()
2758         k2 = self.factory.create_random_key(
2759             self, auth_type=BFDAuthType.meticulous_keyed_sha1
2760         )
2761         k2.add_vpp_config()
2762         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2763         s1.add_vpp_config()
2764         s2 = VppBFDUDPSession(
2765             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k2
2766         )
2767         s2.add_vpp_config()
2768         self.logger.info(self.vapi.ppcli("show bfd keys"))
2769         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2770         self.logger.info(self.vapi.ppcli("show bfd"))
2771
2772     def test_set_del_sha1_key(self):
2773         """set/delete SHA1 auth key"""
2774         k = self.factory.create_random_key(self)
2775         self.registry.register(k, self.logger)
2776         self.cli_verify_no_response(
2777             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2778             % (
2779                 k.conf_key_id,
2780                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
2781             )
2782         )
2783         self.assertTrue(k.query_vpp_config())
2784         self.vpp_session = VppBFDUDPSession(
2785             self, self.pg0, self.pg0.remote_ip4, sha1_key=k
2786         )
2787         self.vpp_session.add_vpp_config()
2788         self.test_session = BFDTestSession(
2789             self, self.pg0, AF_INET, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
2790         )
2791         self.vapi.want_bfd_events()
2792         bfd_session_up(self)
2793         bfd_session_down(self)
2794         # try to replace the secret for the key - should fail because the key
2795         # is in-use
2796         k2 = self.factory.create_random_key(self)
2797         self.cli_verify_response(
2798             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2799             % (
2800                 k.conf_key_id,
2801                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
2802             ),
2803             "bfd key set: `bfd_auth_set_key' API call failed, "
2804             "rv=-103:BFD object in use",
2805         )
2806         # manipulating the session using old secret should still work
2807         bfd_session_up(self)
2808         bfd_session_down(self)
2809         self.vpp_session.remove_vpp_config()
2810         self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
2811         self.assertFalse(k.query_vpp_config())
2812
2813     def test_set_del_meticulous_sha1_key(self):
2814         """set/delete meticulous SHA1 auth key"""
2815         k = self.factory.create_random_key(
2816             self, auth_type=BFDAuthType.meticulous_keyed_sha1
2817         )
2818         self.registry.register(k, self.logger)
2819         self.cli_verify_no_response(
2820             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s"
2821             % (
2822                 k.conf_key_id,
2823                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
2824             )
2825         )
2826         self.assertTrue(k.query_vpp_config())
2827         self.vpp_session = VppBFDUDPSession(
2828             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k
2829         )
2830         self.vpp_session.add_vpp_config()
2831         self.vpp_session.admin_up()
2832         self.test_session = BFDTestSession(
2833             self, self.pg0, AF_INET6, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
2834         )
2835         self.vapi.want_bfd_events()
2836         bfd_session_up(self)
2837         bfd_session_down(self)
2838         # try to replace the secret for the key - should fail because the key
2839         # is in-use
2840         k2 = self.factory.create_random_key(self)
2841         self.cli_verify_response(
2842             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2843             % (
2844                 k.conf_key_id,
2845                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
2846             ),
2847             "bfd key set: `bfd_auth_set_key' API call failed, "
2848             "rv=-103:BFD object in use",
2849         )
2850         # manipulating the session using old secret should still work
2851         bfd_session_up(self)
2852         bfd_session_down(self)
2853         self.vpp_session.remove_vpp_config()
2854         self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
2855         self.assertFalse(k.query_vpp_config())
2856
2857     def test_add_mod_del_bfd_udp(self):
2858         """create/modify/delete IPv4 BFD UDP session"""
2859         vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2860         self.registry.register(vpp_session, self.logger)
2861         cli_add_cmd = (
2862             "bfd udp session add interface %s local-addr %s "
2863             "peer-addr %s desired-min-tx %s required-min-rx %s "
2864             "detect-mult %s"
2865             % (
2866                 self.pg0.name,
2867                 self.pg0.local_ip4,
2868                 self.pg0.remote_ip4,
2869                 vpp_session.desired_min_tx,
2870                 vpp_session.required_min_rx,
2871                 vpp_session.detect_mult,
2872             )
2873         )
2874         self.cli_verify_no_response(cli_add_cmd)
2875         # 2nd add should fail
2876         self.cli_verify_response(
2877             cli_add_cmd,
2878             "bfd udp session add: `bfd_add_add_session' API call"
2879             " failed, rv=-101:Duplicate BFD object",
2880         )
2881         verify_bfd_session_config(self, vpp_session)
2882         mod_session = VppBFDUDPSession(
2883             self,
2884             self.pg0,
2885             self.pg0.remote_ip4,
2886             required_min_rx=2 * vpp_session.required_min_rx,
2887             desired_min_tx=3 * vpp_session.desired_min_tx,
2888             detect_mult=4 * vpp_session.detect_mult,
2889         )
2890         self.cli_verify_no_response(
2891             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2892             "desired-min-tx %s required-min-rx %s detect-mult %s"
2893             % (
2894                 self.pg0.name,
2895                 self.pg0.local_ip4,
2896                 self.pg0.remote_ip4,
2897                 mod_session.desired_min_tx,
2898                 mod_session.required_min_rx,
2899                 mod_session.detect_mult,
2900             )
2901         )
2902         verify_bfd_session_config(self, mod_session)
2903         cli_del_cmd = (
2904             "bfd udp session del interface %s local-addr %s "
2905             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2906         )
2907         self.cli_verify_no_response(cli_del_cmd)
2908         # 2nd del is expected to fail
2909         self.cli_verify_response(
2910             cli_del_cmd,
2911             "bfd udp session del: `bfd_udp_del_session' API call"
2912             " failed, rv=-102:No such BFD object",
2913         )
2914         self.assertFalse(vpp_session.query_vpp_config())
2915
2916     def test_add_mod_del_bfd_udp6(self):
2917         """create/modify/delete IPv6 BFD UDP session"""
2918         vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2919         self.registry.register(vpp_session, self.logger)
2920         cli_add_cmd = (
2921             "bfd udp session add interface %s local-addr %s "
2922             "peer-addr %s desired-min-tx %s required-min-rx %s "
2923             "detect-mult %s"
2924             % (
2925                 self.pg0.name,
2926                 self.pg0.local_ip6,
2927                 self.pg0.remote_ip6,
2928                 vpp_session.desired_min_tx,
2929                 vpp_session.required_min_rx,
2930                 vpp_session.detect_mult,
2931             )
2932         )
2933         self.cli_verify_no_response(cli_add_cmd)
2934         # 2nd add should fail
2935         self.cli_verify_response(
2936             cli_add_cmd,
2937             "bfd udp session add: `bfd_add_add_session' API call"
2938             " failed, rv=-101:Duplicate BFD object",
2939         )
2940         verify_bfd_session_config(self, vpp_session)
2941         mod_session = VppBFDUDPSession(
2942             self,
2943             self.pg0,
2944             self.pg0.remote_ip6,
2945             af=AF_INET6,
2946             required_min_rx=2 * vpp_session.required_min_rx,
2947             desired_min_tx=3 * vpp_session.desired_min_tx,
2948             detect_mult=4 * vpp_session.detect_mult,
2949         )
2950         self.cli_verify_no_response(
2951             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2952             "desired-min-tx %s required-min-rx %s detect-mult %s"
2953             % (
2954                 self.pg0.name,
2955                 self.pg0.local_ip6,
2956                 self.pg0.remote_ip6,
2957                 mod_session.desired_min_tx,
2958                 mod_session.required_min_rx,
2959                 mod_session.detect_mult,
2960             )
2961         )
2962         verify_bfd_session_config(self, mod_session)
2963         cli_del_cmd = (
2964             "bfd udp session del interface %s local-addr %s "
2965             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
2966         )
2967         self.cli_verify_no_response(cli_del_cmd)
2968         # 2nd del is expected to fail
2969         self.cli_verify_response(
2970             cli_del_cmd,
2971             "bfd udp session del: `bfd_udp_del_session' API call"
2972             " failed, rv=-102:No such BFD object",
2973         )
2974         self.assertFalse(vpp_session.query_vpp_config())
2975
2976     def test_add_mod_del_bfd_udp_auth(self):
2977         """create/modify/delete IPv4 BFD UDP session (authenticated)"""
2978         key = self.factory.create_random_key(self)
2979         key.add_vpp_config()
2980         vpp_session = VppBFDUDPSession(
2981             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2982         )
2983         self.registry.register(vpp_session, self.logger)
2984         cli_add_cmd = (
2985             "bfd udp session add interface %s local-addr %s "
2986             "peer-addr %s desired-min-tx %s required-min-rx %s "
2987             "detect-mult %s conf-key-id %s bfd-key-id %s"
2988             % (
2989                 self.pg0.name,
2990                 self.pg0.local_ip4,
2991                 self.pg0.remote_ip4,
2992                 vpp_session.desired_min_tx,
2993                 vpp_session.required_min_rx,
2994                 vpp_session.detect_mult,
2995                 key.conf_key_id,
2996                 vpp_session.bfd_key_id,
2997             )
2998         )
2999         self.cli_verify_no_response(cli_add_cmd)
3000         # 2nd add should fail
3001         self.cli_verify_response(
3002             cli_add_cmd,
3003             "bfd udp session add: `bfd_add_add_session' API call"
3004             " failed, rv=-101:Duplicate BFD object",
3005         )
3006         verify_bfd_session_config(self, vpp_session)
3007         mod_session = VppBFDUDPSession(
3008             self,
3009             self.pg0,
3010             self.pg0.remote_ip4,
3011             sha1_key=key,
3012             bfd_key_id=vpp_session.bfd_key_id,
3013             required_min_rx=2 * vpp_session.required_min_rx,
3014             desired_min_tx=3 * vpp_session.desired_min_tx,
3015             detect_mult=4 * vpp_session.detect_mult,
3016         )
3017         self.cli_verify_no_response(
3018             "bfd udp session mod interface %s local-addr %s peer-addr %s "
3019             "desired-min-tx %s required-min-rx %s detect-mult %s"
3020             % (
3021                 self.pg0.name,
3022                 self.pg0.local_ip4,
3023                 self.pg0.remote_ip4,
3024                 mod_session.desired_min_tx,
3025                 mod_session.required_min_rx,
3026                 mod_session.detect_mult,
3027             )
3028         )
3029         verify_bfd_session_config(self, mod_session)
3030         cli_del_cmd = (
3031             "bfd udp session del interface %s local-addr %s "
3032             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3033         )
3034         self.cli_verify_no_response(cli_del_cmd)
3035         # 2nd del is expected to fail
3036         self.cli_verify_response(
3037             cli_del_cmd,
3038             "bfd udp session del: `bfd_udp_del_session' API call"
3039             " failed, rv=-102:No such BFD object",
3040         )
3041         self.assertFalse(vpp_session.query_vpp_config())
3042
3043     def test_add_mod_del_bfd_udp6_auth(self):
3044         """create/modify/delete IPv6 BFD UDP session (authenticated)"""
3045         key = self.factory.create_random_key(
3046             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3047         )
3048         key.add_vpp_config()
3049         vpp_session = VppBFDUDPSession(
3050             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key
3051         )
3052         self.registry.register(vpp_session, self.logger)
3053         cli_add_cmd = (
3054             "bfd udp session add interface %s local-addr %s "
3055             "peer-addr %s desired-min-tx %s required-min-rx %s "
3056             "detect-mult %s conf-key-id %s bfd-key-id %s"
3057             % (
3058                 self.pg0.name,
3059                 self.pg0.local_ip6,
3060                 self.pg0.remote_ip6,
3061                 vpp_session.desired_min_tx,
3062                 vpp_session.required_min_rx,
3063                 vpp_session.detect_mult,
3064                 key.conf_key_id,
3065                 vpp_session.bfd_key_id,
3066             )
3067         )
3068         self.cli_verify_no_response(cli_add_cmd)
3069         # 2nd add should fail
3070         self.cli_verify_response(
3071             cli_add_cmd,
3072             "bfd udp session add: `bfd_add_add_session' API call"
3073             " failed, rv=-101:Duplicate BFD object",
3074         )
3075         verify_bfd_session_config(self, vpp_session)
3076         mod_session = VppBFDUDPSession(
3077             self,
3078             self.pg0,
3079             self.pg0.remote_ip6,
3080             af=AF_INET6,
3081             sha1_key=key,
3082             bfd_key_id=vpp_session.bfd_key_id,
3083             required_min_rx=2 * vpp_session.required_min_rx,
3084             desired_min_tx=3 * vpp_session.desired_min_tx,
3085             detect_mult=4 * vpp_session.detect_mult,
3086         )
3087         self.cli_verify_no_response(
3088             "bfd udp session mod interface %s local-addr %s peer-addr %s "
3089             "desired-min-tx %s required-min-rx %s detect-mult %s"
3090             % (
3091                 self.pg0.name,
3092                 self.pg0.local_ip6,
3093                 self.pg0.remote_ip6,
3094                 mod_session.desired_min_tx,
3095                 mod_session.required_min_rx,
3096                 mod_session.detect_mult,
3097             )
3098         )
3099         verify_bfd_session_config(self, mod_session)
3100         cli_del_cmd = (
3101             "bfd udp session del interface %s local-addr %s "
3102             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
3103         )
3104         self.cli_verify_no_response(cli_del_cmd)
3105         # 2nd del is expected to fail
3106         self.cli_verify_response(
3107             cli_del_cmd,
3108             "bfd udp session del: `bfd_udp_del_session' API call"
3109             " failed, rv=-102:No such BFD object",
3110         )
3111         self.assertFalse(vpp_session.query_vpp_config())
3112
3113     def test_auth_on_off(self):
3114         """turn authentication on and off"""
3115         key = self.factory.create_random_key(
3116             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3117         )
3118         key.add_vpp_config()
3119         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3120         auth_session = VppBFDUDPSession(
3121             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
3122         )
3123         session.add_vpp_config()
3124         cli_activate = (
3125             "bfd udp session auth activate interface %s local-addr %s "
3126             "peer-addr %s conf-key-id %s bfd-key-id %s"
3127             % (
3128                 self.pg0.name,
3129                 self.pg0.local_ip4,
3130                 self.pg0.remote_ip4,
3131                 key.conf_key_id,
3132                 auth_session.bfd_key_id,
3133             )
3134         )
3135         self.cli_verify_no_response(cli_activate)
3136         verify_bfd_session_config(self, auth_session)
3137         self.cli_verify_no_response(cli_activate)
3138         verify_bfd_session_config(self, auth_session)
3139         cli_deactivate = (
3140             "bfd udp session auth deactivate interface %s local-addr %s "
3141             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3142         )
3143         self.cli_verify_no_response(cli_deactivate)
3144         verify_bfd_session_config(self, session)
3145         self.cli_verify_no_response(cli_deactivate)
3146         verify_bfd_session_config(self, session)
3147
3148     def test_auth_on_off_delayed(self):
3149         """turn authentication on and off (delayed)"""
3150         key = self.factory.create_random_key(
3151             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3152         )
3153         key.add_vpp_config()
3154         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3155         auth_session = VppBFDUDPSession(
3156             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
3157         )
3158         session.add_vpp_config()
3159         cli_activate = (
3160             "bfd udp session auth activate interface %s local-addr %s "
3161             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"
3162             % (
3163                 self.pg0.name,
3164                 self.pg0.local_ip4,
3165                 self.pg0.remote_ip4,
3166                 key.conf_key_id,
3167                 auth_session.bfd_key_id,
3168             )
3169         )
3170         self.cli_verify_no_response(cli_activate)
3171         verify_bfd_session_config(self, auth_session)
3172         self.cli_verify_no_response(cli_activate)
3173         verify_bfd_session_config(self, auth_session)
3174         cli_deactivate = (
3175             "bfd udp session auth deactivate interface %s local-addr %s "
3176             "peer-addr %s delayed yes"
3177             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3178         )
3179         self.cli_verify_no_response(cli_deactivate)
3180         verify_bfd_session_config(self, session)
3181         self.cli_verify_no_response(cli_deactivate)
3182         verify_bfd_session_config(self, session)
3183
3184     def test_admin_up_down(self):
3185         """put session admin-up and admin-down"""
3186         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3187         session.add_vpp_config()
3188         cli_down = (
3189             "bfd udp session set-flags admin down interface %s local-addr %s "
3190             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3191         )
3192         cli_up = (
3193             "bfd udp session set-flags admin up interface %s local-addr %s "
3194             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3195         )
3196         self.cli_verify_no_response(cli_down)
3197         verify_bfd_session_config(self, session, state=BFDState.admin_down)
3198         self.cli_verify_no_response(cli_up)
3199         verify_bfd_session_config(self, session, state=BFDState.down)
3200
3201     def test_set_del_udp_echo_source(self):
3202         """set/del udp echo source"""
3203         self.create_loopback_interfaces(1)
3204         self.loopback0 = self.lo_interfaces[0]
3205         self.loopback0.admin_up()
3206         self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3207         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
3208         self.cli_verify_no_response(cli_set)
3209         self.cli_verify_response(
3210             "show bfd echo-source",
3211             "UDP echo source is: %s\n"
3212             "IPv4 address usable as echo source: none\n"
3213             "IPv6 address usable as echo source: none" % self.loopback0.name,
3214         )
3215         self.loopback0.config_ip4()
3216         echo_ip4 = str(
3217             ipaddress.IPv4Address(
3218                 int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
3219             )
3220         )
3221         self.cli_verify_response(
3222             "show bfd echo-source",
3223             "UDP echo source is: %s\n"
3224             "IPv4 address usable as echo source: %s\n"
3225             "IPv6 address usable as echo source: none"
3226             % (self.loopback0.name, echo_ip4),
3227         )
3228         echo_ip6 = str(
3229             ipaddress.IPv6Address(
3230                 int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
3231             )
3232         )
3233         self.loopback0.config_ip6()
3234         self.cli_verify_response(
3235             "show bfd echo-source",
3236             "UDP echo source is: %s\n"
3237             "IPv4 address usable as echo source: %s\n"
3238             "IPv6 address usable as echo source: %s"
3239             % (self.loopback0.name, echo_ip4, echo_ip6),
3240         )
3241         cli_del = "bfd udp echo-source del"
3242         self.cli_verify_no_response(cli_del)
3243         self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3244
3245
# When this file is executed directly (not imported), run its tests with
# unittest, using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)