f66f75a3408ce31c3784ba271ad28a42627d5e75
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import ipaddress
9 import time
10 import unittest
11 from random import randint, shuffle, getrandbits
12 from socket import AF_INET, AF_INET6, inet_ntop
13 from struct import pack, unpack
14
15 from six import moves
16 import scapy.compat
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
21
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23     BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from util import ppp
26 from vpp_ip import DpoProto
27 from vpp_ip_route import VppIpRoute, VppRoutePath
28 from vpp_lo_interface import VppLoInterface
29 from vpp_papi_provider import UnexpectedApiReturnValueError, \
30     CliFailedCommandError
31 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
32 from vpp_gre_interface import VppGreInterface
33 from vpp_papi import VppEnum
34
35 USEC_IN_SEC = 1000000
36
37
class AuthKeyFactory(object):
    """Factory handing out BFD auth keys whose conf key IDs never repeat

    Every conf key ID issued by one factory instance is remembered, so a
    randomly drawn ID that collides with an earlier one is drawn again.
    """

    def __init__(self):
        # conf key IDs issued so far; only the dictionary keys are meaningful
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ build an auth key with random content and an unused conf key id

        :param test: test case object handed over to VppBFDAuthKey
        :param auth_type: authentication type stored in the key
        :returns: the newly constructed VppBFDAuthKey object
        """
        # draw 32-bit IDs until one turns up which was not issued before
        while True:
            conf_key_id = randint(0, 0xFFFFFFFF)
            if conf_key_id not in self._conf_key_ids:
                break
        self._conf_key_ids[conf_key_id] = 1
        # key material is 1 to 20 random bytes
        key_length = randint(1, 20)
        key_bytes = bytearray(randint(0, 255) for _ in range(key_length))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id,
                             key=scapy.compat.raw(key_bytes))
54
55
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # placeholders; presumably filled in by create_pg_interfaces() - confirm
    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        # turn on BFD debug logging and create two pg interfaces with IPv4
        # and IPv6 configured; on failure undo the class-level setup
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for intf in cls.pg_interfaces:
                intf.config_ip4()
                intf.config_ip6()
                intf.resolve_arp()
        except Exception:
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        # nothing class specific to clean up, just delegate
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        # every test gets its own key factory
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def _add_remove_twice(self, session):
        # two rounds of: add the session, log its state, remove it again
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _verify_dump_matches(self, session):
        # compare the parameters stored in the session object with what the
        # session dump reports
        dump = session.get_bfd_udp_session_dump_entry()
        compared = (
            ("desired_min_tx", "desired min transmit interval"),
            ("required_min_rx", "required min receive interval"),
            ("detect_mult", "detect mult"),
        )
        for attr, description in compared:
            self.assert_equal(getattr(session, attr), getattr(dump, attr),
                              description)

    def test_add_bfd(self):
        """ create a BFD session """
        self._add_remove_twice(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4))

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of the very same session is expected to be refused
        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        self._add_remove_twice(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             af=AF_INET6))

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        self._verify_dump_matches(session)
        # double every parameter and check that the dump follows
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        self._verify_dump_matches(session)

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]

        def assert_all_configured(expected):
            # every key must (or must not) be known to vpp
            check = self.assertTrue if expected else self.assertFalse
            for key in keys:
                check(key.query_vpp_config())

        assert_all_configured(False)
        for key in keys:
            key.add_vpp_config()
        assert_all_configured(True)
        # remove randomly
        order = list(range(key_count))
        shuffle(order)
        gone = []
        for victim in order:
            keys[victim].remove_vpp_config()
            gone.append(victim)
            # after each removal exactly the removed keys must be missing
            for idx, key in enumerate(keys):
                if idx in gone:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        assert_all_configured(False)
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        assert_all_configured(True)
        for key in keys:
            key.remove_vpp_config()
        assert_all_configured(False)

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._add_remove_twice(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key))

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created but deliberately never added to vpp
        unknown_key = self.factory.create_random_key(self)
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=unknown_key)
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for session in sessions:
            session.add_vpp_config()
        # the use count has to drop by one with every removed session
        remaining = len(sessions)
        for session in sessions:
            entry = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(entry.use_count, remaining,
                              "Use count for shared key")
            session.remove_vpp_config()
            remaining -= 1
        entry = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(entry.use_count, remaining,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        first_key = self.factory.create_random_key(self)
        second_key = self.factory.create_random_key(self)
        # make sure both keys really carry different conf key ids
        while second_key.conf_key_id == first_key.conf_key_id:
            second_key = self.factory.create_random_key(self)
        first_key.add_vpp_config()
        second_key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=first_key)
        session.add_vpp_config()
        session.activate_auth(second_key)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """

        def assert_not_set(reply):
            # no echo source configured, hence no usable address either
            self.assertFalse(reply.is_set)
            self.assertFalse(reply.have_usable_ip4)
            self.assertFalse(reply.have_usable_ip6)

        def assert_set_on_loopback(reply):
            # echo source is configured and points at the loopback
            self.assertTrue(reply.is_set)
            self.assertEqual(reply.sw_if_index, lo.sw_if_index)

        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        lo = self.loopback0
        lo.admin_up()
        assert_not_set(self.vapi.bfd_udp_get_echo_source())

        # loopback without any address: set, but nothing usable
        self.vapi.bfd_udp_set_echo_source(sw_if_index=lo.sw_if_index)
        reply = self.vapi.bfd_udp_get_echo_source()
        assert_set_on_loopback(reply)
        self.assertFalse(reply.have_usable_ip4)
        self.assertFalse(reply.have_usable_ip6)

        # expected IPv4 echo address is the interface address with the
        # lowest bit flipped
        lo.config_ip4()
        local_ip4 = int(ipaddress.IPv4Address(lo.local_ip4))
        echo_ip4 = ipaddress.IPv4Address(local_ip4 ^ 1).packed
        reply = self.vapi.bfd_udp_get_echo_source()
        assert_set_on_loopback(reply)
        self.assertTrue(reply.have_usable_ip4)
        self.assertEqual(reply.ip4_addr.packed, echo_ip4)
        self.assertFalse(reply.have_usable_ip6)

        # same rule for the IPv6 echo address
        lo.config_ip6()
        local_ip6 = int(ipaddress.IPv6Address(lo.local_ip6))
        echo_ip6 = ipaddress.IPv6Address(local_ip6 ^ 1).packed

        reply = self.vapi.bfd_udp_get_echo_source()
        assert_set_on_loopback(reply)
        self.assertTrue(reply.have_usable_ip4)
        self.assertEqual(reply.ip4_addr.packed, echo_ip4)
        self.assertTrue(reply.have_usable_ip6)
        self.assertEqual(reply.ip6_addr.packed, echo_ip6)

        self.vapi.bfd_udp_del_echo_source()
        assert_not_set(self.vapi.bfd_udp_get_echo_source())
313
314
class BFDTestSession(object):
    """ BFD session as seen from test framework side

    Stores the parameters which the test side advertises, builds BFD packets
    out of them (optionally SHA1 authenticated and/or wrapped in a tunnel
    header) and verifies the BFD layer of packets received from VPP.
    """

    # (session attribute, name printed in the debug log, BFD packet field);
    # fill_packet_fields() applies them in exactly this order
    _PACKET_FIELDS = (
        ("my_discriminator", "my_discriminator", "my_discriminator"),
        ("your_discriminator", "your_discriminator", "your_discriminator"),
        ("required_min_rx", "required_min_rx_interval",
         "required_min_rx_interval"),
        ("required_min_echo_rx", "required_min_echo_rx",
         "required_min_echo_rx_interval"),
        ("desired_min_tx", "desired_min_tx_interval",
         "desired_min_tx_interval"),
        ("detect_mult", "detect_mult", "detect_mult"),
        ("diag", "diag", "diag"),
        ("state", "state", "state"),
        # a non-default auth_type is used by a negative test-case
        ("auth_type", "auth_type", "auth_type"),
    )

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        """ set up the test side of a BFD session

        :param test: test case object (provides logger, asserts, pg_start)
        :param interface: interface whose local/remote IP addresses are used
        :param af: address family, AF_INET6 selects IPv6, anything else IPv4
        :param detect_mult: detect multiplier to advertise
        :param sha1_key: auth key object, None means no authentication
        :param bfd_key_id: key ID put into authenticated packets
        :param our_seq_number: initial auth sequence number, random if None
        :param tunnel_header: optional layer(s) inserted right after Ethernet
        :param phy_interface: interface used for MAC addresses and for
            sending, defaults to `interface`
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.phy_interface = phy_interface if phy_interface else interface
        self.udp_sport = randint(49152, 65535)
        if our_seq_number is None:
            self.our_seq_number = randint(0, 40000000)
        else:
            self.our_seq_number = our_seq_number
        # last sequence number seen from vpp, None until the first packet
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        if self.our_seq_number == 0xFFFFFFFF:
            self.our_seq_number = 0
        else:
            self.our_seq_number += 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments which are not None overwrite the stored value.
        """
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attr, value in requested:
            if value is not None:
                setattr(self, attr, value)

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Session values which are falsy (None, 0) are skipped, leaving the
        corresponding field of the BFD layer untouched.
        """
        bfd = packet[BFD]
        for attr, log_name, field in self._PACKET_FIELDS:
            value = getattr(self, attr)
            if value:
                self.test.logger.debug("BFD: setting packet.%s=%s",
                                       log_name, value)
                setattr(bfd, field, value)

    def _padded_key(self):
        """ return the SHA1 key zero-padded to 20 bytes for hashing """
        key = self.sha1_key.key
        return key + b"\0" * (20 - len(key))

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session

        :returns: Ether / [tunnel header] / IP or IPv6 / UDP / BFD packet;
            with a SHA1 key configured the auth section is filled in too
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        packet = Ether(src=self.phy_interface.remote_mac,
                       dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        if self.af == AF_INET6:
            ip = IPv6(src=self.interface.remote_ip6,
                      dst=self.interface.local_ip6,
                      hlim=255)
        else:
            ip = IP(src=self.interface.remote_ip4,
                    dst=self.interface.local_ip4,
                    ttl=255)
        packet = (packet / ip /
                  UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                  bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer (presumably
            # everything preceding the hash field) followed by the padded key
            hash_material = \
                scapy.compat.raw(packet[BFD])[:32] + self._padded_key()
            # compute the digest once and use it for both log and packet
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s",
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed

        :param packet: packet to send, built by create_packet() if None
        :param interface: interface to send on, phy_interface if None
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.phy_interface
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer.

        Checks the fixed auth fields, the progression of the sequence number
        relative to the previously received packet (remembered in
        vpp_seq_number) and finally the SHA1 hash itself.
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s",
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s",
                                   recvd_seq_num)
            # meticulous mode has to advance by exactly one, the plain keyed
            # mode may also repeat the previous number
            meticulous = (self.sha1_key.auth_type ==
                          BFDAuthType.meticulous_keyed_sha1)
            if self.vpp_seq_number < 0xffffffff:
                if meticulous:
                    self.test.assert_equal(recvd_seq_num,
                                           self.vpp_seq_number + 1,
                                           "BFD sequence number")
                else:
                    self.test.assert_in_range(recvd_seq_num,
                                              self.vpp_seq_number,
                                              self.vpp_seq_number + 1,
                                              "BFD sequence number")
            else:
                # previous number was the maximum, the successor wraps to 0
                if meticulous:
                    self.test.assert_equal(recvd_seq_num, 0,
                                           "BFD sequence number")
                else:
                    self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                       "BFD sequence number not one of "
                                       "(%s, 0)" % self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self._padded_key()
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash.encode(), "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer.

        Checks the version, that your_discriminator echoes our discriminator
        and - when a SHA1 key is configured - the authentication section.
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
522         if self.sha1_key:
523             self.verify_sha1_auth(packet)
524
525
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from VPP, stores the difference between local time
    and the timestamp of that packet in test.vpp_clock_offset, answers with
    Init, waits for the Up event and confirms it by sending Up.
    """
    session = test.test_session

    def advance_seq_num_if_meticulous():
        # with meticulous keyed SHA1 the sequence number is advanced before
        # each packet which is about to be sent
        key = session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            session.inc_seq_num()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2,
                                is_tunnel=test.vpp_session.is_tunnel)
    # remember an offset computed by an earlier call, if there is one
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(hello.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    test.logger.info("BFD: Sending Init")
    session.update(my_discriminator=randint(0, 40000000),
                   your_discriminator=hello[BFD].my_discriminator,
                   state=BFDState.init)
    advance_seq_num_if_meticulous()
    session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")

    session.update(state=BFDState.up)
    advance_seq_num_if_meticulous()
    session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
559
560
def bfd_session_down(test):
    """ Bring BFD session down

    Requires the VPP session to be Up, sends a packet with state Down and
    expects both the Down event and the VPP session ending up in Down.
    """
    session = test.test_session
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session.update(state=BFDState.down)
    # with meticulous keyed SHA1 the sequence number is advanced before
    # the packet is sent
    key = session.sha1_key
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
574
575
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the session dump from VPP matches the session object

    :param test: test case object providing the assert helpers
    :param session: session object to compare the dump against
    :param state: expected session state; the state is not checked when None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None explicitly: a state which is numerically 0
    # (AdminDown per RFC 5880) is falsy and would otherwise never be checked
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
598
599
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    The packet has to carry hop limit/TTL 255, the local address of the VPP
    session interface as source and its remote address as destination.
    """
    interface = test.vpp_session.interface
    if test.vpp_session.af == AF_INET6:
        ip = packet[IPv6]
        expected_src = interface.local_ip6
        expected_dst = interface.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        expected_src = interface.local_ip4
        expected_dst = interface.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    # packet was sent by vpp, so vpp's local address is the source
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
614
615
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    Destination port has to be the BFD port, source port has to lie within
    the range given by BFD.udp_sport_min and BFD.udp_sport_max.
    """
    udp_layer = packet[UDP]
    test.assert_equal(
        udp_layer.dport, BFD.udp_dport, "UDP destination port")
    test.assert_in_range(
        udp_layer.sport, BFD.udp_sport_min, BFD.udp_sport_max,
        "UDP source port")
622
623
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case object; its vpp_session supplies expected values
    :param event: received BFD session event
    :param expected_state: state which the event has to report
    """
    test.logger.debug("BFD: Event: %s", moves.reprlib.repr(event))
    session = test.vpp_session
    test.assert_equal(event.sw_if_index,
                      session.interface.sw_if_index,
                      "BFD interface index")

    # labels are address family neutral - this check runs for IPv4 sessions
    # as well, not only for IPv6 ones
    test.assert_equal(str(event.local_addr), session.local_addr,
                      "Local IP address")
    test.assert_equal(str(event.peer_addr), session.peer_addr,
                      "Peer IP address")
    test.assert_equal(event.state, expected_state, BFDState)
637
638
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    Packets are read from test.pg0. The IP, UDP and BFD layers of the
    accepted packet are verified before it is returned.

    :param test: test case object
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if set, the outer IP layer is stripped and the checks
        are applied to its payload

    :returns: the received packet (the payload of the outer IP layer when
        is_tunnel is set)
    :raises CaptureTimeoutError: if no acceptable packet arrived in time
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() returns None for a missing layer, whereas indexing raises
    # IndexError and would never reach the check below
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
678
679
680 class BFD4TestCase(VppTestCase):
681     """Bidirectional Forwarding Detection (BFD)"""
682
683     pg0 = None
684     vpp_clock_offset = None
685     vpp_session = None
686     test_session = None
687
688     @classmethod
689     def force_solo(cls):
690         return True
691
692     @classmethod
693     def setUpClass(cls):
694         super(BFD4TestCase, cls).setUpClass()
695         cls.vapi.cli("set log class bfd level debug")
696         try:
697             cls.create_pg_interfaces([0])
698             cls.create_loopback_interfaces(1)
699             cls.loopback0 = cls.lo_interfaces[0]
700             cls.loopback0.config_ip4()
701             cls.loopback0.admin_up()
702             cls.pg0.config_ip4()
703             cls.pg0.configure_ipv4_neighbors()
704             cls.pg0.admin_up()
705             cls.pg0.resolve_arp()
706
707         except Exception:
708             super(BFD4TestCase, cls).tearDownClass()
709             raise
710
711     @classmethod
712     def tearDownClass(cls):
713         super(BFD4TestCase, cls).tearDownClass()
714
715     def setUp(self):
716         super(BFD4TestCase, self).setUp()
717         self.factory = AuthKeyFactory()
718         self.vapi.want_bfd_events()
719         self.pg0.enable_capture()
720         try:
721             self.vpp_session = VppBFDUDPSession(self, self.pg0,
722                                                 self.pg0.remote_ip4)
723             self.vpp_session.add_vpp_config()
724             self.vpp_session.admin_up()
725             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
726         except BaseException:
727             self.vapi.want_bfd_events(enable_disable=0)
728             raise
729
    def tearDown(self):
        """Cancel the BFD event subscription and drain pending events."""
        # unsubscribing needs an API call, so skip it when vpp has died
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD4TestCase, self).tearDown()
735
    def test_session_up(self):
        """ bring BFD session up """
        # the whole scenario is implemented by the bfd_session_up helper
        bfd_session_up(self)
739
740     def test_session_up_by_ip(self):
741         """ bring BFD session up - first frame looked up by address pair """
742         self.logger.info("BFD: Sending Slow control frame")
743         self.test_session.update(my_discriminator=randint(0, 40000000))
744         self.test_session.send_packet()
745         self.pg0.enable_capture()
746         p = self.pg0.wait_for_packet(1)
747         self.assert_equal(p[BFD].your_discriminator,
748                           self.test_session.my_discriminator,
749                           "BFD - your discriminator")
750         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
751         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
752                                  state=BFDState.up)
753         self.logger.info("BFD: Waiting for event")
754         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755         verify_event(self, e, expected_state=BFDState.init)
756         self.logger.info("BFD: Sending Up")
757         self.test_session.send_packet()
758         self.logger.info("BFD: Waiting for event")
759         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
760         verify_event(self, e, expected_state=BFDState.up)
761         self.logger.info("BFD: Session is Up")
762         self.test_session.update(state=BFDState.up)
763         self.test_session.send_packet()
764         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
765
    def test_session_down(self):
        """ bring BFD session down """
        # a session has to be up before it can be brought down
        bfd_session_up(self)
        bfd_session_down(self)
770
771     def test_hold_up(self):
772         """ hold BFD session up """
773         bfd_session_up(self)
774         for dummy in range(self.test_session.detect_mult * 2):
775             wait_for_bfd_packet(self)
776             self.test_session.send_packet()
777         self.assert_equal(len(self.vapi.collect_events()), 0,
778                           "number of bfd events")
779
780     def test_slow_timer(self):
781         """ verify slow periodic control frames while session down """
782         packet_count = 3
783         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
784         prev_packet = wait_for_bfd_packet(self, 2)
785         for dummy in range(packet_count):
786             next_packet = wait_for_bfd_packet(self, 2)
787             time_diff = next_packet.time - prev_packet.time
788             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
789             # to work around timing issues
790             self.assert_in_range(
791                 time_diff, 0.70, 1.05, "time between slow packets")
792             prev_packet = next_packet
793
794     def test_zero_remote_min_rx(self):
795         """ no packets when zero remote required min rx interval """
796         bfd_session_up(self)
797         self.test_session.update(required_min_rx=0)
798         self.test_session.send_packet()
799         for dummy in range(self.test_session.detect_mult):
800             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
801                        "sleep before transmitting bfd packet")
802             self.test_session.send_packet()
803             try:
804                 p = wait_for_bfd_packet(self, timeout=0)
805                 self.logger.error(ppp("Received unexpected packet:", p))
806             except CaptureTimeoutError:
807                 pass
808         self.assert_equal(
809             len(self.vapi.collect_events()), 0, "number of bfd events")
810         self.test_session.update(required_min_rx=300000)
811         for dummy in range(3):
812             self.test_session.send_packet()
813             wait_for_bfd_packet(
814                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
815         self.assert_equal(
816             len(self.vapi.collect_events()), 0, "number of bfd events")
817
818     def test_conn_down(self):
819         """ verify session goes down after inactivity """
820         bfd_session_up(self)
821         detection_time = self.test_session.detect_mult *\
822             self.vpp_session.required_min_rx / USEC_IN_SEC
823         self.sleep(detection_time, "waiting for BFD session time-out")
824         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
825         verify_event(self, e, expected_state=BFDState.down)
826
827     def test_peer_discr_reset_sess_down(self):
828         """ peer discriminator reset after session goes down """
829         bfd_session_up(self)
830         detection_time = self.test_session.detect_mult *\
831             self.vpp_session.required_min_rx / USEC_IN_SEC
832         self.sleep(detection_time, "waiting for BFD session time-out")
833         self.test_session.my_discriminator = 0
834         wait_for_bfd_packet(self,
835                             pcap_time_min=time.time() - self.vpp_clock_offset)
836
    def test_large_required_min_rx(self):
        """ large remote required min rx interval """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # advertise a required min rx interval of 3 seconds (in usec)
        interval = 3000000
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        time_mark = time.time()
        # number of packets received although none were expected
        count = 0
        # busy wait here, trying to collect a packet or event, vpp is not
        # allowed to send packets and the session will timeout first - so the
        # Up->Down event must arrive before any packets do
        while time.time() < time_mark + interval / USEC_IN_SEC:
            try:
                p = wait_for_bfd_packet(self, timeout=0)
                # if vpp managed to send a packet before we did the session
                # update, then that's fine, ignore it
                if p.time < time_mark - self.vpp_clock_offset:
                    continue
                self.logger.error(ppp("Received unexpected packet:", p))
                count += 1
            except CaptureTimeoutError:
                pass
            # the first event seen has to be the transition to Down; stop
            # polling as soon as it shows up
            events = self.vapi.collect_events()
            if len(events) > 0:
                verify_event(self, events[0], BFDState.down)
                break
        self.assert_equal(count, 0, "number of packets received")
865
866     def test_immediate_remote_min_rx_reduction(self):
867         """ immediately honor remote required min rx reduction """
868         self.vpp_session.remove_vpp_config()
869         self.vpp_session = VppBFDUDPSession(
870             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
871         self.pg0.enable_capture()
872         self.vpp_session.add_vpp_config()
873         self.test_session.update(desired_min_tx=1000000,
874                                  required_min_rx=1000000)
875         bfd_session_up(self)
876         reference_packet = wait_for_bfd_packet(self)
877         time_mark = time.time()
878         interval = 300000
879         self.test_session.update(required_min_rx=interval)
880         self.test_session.send_packet()
881         extra_time = time.time() - time_mark
882         p = wait_for_bfd_packet(self)
883         # first packet is allowed to be late by time we spent doing the update
884         # calculated in extra_time
885         self.assert_in_range(p.time - reference_packet.time,
886                              .95 * 0.75 * interval / USEC_IN_SEC,
887                              1.05 * interval / USEC_IN_SEC + extra_time,
888                              "time between BFD packets")
889         reference_packet = p
890         for dummy in range(3):
891             p = wait_for_bfd_packet(self)
892             diff = p.time - reference_packet.time
893             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
894                                  1.05 * interval / USEC_IN_SEC,
895                                  "time between BFD packets")
896             reference_packet = p
897
898     def test_modify_req_min_rx_double(self):
899         """ modify session - double required min rx """
900         bfd_session_up(self)
901         p = wait_for_bfd_packet(self)
902         self.test_session.update(desired_min_tx=10000,
903                                  required_min_rx=10000)
904         self.test_session.send_packet()
905         # double required min rx
906         self.vpp_session.modify_parameters(
907             required_min_rx=2 * self.vpp_session.required_min_rx)
908         p = wait_for_bfd_packet(
909             self, pcap_time_min=time.time() - self.vpp_clock_offset)
910         # poll bit needs to be set
911         self.assertIn("P", p.sprintf("%BFD.flags%"),
912                       "Poll bit not set in BFD packet")
913         # finish poll sequence with final packet
914         final = self.test_session.create_packet()
915         final[BFD].flags = "F"
916         timeout = self.test_session.detect_mult * \
917             max(self.test_session.desired_min_tx,
918                 self.vpp_session.required_min_rx) / USEC_IN_SEC
919         self.test_session.send_packet(final)
920         time_mark = time.time()
921         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
922         verify_event(self, e, expected_state=BFDState.down)
923         time_to_event = time.time() - time_mark
924         self.assert_in_range(time_to_event, .9 * timeout,
925                              1.1 * timeout, "session timeout")
926
    def test_modify_req_min_rx_halve(self):
        """ modify session - halve required min rx """
        # start from a doubled required min rx so that it can be halved later
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.test_session.update(desired_min_tx=10000,
                                 required_min_rx=10000)
        self.test_session.send_packet()
        # wait for a packet captured after the update above was sent
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # halve required min rx
        old_required_min_rx = self.vpp_session.required_min_rx
        self.vpp_session.modify_parameters(
            required_min_rx=self.vpp_session.required_min_rx // 2)
        # now we wait 0.8 * detect_mult * old-req-min-rx and the session
        # should still be up
        self.sleep(0.8 * self.vpp_session.detect_mult *
                   old_required_min_rx / USEC_IN_SEC,
                   "wait before finishing poll sequence")
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        p = wait_for_bfd_packet(self)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # now the session should time out under new conditions
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # measure how long it takes for the Down event to arrive
        before = time.time()
        e = self.vapi.wait_for_event(
            2 * detection_time, "bfd_udp_session_details")
        after = time.time()
        self.assert_in_range(after - before,
                             0.9 * detection_time,
                             1.1 * detection_time,
                             "time before bfd session goes down")
        verify_event(self, e, expected_state=BFDState.down)
968
969     def test_modify_detect_mult(self):
970         """ modify detect multiplier """
971         bfd_session_up(self)
972         p = wait_for_bfd_packet(self)
973         self.vpp_session.modify_parameters(detect_mult=1)
974         p = wait_for_bfd_packet(
975             self, pcap_time_min=time.time() - self.vpp_clock_offset)
976         self.assert_equal(self.vpp_session.detect_mult,
977                           p[BFD].detect_mult,
978                           "detect mult")
979         # poll bit must not be set
980         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
981                          "Poll bit not set in BFD packet")
982         self.vpp_session.modify_parameters(detect_mult=10)
983         p = wait_for_bfd_packet(
984             self, pcap_time_min=time.time() - self.vpp_clock_offset)
985         self.assert_equal(self.vpp_session.detect_mult,
986                           p[BFD].detect_mult,
987                           "detect mult")
988         # poll bit must not be set
989         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
990                          "Poll bit not set in BFD packet")
991
    def test_queued_poll(self):
        """ test poll sequence queueing """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # the first parameter change starts the 1st poll sequence
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        # keep the 1st poll sequence open for at least this many seconds
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            # keep the session alive with frames which carry no final bit
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # look for the 2nd poll within twice as many packets as the 1st
        # poll sequence lasted
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # a poll seen before the 1st sequence could have ended is
                # treated as a failure
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        # NOTE(review): a final was already sent inside the loop above when
        # the 2nd poll was seen, so this is a second final - confirm that
        # sending it twice is intended
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
1056
1057     # returning inconsistent results requiring retries in per-patch tests
1058     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1059     def test_poll_response(self):
1060         """ test correct response to control frame with poll bit set """
1061         bfd_session_up(self)
1062         poll = self.test_session.create_packet()
1063         poll[BFD].flags = "P"
1064         self.test_session.send_packet(poll)
1065         final = wait_for_bfd_packet(
1066             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1067         self.assertIn("F", final.sprintf("%BFD.flags%"))
1068
1069     def test_no_periodic_if_remote_demand(self):
1070         """ no periodic frames outside poll sequence if remote demand set """
1071         bfd_session_up(self)
1072         demand = self.test_session.create_packet()
1073         demand[BFD].flags = "D"
1074         self.test_session.send_packet(demand)
1075         transmit_time = 0.9 \
1076             * max(self.vpp_session.required_min_rx,
1077                   self.test_session.desired_min_tx) \
1078             / USEC_IN_SEC
1079         count = 0
1080         for dummy in range(self.test_session.detect_mult * 2):
1081             self.sleep(transmit_time)
1082             self.test_session.send_packet(demand)
1083             try:
1084                 p = wait_for_bfd_packet(self, timeout=0)
1085                 self.logger.error(ppp("Received unexpected packet:", p))
1086                 count += 1
1087             except CaptureTimeoutError:
1088                 pass
1089         events = self.vapi.collect_events()
1090         for e in events:
1091             self.logger.error("Received unexpected event: %s", e)
1092         self.assert_equal(count, 0, "number of packets received")
1093         self.assert_equal(len(events), 0, "number of events received")
1094
1095     def test_echo_looped_back(self):
1096         """ echo packets looped back """
1097         # don't need a session in this case..
1098         self.vpp_session.remove_vpp_config()
1099         self.pg0.enable_capture()
1100         echo_packet_count = 10
1101         # random source port low enough to increment a few times..
1102         udp_sport_tx = randint(1, 50000)
1103         udp_sport_rx = udp_sport_tx
1104         echo_packet = (Ether(src=self.pg0.remote_mac,
1105                              dst=self.pg0.local_mac) /
1106                        IP(src=self.pg0.remote_ip4,
1107                           dst=self.pg0.remote_ip4) /
1108                        UDP(dport=BFD.udp_dport_echo) /
1109                        Raw("this should be looped back"))
1110         for dummy in range(echo_packet_count):
1111             self.sleep(.01, "delay between echo packets")
1112             echo_packet[UDP].sport = udp_sport_tx
1113             udp_sport_tx += 1
1114             self.logger.debug(ppp("Sending packet:", echo_packet))
1115             self.pg0.add_stream(echo_packet)
1116             self.pg_start()
1117         for dummy in range(echo_packet_count):
1118             p = self.pg0.wait_for_packet(1)
1119             self.logger.debug(ppp("Got packet:", p))
1120             ether = p[Ether]
1121             self.assert_equal(self.pg0.remote_mac,
1122                               ether.dst, "Destination MAC")
1123             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1124             ip = p[IP]
1125             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1126             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1127             udp = p[UDP]
1128             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1129                               "UDP destination port")
1130             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1131             udp_sport_rx += 1
1132             # need to compare the hex payload here, otherwise BFD_vpp_echo
1133             # gets in way
1134             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1135                              scapy.compat.raw(echo_packet[UDP].payload),
1136                              "Received packet is not the echo packet sent")
1137         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1138                           "ECHO packet identifier for test purposes)")
1139
    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        # advertise a non-zero required min echo rx interval to vpp
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        # vpp has to keep advertising its configured required min rx
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        # use the loopback interface as the echo source
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            # process captured packets for 0.75 of the detection time, then
            # send a control frame of our own
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # send the echo packet back to vpp
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    # once echo packets were seen, control frames must
                    # advertise a required min rx of at least 1 second
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    # answer any poll with a final
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")
1195
    def test_echo_fail(self):
        """ session goes down if echo function fails """
        bfd_session_up(self)
        # advertise a non-zero required min echo rx interval to vpp
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # use the loopback interface as the echo source
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        # echo function should be used now, but we will drop the echo packets
        verified_diag = False
        for dummy in range(3):
            # process captured packets for 0.75 of the detection time, then
            # send a control frame of our own
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # dropped
                    pass
                elif p.haslayer(BFD):
                    # answer any poll with a final, after checking that the
                    # advertised required min rx is at least 1 second
                    if "P" in p.sprintf("%BFD.flags%"):
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                    # a frame reporting Down has to name the echo function
                    # failure as the reason
                    if p[BFD].state == BFDState.down:
                        self.assert_equal(p[BFD].diag,
                                          BFDDiagCode.echo_function_failed,
                                          BFDDiagCode)
                        verified_diag = True
                else:
                    raise Exception(ppp("Received unknown packet:", p))
            self.test_session.send_packet()
        # exactly one event, the transition to Down, is expected
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        self.assert_equal(events[0].state, BFDState.down, BFDState)
        self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1235
1236     def test_echo_stop(self):
1237         """ echo function stops if peer sets required min echo rx zero """
1238         bfd_session_up(self)
1239         self.test_session.update(required_min_echo_rx=150000)
1240         self.test_session.send_packet()
1241         self.vapi.bfd_udp_set_echo_source(
1242             sw_if_index=self.loopback0.sw_if_index)
1243         # wait for first echo packet
1244         while True:
1245             p = self.pg0.wait_for_packet(1)
1246             self.logger.debug(ppp("Got packet:", p))
1247             if p[UDP].dport == BFD.udp_dport_echo:
1248                 self.logger.debug(ppp("Looping back packet:", p))
1249                 p[Ether].dst = self.pg0.local_mac
1250                 self.pg0.add_stream(p)
1251                 self.pg_start()
1252                 break
1253             elif p.haslayer(BFD):
1254                 # ignore BFD
1255                 pass
1256             else:
1257                 raise Exception(ppp("Received unknown packet:", p))
1258         self.test_session.update(required_min_echo_rx=0)
1259         self.test_session.send_packet()
1260         # echo packets shouldn't arrive anymore
1261         for dummy in range(5):
1262             wait_for_bfd_packet(
1263                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1264             self.test_session.send_packet()
1265             events = self.vapi.collect_events()
1266             self.assert_equal(len(events), 0, "number of bfd events")
1267
1268     def test_echo_source_removed(self):
1269         """ echo function stops if echo source is removed """
1270         bfd_session_up(self)
1271         self.test_session.update(required_min_echo_rx=150000)
1272         self.test_session.send_packet()
1273         self.vapi.bfd_udp_set_echo_source(
1274             sw_if_index=self.loopback0.sw_if_index)
1275         # wait for first echo packet
1276         while True:
1277             p = self.pg0.wait_for_packet(1)
1278             self.logger.debug(ppp("Got packet:", p))
1279             if p[UDP].dport == BFD.udp_dport_echo:
1280                 self.logger.debug(ppp("Looping back packet:", p))
1281                 p[Ether].dst = self.pg0.local_mac
1282                 self.pg0.add_stream(p)
1283                 self.pg_start()
1284                 break
1285             elif p.haslayer(BFD):
1286                 # ignore BFD
1287                 pass
1288             else:
1289                 raise Exception(ppp("Received unknown packet:", p))
1290         self.vapi.bfd_udp_del_echo_source()
1291         self.test_session.send_packet()
1292         # echo packets shouldn't arrive anymore
1293         for dummy in range(5):
1294             wait_for_bfd_packet(
1295                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1296             self.test_session.send_packet()
1297             events = self.vapi.collect_events()
1298             self.assert_equal(len(events), 0, "number of bfd events")
1299
1300     def test_stale_echo(self):
1301         """ stale echo packets don't keep a session up """
1302         bfd_session_up(self)
1303         self.test_session.update(required_min_echo_rx=150000)
1304         self.vapi.bfd_udp_set_echo_source(
1305             sw_if_index=self.loopback0.sw_if_index)
1306         self.test_session.send_packet()
1307         # should be turned on - loopback echo packets
1308         echo_packet = None
1309         timeout_at = None
1310         timeout_ok = False
1311         for dummy in range(10 * self.vpp_session.detect_mult):
1312             p = self.pg0.wait_for_packet(1)
1313             if p[UDP].dport == BFD.udp_dport_echo:
1314                 if echo_packet is None:
1315                     self.logger.debug(ppp("Got first echo packet:", p))
1316                     echo_packet = p
1317                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1318                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1319                 else:
1320                     self.logger.debug(ppp("Got followup echo packet:", p))
1321                 self.logger.debug(ppp("Looping back first echo packet:", p))
1322                 echo_packet[Ether].dst = self.pg0.local_mac
1323                 self.pg0.add_stream(echo_packet)
1324                 self.pg_start()
1325             elif p.haslayer(BFD):
1326                 self.logger.debug(ppp("Got packet:", p))
1327                 if "P" in p.sprintf("%BFD.flags%"):
1328                     final = self.test_session.create_packet()
1329                     final[BFD].flags = "F"
1330                     self.test_session.send_packet(final)
1331                 if p[BFD].state == BFDState.down:
1332                     self.assertIsNotNone(
1333                         timeout_at,
1334                         "Session went down before first echo packet received")
1335                     now = time.time()
1336                     self.assertGreaterEqual(
1337                         now, timeout_at,
1338                         "Session timeout at %s, but is expected at %s" %
1339                         (now, timeout_at))
1340                     self.assert_equal(p[BFD].diag,
1341                                       BFDDiagCode.echo_function_failed,
1342                                       BFDDiagCode)
1343                     events = self.vapi.collect_events()
1344                     self.assert_equal(len(events), 1, "number of bfd events")
1345                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1346                     timeout_ok = True
1347                     break
1348             else:
1349                 raise Exception(ppp("Received unknown packet:", p))
1350             self.test_session.send_packet()
1351         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1352
1353     def test_invalid_echo_checksum(self):
1354         """ echo packets with invalid checksum don't keep a session up """
1355         bfd_session_up(self)
1356         self.test_session.update(required_min_echo_rx=150000)
1357         self.vapi.bfd_udp_set_echo_source(
1358             sw_if_index=self.loopback0.sw_if_index)
1359         self.test_session.send_packet()
1360         # should be turned on - loopback echo packets
1361         timeout_at = None
1362         timeout_ok = False
1363         for dummy in range(10 * self.vpp_session.detect_mult):
1364             p = self.pg0.wait_for_packet(1)
1365             if p[UDP].dport == BFD.udp_dport_echo:
1366                 self.logger.debug(ppp("Got echo packet:", p))
1367                 if timeout_at is None:
1368                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1369                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1370                 p[BFD_vpp_echo].checksum = getrandbits(64)
1371                 p[Ether].dst = self.pg0.local_mac
1372                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1373                 self.pg0.add_stream(p)
1374                 self.pg_start()
1375             elif p.haslayer(BFD):
1376                 self.logger.debug(ppp("Got packet:", p))
1377                 if "P" in p.sprintf("%BFD.flags%"):
1378                     final = self.test_session.create_packet()
1379                     final[BFD].flags = "F"
1380                     self.test_session.send_packet(final)
1381                 if p[BFD].state == BFDState.down:
1382                     self.assertIsNotNone(
1383                         timeout_at,
1384                         "Session went down before first echo packet received")
1385                     now = time.time()
1386                     self.assertGreaterEqual(
1387                         now, timeout_at,
1388                         "Session timeout at %s, but is expected at %s" %
1389                         (now, timeout_at))
1390                     self.assert_equal(p[BFD].diag,
1391                                       BFDDiagCode.echo_function_failed,
1392                                       BFDDiagCode)
1393                     events = self.vapi.collect_events()
1394                     self.assert_equal(len(events), 1, "number of bfd events")
1395                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1396                     timeout_ok = True
1397                     break
1398             else:
1399                 raise Exception(ppp("Received unknown packet:", p))
1400             self.test_session.send_packet()
1401         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1402
1403     def test_admin_up_down(self):
1404         """ put session admin-up and admin-down """
1405         bfd_session_up(self)
1406         self.vpp_session.admin_down()
1407         self.pg0.enable_capture()
1408         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1409         verify_event(self, e, expected_state=BFDState.admin_down)
1410         for dummy in range(2):
1411             p = wait_for_bfd_packet(self)
1412             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1413         # try to bring session up - shouldn't be possible
1414         self.test_session.update(state=BFDState.init)
1415         self.test_session.send_packet()
1416         for dummy in range(2):
1417             p = wait_for_bfd_packet(self)
1418             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1419         self.vpp_session.admin_up()
1420         self.test_session.update(state=BFDState.down)
1421         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1422         verify_event(self, e, expected_state=BFDState.down)
1423         p = wait_for_bfd_packet(
1424             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1425         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1426         self.test_session.send_packet()
1427         p = wait_for_bfd_packet(
1428             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1429         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1430         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1431         verify_event(self, e, expected_state=BFDState.init)
1432         self.test_session.update(state=BFDState.up)
1433         self.test_session.send_packet()
1434         p = wait_for_bfd_packet(
1435             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1436         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1437         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1438         verify_event(self, e, expected_state=BFDState.up)
1439
1440     def test_config_change_remote_demand(self):
1441         """ configuration change while peer in demand mode """
1442         bfd_session_up(self)
1443         demand = self.test_session.create_packet()
1444         demand[BFD].flags = "D"
1445         self.test_session.send_packet(demand)
1446         self.vpp_session.modify_parameters(
1447             required_min_rx=2 * self.vpp_session.required_min_rx)
1448         p = wait_for_bfd_packet(
1449             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1450         # poll bit must be set
1451         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1452         # terminate poll sequence
1453         final = self.test_session.create_packet()
1454         final[BFD].flags = "D+F"
1455         self.test_session.send_packet(final)
1456         # vpp should be quiet now again
1457         transmit_time = 0.9 \
1458             * max(self.vpp_session.required_min_rx,
1459                   self.test_session.desired_min_tx) \
1460             / USEC_IN_SEC
1461         count = 0
1462         for dummy in range(self.test_session.detect_mult * 2):
1463             self.sleep(transmit_time)
1464             self.test_session.send_packet(demand)
1465             try:
1466                 p = wait_for_bfd_packet(self, timeout=0)
1467                 self.logger.error(ppp("Received unexpected packet:", p))
1468                 count += 1
1469             except CaptureTimeoutError:
1470                 pass
1471         events = self.vapi.collect_events()
1472         for e in events:
1473             self.logger.error("Received unexpected event: %s", e)
1474         self.assert_equal(count, 0, "number of packets received")
1475         self.assert_equal(len(events), 0, "number of events received")
1476
1477     def test_intf_deleted(self):
1478         """ interface with bfd session deleted """
1479         intf = VppLoInterface(self)
1480         intf.config_ip4()
1481         intf.admin_up()
1482         sw_if_index = intf.sw_if_index
1483         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1484         vpp_session.add_vpp_config()
1485         vpp_session.admin_up()
1486         intf.remove_vpp_config()
1487         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1488         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1489         self.assertFalse(vpp_session.query_vpp_config())
1490
1491
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # Class-level placeholders. pg0 is created in setUpClass(), the two
    # session objects are created for every test in setUp().
    # NOTE(review): vpp_clock_offset is never assigned inside this class;
    # presumably a module-level helper sets it - confirm.
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def force_solo(cls):
        """ Unconditionally return True.

        NOTE(review): presumably consulted by the test framework to run
        this class on its own - confirm against the framework.
        """
        return True

    @classmethod
    def setUpClass(cls):
        """ Create pg0 and loopback0, both configured for IPv6.

        If the interface setup raises, the parent tearDownClass() is run
        before the exception is re-raised.
        """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ Delegate to the parent class teardown. """
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """ Create an IPv6 BFD session in VPP and the matching test peer.

        If session creation fails, want_bfd_events(enable_disable=0) is
        called before the error is re-raised.
        """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ Turn BFD events off (if VPP is alive) and drain the queue. """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        # the first frame goes out before the test peer has learned VPP's
        # discriminator from any received packet
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every packet from VPP; no state-change event may appear
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        # the UDP source port doubles as a per-packet identifier
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            # the echo packet is sent with src == dst == remote address
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # every packet sent must have been received back
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for _ in range(10):
            delay = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(delay, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # once echo is in use, control packets are expected
                        # to advertise a min rx interval of at least 1s
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index before the interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1707
1708
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # session objects are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def force_solo(cls):
        """ Unconditionally return True.

        NOTE(review): presumably consulted by the test framework to run
        this class on its own - confirm against the framework.
        """
        return True

    @classmethod
    def setUpClass(cls):
        """ Delegate to the parent class setup. """
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ Delegate to the parent class teardown. """
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, enable BFD events and capture, configure IPv6. """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """ Turn BFD events off (if VPP is alive), then parent teardown. """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _verify_traffic_forwarded(self, packets):
        """ Send packets on pg0 and expect each one back, in order.

        Non-data packets (see pkt_is_not_data_traffic) are filtered out of
        the capture; each remaining packet must carry the IPv6 destination
        of the corresponding sent packet.
        """
        self.pg0.add_stream(packets)
        self.pg_start()
        for sent in packets:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             sent[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # packets to match against both of the routes
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index)])
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff)])
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._verify_traffic_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._verify_traffic_forwarded(p)
1816
1817
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ Delegate to the parent class setup. """
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ Delegate to the parent class teardown. """
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, enable BFD events and capture, configure IPv4. """
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        """ Turn BFD events off (if VPP is alive), then parent teardown. """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # GRE tunnel between the pg0 endpoints; BFD runs over it
        tunnel = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # VPP side of the session, bound to the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, tunnel, tunnel.remote_ip4, is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side: BFD packets get an outer IP/GRE header and are
        # exchanged on the physical interface pg0
        outer_header = IP(src=self.pg0.remote_ip4,
                          dst=self.pg0.local_ip4) / GRE()
        self.test_session = BFDTestSession(
            self, tunnel, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # one data packet addressed to the tunnel's remote address
        data_packets = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_packets, self.pg0)

        # bring session down
        bfd_session_down(self)
1897
1898
1899 class BFDSHA1TestCase(VppTestCase):
1900     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1901
1902     pg0 = None
1903     vpp_clock_offset = None
1904     vpp_session = None
1905     test_session = None
1906
    @classmethod
    def force_solo(cls):
        """ Unconditionally return True.

        NOTE(review): presumably consulted by the test framework to run
        this class on its own - confirm against the framework.
        """
        return True
1910
    @classmethod
    def setUpClass(cls):
        """ Prepare the pg0 interface (IPv4) shared by all tests.

        If the interface setup raises, the parent tearDownClass() is run
        before the exception is re-raised.
        """
        super(BFDSHA1TestCase, cls).setUpClass()
        # switch the bfd log class to debug level for the whole class
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # setup failed part-way - clean up before propagating the error
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise
1924
    @classmethod
    def tearDownClass(cls):
        """ Delegate to the parent class teardown; nothing extra is done. """
        super(BFDSHA1TestCase, cls).tearDownClass()
1928
    def setUp(self):
        """ Per-test setup: new key factory, BFD events on, capture on pg0.

        No BFD session is created here; each test builds its own so that it
        can choose the authentication key.
        """
        super(BFDSHA1TestCase, self).setUp()
        # a fresh factory per test; it hands out keys with unique conf ids
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
1934
    def tearDown(self):
        """ Turn BFD events off (if VPP is alive) and drain the queue. """
        # skip the API call when VPP has died
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()
1940
1941     def test_session_up(self):
1942         """ bring BFD session up """
1943         key = self.factory.create_random_key(self)
1944         key.add_vpp_config()
1945         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1946                                             self.pg0.remote_ip4,
1947                                             sha1_key=key)
1948         self.vpp_session.add_vpp_config()
1949         self.vpp_session.admin_up()
1950         self.test_session = BFDTestSession(
1951             self, self.pg0, AF_INET, sha1_key=key,
1952             bfd_key_id=self.vpp_session.bfd_key_id)
1953         bfd_session_up(self)
1954
1955     def test_hold_up(self):
1956         """ hold BFD session up """
1957         key = self.factory.create_random_key(self)
1958         key.add_vpp_config()
1959         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1960                                             self.pg0.remote_ip4,
1961                                             sha1_key=key)
1962         self.vpp_session.add_vpp_config()
1963         self.vpp_session.admin_up()
1964         self.test_session = BFDTestSession(
1965             self, self.pg0, AF_INET, sha1_key=key,
1966             bfd_key_id=self.vpp_session.bfd_key_id)
1967         bfd_session_up(self)
1968         for dummy in range(self.test_session.detect_mult * 2):
1969             wait_for_bfd_packet(self)
1970             self.test_session.send_packet()
1971         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1972
1973     def test_hold_up_meticulous(self):
1974         """ hold BFD session up - meticulous auth """
1975         key = self.factory.create_random_key(
1976             self, BFDAuthType.meticulous_keyed_sha1)
1977         key.add_vpp_config()
1978         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1979                                             self.pg0.remote_ip4, sha1_key=key)
1980         self.vpp_session.add_vpp_config()
1981         self.vpp_session.admin_up()
1982         # specify sequence number so that it wraps
1983         self.test_session = BFDTestSession(
1984             self, self.pg0, AF_INET, sha1_key=key,
1985             bfd_key_id=self.vpp_session.bfd_key_id,
1986             our_seq_number=0xFFFFFFFF - 4)
1987         bfd_session_up(self)
1988         for dummy in range(30):
1989             wait_for_bfd_packet(self)
1990             self.test_session.inc_seq_num()
1991             self.test_session.send_packet()
1992         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1993
1994     def test_send_bad_seq_number(self):
1995         """ session is not kept alive by msgs with bad sequence numbers"""
1996         key = self.factory.create_random_key(
1997             self, BFDAuthType.meticulous_keyed_sha1)
1998         key.add_vpp_config()
1999         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2000                                             self.pg0.remote_ip4, sha1_key=key)
2001         self.vpp_session.add_vpp_config()
2002         self.test_session = BFDTestSession(
2003             self, self.pg0, AF_INET, sha1_key=key,
2004             bfd_key_id=self.vpp_session.bfd_key_id)
2005         bfd_session_up(self)
2006         detection_time = self.test_session.detect_mult *\
2007             self.vpp_session.required_min_rx / USEC_IN_SEC
2008         send_until = time.time() + 2 * detection_time
2009         while time.time() < send_until:
2010             self.test_session.send_packet()
2011             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2012                        "time between bfd packets")
2013         e = self.vapi.collect_events()
2014         # session should be down now, because the sequence numbers weren't
2015         # updated
2016         self.assert_equal(len(e), 1, "number of bfd events")
2017         verify_event(self, e[0], expected_state=BFDState.down)
2018
2019     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2020                                        legitimate_test_session,
2021                                        rogue_test_session,
2022                                        rogue_bfd_values=None):
2023         """ execute a rogue session interaction scenario
2024
2025         1. create vpp session, add config
2026         2. bring the legitimate session up
2027         3. copy the bfd values from legitimate session to rogue session
2028         4. apply rogue_bfd_values to rogue session
2029         5. set rogue session state to down
2030         6. send message to take the session down from the rogue session
2031         7. assert that the legitimate session is unaffected
2032         """
2033
2034         self.vpp_session = vpp_bfd_udp_session
2035         self.vpp_session.add_vpp_config()
2036         self.test_session = legitimate_test_session
2037         # bring vpp session up
2038         bfd_session_up(self)
2039         # send packet from rogue session
2040         rogue_test_session.update(
2041             my_discriminator=self.test_session.my_discriminator,
2042             your_discriminator=self.test_session.your_discriminator,
2043             desired_min_tx=self.test_session.desired_min_tx,
2044             required_min_rx=self.test_session.required_min_rx,
2045             detect_mult=self.test_session.detect_mult,
2046             diag=self.test_session.diag,
2047             state=self.test_session.state,
2048             auth_type=self.test_session.auth_type)
2049         if rogue_bfd_values:
2050             rogue_test_session.update(**rogue_bfd_values)
2051         rogue_test_session.update(state=BFDState.down)
2052         rogue_test_session.send_packet()
2053         wait_for_bfd_packet(self)
2054         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2055
2056     def test_mismatch_auth(self):
2057         """ session is not brought down by unauthenticated msg """
2058         key = self.factory.create_random_key(self)
2059         key.add_vpp_config()
2060         vpp_session = VppBFDUDPSession(
2061             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2062         legitimate_test_session = BFDTestSession(
2063             self, self.pg0, AF_INET, sha1_key=key,
2064             bfd_key_id=vpp_session.bfd_key_id)
2065         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2066         self.execute_rogue_session_scenario(vpp_session,
2067                                             legitimate_test_session,
2068                                             rogue_test_session)
2069
2070     def test_mismatch_bfd_key_id(self):
2071         """ session is not brought down by msg with non-existent key-id """
2072         key = self.factory.create_random_key(self)
2073         key.add_vpp_config()
2074         vpp_session = VppBFDUDPSession(
2075             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2076         # pick a different random bfd key id
2077         x = randint(0, 255)
2078         while x == vpp_session.bfd_key_id:
2079             x = randint(0, 255)
2080         legitimate_test_session = BFDTestSession(
2081             self, self.pg0, AF_INET, sha1_key=key,
2082             bfd_key_id=vpp_session.bfd_key_id)
2083         rogue_test_session = BFDTestSession(
2084             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2085         self.execute_rogue_session_scenario(vpp_session,
2086                                             legitimate_test_session,
2087                                             rogue_test_session)
2088
2089     def test_mismatched_auth_type(self):
2090         """ session is not brought down by msg with wrong auth type """
2091         key = self.factory.create_random_key(self)
2092         key.add_vpp_config()
2093         vpp_session = VppBFDUDPSession(
2094             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2095         legitimate_test_session = BFDTestSession(
2096             self, self.pg0, AF_INET, sha1_key=key,
2097             bfd_key_id=vpp_session.bfd_key_id)
2098         rogue_test_session = BFDTestSession(
2099             self, self.pg0, AF_INET, sha1_key=key,
2100             bfd_key_id=vpp_session.bfd_key_id)
2101         self.execute_rogue_session_scenario(
2102             vpp_session, legitimate_test_session, rogue_test_session,
2103             {'auth_type': BFDAuthType.keyed_md5})
2104
2105     def test_restart(self):
2106         """ simulate remote peer restart and resynchronization """
2107         key = self.factory.create_random_key(
2108             self, BFDAuthType.meticulous_keyed_sha1)
2109         key.add_vpp_config()
2110         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2111                                             self.pg0.remote_ip4, sha1_key=key)
2112         self.vpp_session.add_vpp_config()
2113         self.test_session = BFDTestSession(
2114             self, self.pg0, AF_INET, sha1_key=key,
2115             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2116         bfd_session_up(self)
2117         # don't send any packets for 2*detection_time
2118         detection_time = self.test_session.detect_mult *\
2119             self.vpp_session.required_min_rx / USEC_IN_SEC
2120         self.sleep(2 * detection_time, "simulating peer restart")
2121         events = self.vapi.collect_events()
2122         self.assert_equal(len(events), 1, "number of bfd events")
2123         verify_event(self, events[0], expected_state=BFDState.down)
2124         self.test_session.update(state=BFDState.down)
2125         # reset sequence number
2126         self.test_session.our_seq_number = 0
2127         self.test_session.vpp_seq_number = None
2128         # now throw away any pending packets
2129         self.pg0.enable_capture()
2130         self.test_session.my_discriminator = 0
2131         bfd_session_up(self)
2132
2133
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # populated by setUpClass (pg0) and by the individual tests (sessions)
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def force_solo(cls):
        """ always answer True """
        return True

    @classmethod
    def setUpClass(cls):
        """ create pg0 and configure it for IPv4

        If anything fails while the interface is being prepared, the parent
        tearDownClass is run before the exception is re-raised.
        """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            pg = cls.pg0
            pg.config_ip4()
            pg.admin_up()
            pg.resolve_arp()
        except Exception:
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate to the parent class """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a key factory, subscribe to events, enable capture """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from events (if vpp is alive), drop queued events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_up=True, bump_seq_num=False):
        """ wait for a bfd packet and answer it, detect_mult * 2 times

        :param verify_up: assert that every received packet carries
            state up
        :param bump_seq_num: call inc_seq_num() on the test session before
            each packet is sent
        """
        peer = self.test_session
        for _ in range(peer.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            if bump_seq_num:
                peer.inc_seq_num()
            peer.send_packet()

    def _assert_session_undisturbed(self):
        """ assert that vpp session is up and that no event was collected """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        sha1 = self.factory.create_random_key(self)
        sha1.add_vpp_config()
        # both sides start without authentication
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch vpp and the test session to the key at the same moment
        self.vpp_session.activate_auth(sha1)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = sha1
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        sha1 = self.factory.create_random_key(self)
        sha1.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=sha1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(bump_seq_num=True)
        # drop authentication on both sides at the same moment
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(bump_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        old_key = self.factory.create_random_key(self)
        old_key.add_vpp_config()
        new_key = self.factory.create_random_key(self)
        new_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=old_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # replace the key on both sides at the same moment
        self.vpp_session.activate_auth(new_key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = new_key
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        sha1 = self.factory.create_random_key(self)
        sha1.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first round does not look at the state of received packets
        self._exchange_packets(verify_up=False)
        self.vpp_session.activate_auth(sha1, delayed=True)
        # the test session keeps sending without the key for a while
        self._exchange_packets()
        # now the test session starts using the key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = sha1
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        sha1 = self.factory.create_random_key(self)
        sha1.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=sha1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test session keeps sending with the key for a while
        self._exchange_packets()
        # now the test session stops using the key as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        old_key = self.factory.create_random_key(self)
        old_key.add_vpp_config()
        new_key = self.factory.create_random_key(self)
        new_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=old_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(new_key, delayed=True)
        # the test session keeps sending with the old key for a while
        self._exchange_packets()
        # now the test session switches to the new key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = new_key
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_session_undisturbed()
2348
2349
2350 class BFDCLITestCase(VppTestCase):
2351     """Bidirectional Forwarding Detection (BFD) (CLI) """
2352     pg0 = None
2353
2354     @classmethod
2355     def force_solo(cls):
2356         return True
2357
2358     @classmethod
2359     def setUpClass(cls):
2360         super(BFDCLITestCase, cls).setUpClass()
2361         cls.vapi.cli("set log class bfd level debug")
2362         try:
2363             cls.create_pg_interfaces((0,))
2364             cls.pg0.config_ip4()
2365             cls.pg0.config_ip6()
2366             cls.pg0.resolve_arp()
2367             cls.pg0.resolve_ndp()
2368
2369         except Exception:
2370             super(BFDCLITestCase, cls).tearDownClass()
2371             raise
2372
2373     @classmethod
2374     def tearDownClass(cls):
2375         super(BFDCLITestCase, cls).tearDownClass()
2376
2377     def setUp(self):
2378         super(BFDCLITestCase, self).setUp()
2379         self.factory = AuthKeyFactory()
2380         self.pg0.enable_capture()
2381
2382     def tearDown(self):
2383         try:
2384             self.vapi.want_bfd_events(enable_disable=False)
2385         except UnexpectedApiReturnValueError:
2386             # some tests aren't subscribed, so this is not an issue
2387             pass
2388         self.vapi.collect_events()  # clear the event queue
2389         super(BFDCLITestCase, self).tearDown()
2390
2391     def cli_verify_no_response(self, cli):
2392         """ execute a CLI, asserting that the response is empty """
2393         self.assert_equal(self.vapi.cli(cli),
2394                           "",
2395                           "CLI command response")
2396
2397     def cli_verify_response(self, cli, expected):
2398         """ execute a CLI, asserting that the response matches expectation """
2399         try:
2400             reply = self.vapi.cli(cli)
2401         except CliFailedCommandError as cli_error:
2402             reply = str(cli_error)
2403         self.assert_equal(reply.strip(),
2404                           expected,
2405                           "CLI command response")
2406
2407     def test_show(self):
2408         """ show commands """
2409         k1 = self.factory.create_random_key(self)
2410         k1.add_vpp_config()
2411         k2 = self.factory.create_random_key(
2412             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2413         k2.add_vpp_config()
2414         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2415         s1.add_vpp_config()
2416         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2417                               sha1_key=k2)
2418         s2.add_vpp_config()
2419         self.logger.info(self.vapi.ppcli("show bfd keys"))
2420         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2421         self.logger.info(self.vapi.ppcli("show bfd"))
2422
2423     def test_set_del_sha1_key(self):
2424         """ set/delete SHA1 auth key """
2425         k = self.factory.create_random_key(self)
2426         self.registry.register(k, self.logger)
2427         self.cli_verify_no_response(
2428             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2429             (k.conf_key_id,
2430                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2431         self.assertTrue(k.query_vpp_config())
2432         self.vpp_session = VppBFDUDPSession(
2433             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2434         self.vpp_session.add_vpp_config()
2435         self.test_session = \
2436             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2437                            bfd_key_id=self.vpp_session.bfd_key_id)
2438         self.vapi.want_bfd_events()
2439         bfd_session_up(self)
2440         bfd_session_down(self)
2441         # try to replace the secret for the key - should fail because the key
2442         # is in-use
2443         k2 = self.factory.create_random_key(self)
2444         self.cli_verify_response(
2445             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2446             (k.conf_key_id,
2447                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2448             "bfd key set: `bfd_auth_set_key' API call failed, "
2449             "rv=-103:BFD object in use")
2450         # manipulating the session using old secret should still work
2451         bfd_session_up(self)
2452         bfd_session_down(self)
2453         self.vpp_session.remove_vpp_config()
2454         self.cli_verify_no_response(
2455             "bfd key del conf-key-id %s" % k.conf_key_id)
2456         self.assertFalse(k.query_vpp_config())
2457
2458     def test_set_del_meticulous_sha1_key(self):
2459         """ set/delete meticulous SHA1 auth key """
2460         k = self.factory.create_random_key(
2461             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2462         self.registry.register(k, self.logger)
2463         self.cli_verify_no_response(
2464             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2465             (k.conf_key_id,
2466                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2467         self.assertTrue(k.query_vpp_config())
2468         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2469                                             self.pg0.remote_ip6, af=AF_INET6,
2470                                             sha1_key=k)
2471         self.vpp_session.add_vpp_config()
2472         self.vpp_session.admin_up()
2473         self.test_session = \
2474             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2475                            bfd_key_id=self.vpp_session.bfd_key_id)
2476         self.vapi.want_bfd_events()
2477         bfd_session_up(self)
2478         bfd_session_down(self)
2479         # try to replace the secret for the key - should fail because the key
2480         # is in-use
2481         k2 = self.factory.create_random_key(self)
2482         self.cli_verify_response(
2483             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2484             (k.conf_key_id,
2485                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2486             "bfd key set: `bfd_auth_set_key' API call failed, "
2487             "rv=-103:BFD object in use")
2488         # manipulating the session using old secret should still work
2489         bfd_session_up(self)
2490         bfd_session_down(self)
2491         self.vpp_session.remove_vpp_config()
2492         self.cli_verify_no_response(
2493             "bfd key del conf-key-id %s" % k.conf_key_id)
2494         self.assertFalse(k.query_vpp_config())
2495
2496     def test_add_mod_del_bfd_udp(self):
2497         """ create/modify/delete IPv4 BFD UDP session """
2498         vpp_session = VppBFDUDPSession(
2499             self, self.pg0, self.pg0.remote_ip4)
2500         self.registry.register(vpp_session, self.logger)
2501         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2502             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2503             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2504                                 self.pg0.remote_ip4,
2505                                 vpp_session.desired_min_tx,
2506                                 vpp_session.required_min_rx,
2507                                 vpp_session.detect_mult)
2508         self.cli_verify_no_response(cli_add_cmd)
2509         # 2nd add should fail
2510         self.cli_verify_response(
2511             cli_add_cmd,
2512             "bfd udp session add: `bfd_add_add_session' API call"
2513             " failed, rv=-101:Duplicate BFD object")
2514         verify_bfd_session_config(self, vpp_session)
2515         mod_session = VppBFDUDPSession(
2516             self, self.pg0, self.pg0.remote_ip4,
2517             required_min_rx=2 * vpp_session.required_min_rx,
2518             desired_min_tx=3 * vpp_session.desired_min_tx,
2519             detect_mult=4 * vpp_session.detect_mult)
2520         self.cli_verify_no_response(
2521             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2522             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2523             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2524              mod_session.desired_min_tx, mod_session.required_min_rx,
2525              mod_session.detect_mult))
2526         verify_bfd_session_config(self, mod_session)
2527         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2528             "peer-addr %s" % (self.pg0.name,
2529                               self.pg0.local_ip4, self.pg0.remote_ip4)
2530         self.cli_verify_no_response(cli_del_cmd)
2531         # 2nd del is expected to fail
2532         self.cli_verify_response(
2533             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2534             " failed, rv=-102:No such BFD object")
2535         self.assertFalse(vpp_session.query_vpp_config())
2536
2537     def test_add_mod_del_bfd_udp6(self):
2538         """ create/modify/delete IPv6 BFD UDP session """
2539         vpp_session = VppBFDUDPSession(
2540             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2541         self.registry.register(vpp_session, self.logger)
2542         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2543             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2544             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2545                                 self.pg0.remote_ip6,
2546                                 vpp_session.desired_min_tx,
2547                                 vpp_session.required_min_rx,
2548                                 vpp_session.detect_mult)
2549         self.cli_verify_no_response(cli_add_cmd)
2550         # 2nd add should fail
2551         self.cli_verify_response(
2552             cli_add_cmd,
2553             "bfd udp session add: `bfd_add_add_session' API call"
2554             " failed, rv=-101:Duplicate BFD object")
2555         verify_bfd_session_config(self, vpp_session)
2556         mod_session = VppBFDUDPSession(
2557             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2558             required_min_rx=2 * vpp_session.required_min_rx,
2559             desired_min_tx=3 * vpp_session.desired_min_tx,
2560             detect_mult=4 * vpp_session.detect_mult)
2561         self.cli_verify_no_response(
2562             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2563             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2564             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2565              mod_session.desired_min_tx,
2566              mod_session.required_min_rx, mod_session.detect_mult))
2567         verify_bfd_session_config(self, mod_session)
2568         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2569             "peer-addr %s" % (self.pg0.name,
2570                               self.pg0.local_ip6, self.pg0.remote_ip6)
2571         self.cli_verify_no_response(cli_del_cmd)
2572         # 2nd del is expected to fail
2573         self.cli_verify_response(
2574             cli_del_cmd,
2575             "bfd udp session del: `bfd_udp_del_session' API call"
2576             " failed, rv=-102:No such BFD object")
2577         self.assertFalse(vpp_session.query_vpp_config())
2578
2579     def test_add_mod_del_bfd_udp_auth(self):
2580         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2581         key = self.factory.create_random_key(self)
2582         key.add_vpp_config()
2583         vpp_session = VppBFDUDPSession(
2584             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2585         self.registry.register(vpp_session, self.logger)
2586         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2587             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2588             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2589             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2590                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2591                vpp_session.detect_mult, key.conf_key_id,
2592                vpp_session.bfd_key_id)
2593         self.cli_verify_no_response(cli_add_cmd)
2594         # 2nd add should fail
2595         self.cli_verify_response(
2596             cli_add_cmd,
2597             "bfd udp session add: `bfd_add_add_session' API call"
2598             " failed, rv=-101:Duplicate BFD object")
2599         verify_bfd_session_config(self, vpp_session)
2600         mod_session = VppBFDUDPSession(
2601             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2602             bfd_key_id=vpp_session.bfd_key_id,
2603             required_min_rx=2 * vpp_session.required_min_rx,
2604             desired_min_tx=3 * vpp_session.desired_min_tx,
2605             detect_mult=4 * vpp_session.detect_mult)
2606         self.cli_verify_no_response(
2607             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2608             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2609             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2610              mod_session.desired_min_tx,
2611              mod_session.required_min_rx, mod_session.detect_mult))
2612         verify_bfd_session_config(self, mod_session)
2613         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2614             "peer-addr %s" % (self.pg0.name,
2615                               self.pg0.local_ip4, self.pg0.remote_ip4)
2616         self.cli_verify_no_response(cli_del_cmd)
2617         # 2nd del is expected to fail
2618         self.cli_verify_response(
2619             cli_del_cmd,
2620             "bfd udp session del: `bfd_udp_del_session' API call"
2621             " failed, rv=-102:No such BFD object")
2622         self.assertFalse(vpp_session.query_vpp_config())
2623
2624     def test_add_mod_del_bfd_udp6_auth(self):
2625         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2626         key = self.factory.create_random_key(
2627             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2628         key.add_vpp_config()
2629         vpp_session = VppBFDUDPSession(
2630             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2631         self.registry.register(vpp_session, self.logger)
2632         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2633             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2634             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2635             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2636                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2637                vpp_session.detect_mult, key.conf_key_id,
2638                vpp_session.bfd_key_id)
2639         self.cli_verify_no_response(cli_add_cmd)
2640         # 2nd add should fail
2641         self.cli_verify_response(
2642             cli_add_cmd,
2643             "bfd udp session add: `bfd_add_add_session' API call"
2644             " failed, rv=-101:Duplicate BFD object")
2645         verify_bfd_session_config(self, vpp_session)
2646         mod_session = VppBFDUDPSession(
2647             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2648             bfd_key_id=vpp_session.bfd_key_id,
2649             required_min_rx=2 * vpp_session.required_min_rx,
2650             desired_min_tx=3 * vpp_session.desired_min_tx,
2651             detect_mult=4 * vpp_session.detect_mult)
2652         self.cli_verify_no_response(
2653             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2654             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2655             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2656              mod_session.desired_min_tx,
2657              mod_session.required_min_rx, mod_session.detect_mult))
2658         verify_bfd_session_config(self, mod_session)
2659         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2660             "peer-addr %s" % (self.pg0.name,
2661                               self.pg0.local_ip6, self.pg0.remote_ip6)
2662         self.cli_verify_no_response(cli_del_cmd)
2663         # 2nd del is expected to fail
2664         self.cli_verify_response(
2665             cli_del_cmd,
2666             "bfd udp session del: `bfd_udp_del_session' API call"
2667             " failed, rv=-102:No such BFD object")
2668         self.assertFalse(vpp_session.query_vpp_config())
2669
2670     def test_auth_on_off(self):
2671         """ turn authentication on and off """
2672         key = self.factory.create_random_key(
2673             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2674         key.add_vpp_config()
2675         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2676         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2677                                         sha1_key=key)
2678         session.add_vpp_config()
2679         cli_activate = \
2680             "bfd udp session auth activate interface %s local-addr %s "\
2681             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2682             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2683                key.conf_key_id, auth_session.bfd_key_id)
2684         self.cli_verify_no_response(cli_activate)
2685         verify_bfd_session_config(self, auth_session)
2686         self.cli_verify_no_response(cli_activate)
2687         verify_bfd_session_config(self, auth_session)
2688         cli_deactivate = \
2689             "bfd udp session auth deactivate interface %s local-addr %s "\
2690             "peer-addr %s "\
2691             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2692         self.cli_verify_no_response(cli_deactivate)
2693         verify_bfd_session_config(self, session)
2694         self.cli_verify_no_response(cli_deactivate)
2695         verify_bfd_session_config(self, session)
2696
2697     def test_auth_on_off_delayed(self):
2698         """ turn authentication on and off (delayed) """
2699         key = self.factory.create_random_key(
2700             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2701         key.add_vpp_config()
2702         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2703         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2704                                         sha1_key=key)
2705         session.add_vpp_config()
2706         cli_activate = \
2707             "bfd udp session auth activate interface %s local-addr %s "\
2708             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2709             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2710                key.conf_key_id, auth_session.bfd_key_id)
2711         self.cli_verify_no_response(cli_activate)
2712         verify_bfd_session_config(self, auth_session)
2713         self.cli_verify_no_response(cli_activate)
2714         verify_bfd_session_config(self, auth_session)
2715         cli_deactivate = \
2716             "bfd udp session auth deactivate interface %s local-addr %s "\
2717             "peer-addr %s delayed yes"\
2718             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2719         self.cli_verify_no_response(cli_deactivate)
2720         verify_bfd_session_config(self, session)
2721         self.cli_verify_no_response(cli_deactivate)
2722         verify_bfd_session_config(self, session)
2723
2724     def test_admin_up_down(self):
2725         """ put session admin-up and admin-down """
2726         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2727         session.add_vpp_config()
2728         cli_down = \
2729             "bfd udp session set-flags admin down interface %s local-addr %s "\
2730             "peer-addr %s "\
2731             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2732         cli_up = \
2733             "bfd udp session set-flags admin up interface %s local-addr %s "\
2734             "peer-addr %s "\
2735             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2736         self.cli_verify_no_response(cli_down)
2737         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2738         self.cli_verify_no_response(cli_up)
2739         verify_bfd_session_config(self, session, state=BFDState.down)
2740
2741     def test_set_del_udp_echo_source(self):
2742         """ set/del udp echo source """
2743         self.create_loopback_interfaces(1)
2744         self.loopback0 = self.lo_interfaces[0]
2745         self.loopback0.admin_up()
2746         self.cli_verify_response("show bfd echo-source",
2747                                  "UDP echo source is not set.")
2748         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2749         self.cli_verify_no_response(cli_set)
2750         self.cli_verify_response("show bfd echo-source",
2751                                  "UDP echo source is: %s\n"
2752                                  "IPv4 address usable as echo source: none\n"
2753                                  "IPv6 address usable as echo source: none" %
2754                                  self.loopback0.name)
2755         self.loopback0.config_ip4()
2756         echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
2757             self.loopback0.local_ip4)) ^ 1))
2758         self.cli_verify_response("show bfd echo-source",
2759                                  "UDP echo source is: %s\n"
2760                                  "IPv4 address usable as echo source: %s\n"
2761                                  "IPv6 address usable as echo source: none" %
2762                                  (self.loopback0.name, echo_ip4))
2763         echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
2764             self.loopback0.local_ip6)) ^ 1))
2765         self.loopback0.config_ip6()
2766         self.cli_verify_response("show bfd echo-source",
2767                                  "UDP echo source is: %s\n"
2768                                  "IPv4 address usable as echo source: %s\n"
2769                                  "IPv6 address usable as echo source: %s" %
2770                                  (self.loopback0.name, echo_ip4, echo_ip6))
2771         cli_del = "bfd udp echo-source del"
2772         self.cli_verify_no_response(cli_del)
2773         self.cli_verify_response("show bfd echo-source",
2774                                  "UDP echo source is not set.")
2775
2776
# When this file is executed directly (rather than imported), run its tests
# through unittest using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)