bfd: API cleanup
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
29     CliFailedCommandError
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
33
# number of microseconds in one second
USEC_IN_SEC = 1000000
35
36
class AuthKeyFactory(object):
    """Produce BFD authentication keys whose conf key IDs never repeat.

    Every conf key ID handed out by an instance is remembered, so two keys
    created through the same factory always get different IDs.
    """

    def __init__(self):
        # conf key IDs handed out so far (stored as dictionary keys)
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """Build a key with random content and a not-yet-used conf key ID.

        :param test: test case object passed through to VppBFDAuthKey
        :param auth_type: authentication type of the new key
        :returns: VppBFDAuthKey holding between 1 and 20 random bytes
        """
        # draw 32-bit candidates until one is found that was not used before
        while True:
            candidate = randint(0, 0xFFFFFFFF)
            if candidate not in self._conf_key_ids:
                break
        self._conf_key_ids[candidate] = 1

        key_length = randint(1, 20)
        octets = bytearray()
        for _ in range(key_length):
            octets.append(randint(0, 255))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=candidate,
                             key=scapy.compat.raw(octets))
53
54
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # packet-generator interfaces used by the tests
    # (presumably populated by create_pg_interfaces() in setUpClass)
    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        """Enable BFD debug logging and prepare two pg interfaces.

        Both interfaces get IPv4 and IPv6 configuration and ARP resolution.
        If anything fails, the parent tearDownClass is run before the
        exception is re-raised.
        """
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        """Give every test its own auth key factory."""
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add and remove twice to show the session can be re-created
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of an identical session must be rejected
        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # add and remove twice to show the session can be re-created
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)

        def check_dump_matches_session():
            # compare what VPP reports with what the session object holds
            s = session.get_bfd_udp_session_dump_entry()
            self.assert_equal(session.desired_min_tx,
                              s.desired_min_tx,
                              "desired min transmit interval")
            self.assert_equal(session.required_min_rx,
                              s.required_min_rx,
                              "required min receive interval")
            self.assert_equal(session.detect_mult, s.detect_mult,
                              "detect mult")

        session.add_vpp_config()
        check_dump_matches_session()
        # double every parameter and check that the change is visible
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        check_dump_matches_session()

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() works in place and needs a mutable sequence; a bare
        # range object is immutable on Python 3, hence the list()
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal only the keys removed so far may be gone
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        # add and remove twice to show the session can be re-created
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        # the second add of an identical session must fail
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key object is created but never added to VPP
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the reported use count must drop by one per removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # session starts unauthenticated, the key is attached afterwards
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        # session is created with key1 and then switched over to key2
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        # nothing configured yet
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # echo source set, but the interface has no addresses yet
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # expected IPv4 echo address is the interface address with its
        # lowest bit flipped
        self.loopback0.config_ip4()
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = pack("!L", unpacked[0] ^ 1)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # same check for IPv6: only the lowest bit of the last word differs
        self.loopback0.config_ip6()
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
                        unpacked[3] ^ 1)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
        self.assertTrue(echo_source.have_usable_ip6)
        self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)

        # after deletion everything is reported as unset again
        self.vapi.bfd_udp_del_echo_source()
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)
313
314
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        """ Set up the test-side view of a BFD session.

        :param test: test case providing logger, asserts and pg_start()
        :param interface: interface whose IP addresses are used in packets
        :param af: address family; AF_INET6 selects IPv6, anything else IPv4
        :param detect_mult: detection multiplier placed into sent packets
        :param sha1_key: auth key; when set, packets carry SHA1 auth
        :param bfd_key_id: key ID written to / expected in the auth section
        :param our_seq_number: initial auth sequence number (random if None)
        :param tunnel_header: optional header inserted after Ethernet
        :param phy_interface: interface providing MAC addresses and used for
                              sending; defaults to `interface`
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        if phy_interface:
            self.phy_interface = phy_interface
        else:
            self.phy_interface = self.interface
        self.udp_sport = randint(49152, 65535)
        if our_seq_number is None:
            self.our_seq_number = randint(0, 40000000)
        else:
            self.our_seq_number = our_seq_number
        # last sequence number seen from VPP; None until the first
        # authenticated packet has been verified
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header

    @staticmethod
    def _pad_key(key):
        """ Return `key` (bytes) right-padded with zero bytes to 20 bytes. """
        # the pad must be bytes - mixing bytes and text raises on Python 3
        return key + b"\0" * (20 - len(key))

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        if self.our_seq_number == 0xFFFFFFFF:
            self.our_seq_number = 0
        else:
            self.our_seq_number += 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments which are not None overwrite the stored value.
        """
        if my_discriminator is not None:
            self.my_discriminator = my_discriminator
        if your_discriminator is not None:
            self.your_discriminator = your_discriminator
        if required_min_rx is not None:
            self.required_min_rx = required_min_rx
        if required_min_echo_rx is not None:
            self.required_min_echo_rx = required_min_echo_rx
        if desired_min_tx is not None:
            self.desired_min_tx = desired_min_tx
        if detect_mult is not None:
            self.detect_mult = detect_mult
        if diag is not None:
            self.diag = diag
        if state is not None:
            self.state = state
        if auth_type is not None:
            self.auth_type = auth_type

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Session values which are falsy (None or 0) are not copied, so the
        corresponding field of the BFD layer is left untouched.
        """
        bfd = packet[BFD]
        if self.my_discriminator:
            self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
                                   self.my_discriminator)
            bfd.my_discriminator = self.my_discriminator
        if self.your_discriminator:
            self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
                                   self.your_discriminator)
            bfd.your_discriminator = self.your_discriminator
        if self.required_min_rx:
            self.test.logger.debug(
                "BFD: setting packet.required_min_rx_interval=%s",
                self.required_min_rx)
            bfd.required_min_rx_interval = self.required_min_rx
        if self.required_min_echo_rx:
            self.test.logger.debug(
                "BFD: setting packet.required_min_echo_rx=%s",
                self.required_min_echo_rx)
            bfd.required_min_echo_rx_interval = self.required_min_echo_rx
        if self.desired_min_tx:
            self.test.logger.debug(
                "BFD: setting packet.desired_min_tx_interval=%s",
                self.desired_min_tx)
            bfd.desired_min_tx_interval = self.desired_min_tx
        if self.detect_mult:
            self.test.logger.debug(
                "BFD: setting packet.detect_mult=%s", self.detect_mult)
            bfd.detect_mult = self.detect_mult
        if self.diag:
            self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
            bfd.diag = self.diag
        if self.state:
            self.test.logger.debug("BFD: setting packet.state=%s", self.state)
            bfd.state = self.state
        if self.auth_type:
            # this is used by a negative test-case
            self.test.logger.debug("BFD: setting packet.auth_type=%s",
                                   self.auth_type)
            bfd.auth_type = self.auth_type

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session

        :returns: Ether / [tunnel header] / IP or IPv6 / UDP / BFD packet;
                  with a SHA1 key set, the auth section including the hash
                  is filled in
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        packet = Ether(src=self.phy_interface.remote_mac,
                       dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        if self.af == AF_INET6:
            packet = (packet /
                      IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        else:
            packet = (packet /
                      IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer followed by
            # the zero-padded key
            hash_material = scapy.compat.raw(packet[BFD])[:32] + \
                self._pad_key(self.sha1_key.key)
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed

        :param packet: packet to send; built by create_packet() if None
        :param interface: interface to send on; phy_interface if None
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.phy_interface
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer.

        Checks the auth section header, the progression of the sequence
        number relative to the previously received one, and the SHA1 hash.
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            # meticulous mode must advance by exactly one per packet, the
            # non-meticulous mode may also repeat the previous number
            if self.vpp_seq_number < 0xffffffff:
                if self.sha1_key.auth_type == \
                        BFDAuthType.meticulous_keyed_sha1:
                    self.test.assert_equal(recvd_seq_num,
                                           self.vpp_seq_number + 1,
                                           "BFD sequence number")
                else:
                    self.test.assert_in_range(recvd_seq_num,
                                              self.vpp_seq_number,
                                              self.vpp_seq_number + 1,
                                              "BFD sequence number")
            else:
                # previous number was the 32-bit maximum - expect a wrap
                if self.sha1_key.auth_type == \
                        BFDAuthType.meticulous_keyed_sha1:
                    self.test.assert_equal(recvd_seq_num, 0,
                                           "BFD sequence number")
                else:
                    self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                       "BFD sequence number not one of "
                                       "(%s, 0)" % self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self._pad_key(self.sha1_key.key)
        # hexlify both sides so that the compared values have the same type
        # on Python 2 and Python 3 (hexdigest() is text, hexlify() is bytes)
        expected_hash = binascii.hexlify(hashlib.sha1(hash_material).digest())
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
524
525
def bfd_session_up(test):
    """ Walk the BFD session of `test` through Init to Up

    Waits for a packet from VPP, records the clock offset between the test
    and the capture timestamps, answers with Init, waits for the Up event
    and finally sends Up from the test side.
    """

    def bump_seq_num_if_meticulous():
        # meticulous keyed SHA1 gets a new sequence number for every packet
        auth_key = test.test_session.sha1_key
        if auth_key and \
                auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            test.test_session.inc_seq_num()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2,
                                is_tunnel=test.vpp_session.is_tunnel)
    # remember the offset of an earlier run (if any) for the stability check
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - hello.time
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    test.logger.info("BFD: Sending Init")
    test.test_session.update(
        my_discriminator=randint(0, 40000000),
        your_discriminator=hello[BFD].my_discriminator,
        state=BFDState.init)
    bump_seq_num_if_meticulous()
    test.test_session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")

    test.test_session.update(state=BFDState.up)
    bump_seq_num_if_meticulous()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
559
560
def bfd_session_down(test):
    """ Take an Up BFD session down by sending Down from the test side """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)

    peer = test.test_session
    peer.update(state=BFDState.down)
    auth_key = peer.sha1_key
    # meticulous keyed SHA1 gets a new sequence number for every packet
    if auth_key and auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
574
575
def verify_bfd_session_config(test, session, state=None):
    """ Compare the session dump entry with the values held by `session`.

    :param test: test case providing the assert helpers
    :param session: session object providing the dump entry and the
                    expected values
    :param state: expected session state; not checked when falsy
    """
    entry = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(entry)
    # since the entry is not none, we have verified that sw_if_index and
    # addresses are valid (in get_bfd_udp_session_dump_entry)
    if state:
        test.assert_equal(entry.state, state, "session state")

    timing_checks = (
        ("required_min_rx", "required min rx interval"),
        ("desired_min_tx", "desired min tx interval"),
        ("detect_mult", "detect multiplier"),
    )
    for attribute, label in timing_checks:
        test.assert_equal(getattr(entry, attribute),
                          getattr(session, attribute), label)

    if session.sha1_key is None:
        test.assert_equal(entry.is_authenticated, 0, "is_authenticated flag")
        return
    # authenticated session - key identifiers have to match as well
    test.assert_equal(entry.is_authenticated, 1, "is_authenticated flag")
    test.assert_equal(entry.bfd_key_id, session.bfd_key_id, "bfd key id")
    test.assert_equal(entry.conf_key_id, session.sha1_key.conf_key_id,
                      "config key id")
598
599
def verify_ip(test, packet):
    """ Check hop limit / TTL and addresses of the IP layer of `packet`.

    The address family of test.vpp_session selects whether the IPv6 or the
    IPv4 layer is inspected.  The packet is expected to go from the local
    to the remote address of the session interface.
    """
    interface = test.vpp_session.interface
    if test.vpp_session.af == AF_INET6:
        header = packet[IPv6]
        expected_src = interface.local_ip6
        expected_dst = interface.remote_ip6
        test.assert_equal(header.hlim, 255, "IPv6 hop limit")
    else:
        header = packet[IP]
        expected_src = interface.local_ip4
        expected_dst = interface.remote_ip4
        test.assert_equal(header.ttl, 255, "IPv4 TTL")
    test.assert_equal(header.src, expected_src, "IP source address")
    test.assert_equal(header.dst, expected_dst, "IP destination address")
614
615
def verify_udp(test, packet):
    """ Check the UDP ports of `packet`.

    The destination port must equal BFD.udp_dport and the source port has
    to lie between BFD.udp_sport_min and BFD.udp_sport_max.
    """
    datagram = packet[UDP]
    test.assert_equal(datagram.dport, BFD.udp_dport, "UDP destination port")
    test.assert_in_range(datagram.sport,
                         BFD.udp_sport_min,
                         BFD.udp_sport_max,
                         "UDP source port")
622
623
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case providing logger, asserts and vpp_session
    :param event: received BFD session event
    :param expected_state: state the event has to report
    """
    test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(event))
    test.assert_equal(event.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # this helper is shared by IPv4 and IPv6 sessions, so the labels are
    # kept address-family neutral; addresses are compared in textual form
    test.assert_equal(str(event.local_addr), test.vpp_session.local_addr,
                      "Local address")
    test.assert_equal(str(event.peer_addr), test.vpp_session.peer_addr,
                      "Peer address")
    test.assert_equal(event.state, expected_state, BFDState)
637
638
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing pg0, logger, asserts and test_session
    :param timeout: how long to wait (seconds)
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if True, the outer IP header is stripped and the
                      payload it carries is verified and returned

    :returns: the received (and, for tunnels, decapsulated) packet
    :raises CaptureTimeoutError: if the deadline passes before a usable
                                 packet arrives
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # indexing a packet with a missing layer raises instead of returning
    # None, so the presence of the BFD layer is tested explicitly
    if not p.haslayer(BFD):
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    bfd = p[BFD]
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
678
679
680 @unittest.skipUnless(running_extended_tests, "part of extended tests")
681 class BFD4TestCase(VppTestCase):
682     """Bidirectional Forwarding Detection (BFD)"""
683
684     pg0 = None
685     vpp_clock_offset = None
686     vpp_session = None
687     test_session = None
688
689     @classmethod
690     def setUpClass(cls):
691         super(BFD4TestCase, cls).setUpClass()
692         cls.vapi.cli("set log class bfd level debug")
693         try:
694             cls.create_pg_interfaces([0])
695             cls.create_loopback_interfaces(1)
696             cls.loopback0 = cls.lo_interfaces[0]
697             cls.loopback0.config_ip4()
698             cls.loopback0.admin_up()
699             cls.pg0.config_ip4()
700             cls.pg0.configure_ipv4_neighbors()
701             cls.pg0.admin_up()
702             cls.pg0.resolve_arp()
703
704         except Exception:
705             super(BFD4TestCase, cls).tearDownClass()
706             raise
707
708     @classmethod
709     def tearDownClass(cls):
710         super(BFD4TestCase, cls).tearDownClass()
711
712     def setUp(self):
713         super(BFD4TestCase, self).setUp()
714         self.factory = AuthKeyFactory()
715         self.vapi.want_bfd_events()
716         self.pg0.enable_capture()
717         try:
718             self.vpp_session = VppBFDUDPSession(self, self.pg0,
719                                                 self.pg0.remote_ip4)
720             self.vpp_session.add_vpp_config()
721             self.vpp_session.admin_up()
722             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
723         except BaseException:
724             self.vapi.want_bfd_events(enable_disable=0)
725             raise
726
727     def tearDown(self):
728         if not self.vpp_dead:
729             self.vapi.want_bfd_events(enable_disable=0)
730         self.vapi.collect_events()  # clear the event queue
731         super(BFD4TestCase, self).tearDown()
732
733     def test_session_up(self):
734         """ bring BFD session up """
735         bfd_session_up(self)
736
737     def test_session_up_by_ip(self):
738         """ bring BFD session up - first frame looked up by address pair """
739         self.logger.info("BFD: Sending Slow control frame")
740         self.test_session.update(my_discriminator=randint(0, 40000000))
741         self.test_session.send_packet()
742         self.pg0.enable_capture()
743         p = self.pg0.wait_for_packet(1)
744         self.assert_equal(p[BFD].your_discriminator,
745                           self.test_session.my_discriminator,
746                           "BFD - your discriminator")
747         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
748         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
749                                  state=BFDState.up)
750         self.logger.info("BFD: Waiting for event")
751         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
752         verify_event(self, e, expected_state=BFDState.init)
753         self.logger.info("BFD: Sending Up")
754         self.test_session.send_packet()
755         self.logger.info("BFD: Waiting for event")
756         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
757         verify_event(self, e, expected_state=BFDState.up)
758         self.logger.info("BFD: Session is Up")
759         self.test_session.update(state=BFDState.up)
760         self.test_session.send_packet()
761         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
762
763     def test_session_down(self):
764         """ bring BFD session down """
765         bfd_session_up(self)
766         bfd_session_down(self)
767
768     @unittest.skipUnless(running_extended_tests, "part of extended tests")
769     def test_hold_up(self):
770         """ hold BFD session up """
771         bfd_session_up(self)
772         for dummy in range(self.test_session.detect_mult * 2):
773             wait_for_bfd_packet(self)
774             self.test_session.send_packet()
775         self.assert_equal(len(self.vapi.collect_events()), 0,
776                           "number of bfd events")
777
778     @unittest.skipUnless(running_extended_tests, "part of extended tests")
779     def test_slow_timer(self):
780         """ verify slow periodic control frames while session down """
781         packet_count = 3
782         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
783         prev_packet = wait_for_bfd_packet(self, 2)
784         for dummy in range(packet_count):
785             next_packet = wait_for_bfd_packet(self, 2)
786             time_diff = next_packet.time - prev_packet.time
787             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
788             # to work around timing issues
789             self.assert_in_range(
790                 time_diff, 0.70, 1.05, "time between slow packets")
791             prev_packet = next_packet
792
793     @unittest.skipUnless(running_extended_tests, "part of extended tests")
794     def test_zero_remote_min_rx(self):
795         """ no packets when zero remote required min rx interval """
796         bfd_session_up(self)
797         self.test_session.update(required_min_rx=0)
798         self.test_session.send_packet()
799         for dummy in range(self.test_session.detect_mult):
800             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
801                        "sleep before transmitting bfd packet")
802             self.test_session.send_packet()
803             try:
804                 p = wait_for_bfd_packet(self, timeout=0)
805                 self.logger.error(ppp("Received unexpected packet:", p))
806             except CaptureTimeoutError:
807                 pass
808         self.assert_equal(
809             len(self.vapi.collect_events()), 0, "number of bfd events")
810         self.test_session.update(required_min_rx=300000)
811         for dummy in range(3):
812             self.test_session.send_packet()
813             wait_for_bfd_packet(
814                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
815         self.assert_equal(
816             len(self.vapi.collect_events()), 0, "number of bfd events")
817
818     @unittest.skipUnless(running_extended_tests, "part of extended tests")
819     def test_conn_down(self):
820         """ verify session goes down after inactivity """
821         bfd_session_up(self)
822         detection_time = self.test_session.detect_mult *\
823             self.vpp_session.required_min_rx / USEC_IN_SEC
824         self.sleep(detection_time, "waiting for BFD session time-out")
825         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
826         verify_event(self, e, expected_state=BFDState.down)
827
828     @unittest.skipUnless(running_extended_tests, "part of extended tests")
829     def test_large_required_min_rx(self):
830         """ large remote required min rx interval """
831         bfd_session_up(self)
832         p = wait_for_bfd_packet(self)
833         interval = 3000000
834         self.test_session.update(required_min_rx=interval)
835         self.test_session.send_packet()
836         time_mark = time.time()
837         count = 0
838         # busy wait here, trying to collect a packet or event, vpp is not
839         # allowed to send packets and the session will timeout first - so the
840         # Up->Down event must arrive before any packets do
841         while time.time() < time_mark + interval / USEC_IN_SEC:
842             try:
843                 p = wait_for_bfd_packet(self, timeout=0)
844                 # if vpp managed to send a packet before we did the session
845                 # session update, then that's fine, ignore it
846                 if p.time < time_mark - self.vpp_clock_offset:
847                     continue
848                 self.logger.error(ppp("Received unexpected packet:", p))
849                 count += 1
850             except CaptureTimeoutError:
851                 pass
852             events = self.vapi.collect_events()
853             if len(events) > 0:
854                 verify_event(self, events[0], BFDState.down)
855                 break
856         self.assert_equal(count, 0, "number of packets received")
857
858     @unittest.skipUnless(running_extended_tests, "part of extended tests")
859     def test_immediate_remote_min_rx_reduction(self):
860         """ immediately honor remote required min rx reduction """
861         self.vpp_session.remove_vpp_config()
862         self.vpp_session = VppBFDUDPSession(
863             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
864         self.pg0.enable_capture()
865         self.vpp_session.add_vpp_config()
866         self.test_session.update(desired_min_tx=1000000,
867                                  required_min_rx=1000000)
868         bfd_session_up(self)
869         reference_packet = wait_for_bfd_packet(self)
870         time_mark = time.time()
871         interval = 300000
872         self.test_session.update(required_min_rx=interval)
873         self.test_session.send_packet()
874         extra_time = time.time() - time_mark
875         p = wait_for_bfd_packet(self)
876         # first packet is allowed to be late by time we spent doing the update
877         # calculated in extra_time
878         self.assert_in_range(p.time - reference_packet.time,
879                              .95 * 0.75 * interval / USEC_IN_SEC,
880                              1.05 * interval / USEC_IN_SEC + extra_time,
881                              "time between BFD packets")
882         reference_packet = p
883         for dummy in range(3):
884             p = wait_for_bfd_packet(self)
885             diff = p.time - reference_packet.time
886             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
887                                  1.05 * interval / USEC_IN_SEC,
888                                  "time between BFD packets")
889             reference_packet = p
890
891     @unittest.skipUnless(running_extended_tests, "part of extended tests")
892     def test_modify_req_min_rx_double(self):
893         """ modify session - double required min rx """
894         bfd_session_up(self)
895         p = wait_for_bfd_packet(self)
896         self.test_session.update(desired_min_tx=10000,
897                                  required_min_rx=10000)
898         self.test_session.send_packet()
899         # double required min rx
900         self.vpp_session.modify_parameters(
901             required_min_rx=2 * self.vpp_session.required_min_rx)
902         p = wait_for_bfd_packet(
903             self, pcap_time_min=time.time() - self.vpp_clock_offset)
904         # poll bit needs to be set
905         self.assertIn("P", p.sprintf("%BFD.flags%"),
906                       "Poll bit not set in BFD packet")
907         # finish poll sequence with final packet
908         final = self.test_session.create_packet()
909         final[BFD].flags = "F"
910         timeout = self.test_session.detect_mult * \
911             max(self.test_session.desired_min_tx,
912                 self.vpp_session.required_min_rx) / USEC_IN_SEC
913         self.test_session.send_packet(final)
914         time_mark = time.time()
915         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
916         verify_event(self, e, expected_state=BFDState.down)
917         time_to_event = time.time() - time_mark
918         self.assert_in_range(time_to_event, .9 * timeout,
919                              1.1 * timeout, "session timeout")
920
921     @unittest.skipUnless(running_extended_tests, "part of extended tests")
922     def test_modify_req_min_rx_halve(self):
923         """ modify session - halve required min rx """
924         self.vpp_session.modify_parameters(
925             required_min_rx=2 * self.vpp_session.required_min_rx)
926         bfd_session_up(self)
927         p = wait_for_bfd_packet(self)
928         self.test_session.update(desired_min_tx=10000,
929                                  required_min_rx=10000)
930         self.test_session.send_packet()
931         p = wait_for_bfd_packet(
932             self, pcap_time_min=time.time() - self.vpp_clock_offset)
933         # halve required min rx
934         old_required_min_rx = self.vpp_session.required_min_rx
935         self.vpp_session.modify_parameters(
936             required_min_rx=self.vpp_session.required_min_rx // 2)
937         # now we wait 0.8*3*old-req-min-rx and the session should still be up
938         self.sleep(0.8 * self.vpp_session.detect_mult *
939                    old_required_min_rx / USEC_IN_SEC,
940                    "wait before finishing poll sequence")
941         self.assert_equal(len(self.vapi.collect_events()), 0,
942                           "number of bfd events")
943         p = wait_for_bfd_packet(self)
944         # poll bit needs to be set
945         self.assertIn("P", p.sprintf("%BFD.flags%"),
946                       "Poll bit not set in BFD packet")
947         # finish poll sequence with final packet
948         final = self.test_session.create_packet()
949         final[BFD].flags = "F"
950         self.test_session.send_packet(final)
951         # now the session should time out under new conditions
952         detection_time = self.test_session.detect_mult *\
953             self.vpp_session.required_min_rx / USEC_IN_SEC
954         before = time.time()
955         e = self.vapi.wait_for_event(
956             2 * detection_time, "bfd_udp_session_details")
957         after = time.time()
958         self.assert_in_range(after - before,
959                              0.9 * detection_time,
960                              1.1 * detection_time,
961                              "time before bfd session goes down")
962         verify_event(self, e, expected_state=BFDState.down)
963
964     @unittest.skipUnless(running_extended_tests, "part of extended tests")
965     def test_modify_detect_mult(self):
966         """ modify detect multiplier """
967         bfd_session_up(self)
968         p = wait_for_bfd_packet(self)
969         self.vpp_session.modify_parameters(detect_mult=1)
970         p = wait_for_bfd_packet(
971             self, pcap_time_min=time.time() - self.vpp_clock_offset)
972         self.assert_equal(self.vpp_session.detect_mult,
973                           p[BFD].detect_mult,
974                           "detect mult")
975         # poll bit must not be set
976         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
977                          "Poll bit not set in BFD packet")
978         self.vpp_session.modify_parameters(detect_mult=10)
979         p = wait_for_bfd_packet(
980             self, pcap_time_min=time.time() - self.vpp_clock_offset)
981         self.assert_equal(self.vpp_session.detect_mult,
982                           p[BFD].detect_mult,
983                           "detect mult")
984         # poll bit must not be set
985         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
986                          "Poll bit not set in BFD packet")
987
988     @unittest.skipUnless(running_extended_tests, "part of extended tests")
989     def test_queued_poll(self):
990         """ test poll sequence queueing """
991         bfd_session_up(self)
992         p = wait_for_bfd_packet(self)
993         self.vpp_session.modify_parameters(
994             required_min_rx=2 * self.vpp_session.required_min_rx)
995         p = wait_for_bfd_packet(self)
996         poll_sequence_start = time.time()
997         poll_sequence_length_min = 0.5
998         send_final_after = time.time() + poll_sequence_length_min
999         # poll bit needs to be set
1000         self.assertIn("P", p.sprintf("%BFD.flags%"),
1001                       "Poll bit not set in BFD packet")
1002         self.assert_equal(p[BFD].required_min_rx_interval,
1003                           self.vpp_session.required_min_rx,
1004                           "BFD required min rx interval")
1005         self.vpp_session.modify_parameters(
1006             required_min_rx=2 * self.vpp_session.required_min_rx)
1007         # 2nd poll sequence should be queued now
1008         # don't send the reply back yet, wait for some time to emulate
1009         # longer round-trip time
1010         packet_count = 0
1011         while time.time() < send_final_after:
1012             self.test_session.send_packet()
1013             p = wait_for_bfd_packet(self)
1014             self.assert_equal(len(self.vapi.collect_events()), 0,
1015                               "number of bfd events")
1016             self.assert_equal(p[BFD].required_min_rx_interval,
1017                               self.vpp_session.required_min_rx,
1018                               "BFD required min rx interval")
1019             packet_count += 1
1020             # poll bit must be set
1021             self.assertIn("P", p.sprintf("%BFD.flags%"),
1022                           "Poll bit not set in BFD packet")
1023         final = self.test_session.create_packet()
1024         final[BFD].flags = "F"
1025         self.test_session.send_packet(final)
1026         # finish 1st with final
1027         poll_sequence_length = time.time() - poll_sequence_start
1028         # vpp must wait for some time before starting new poll sequence
1029         poll_no_2_started = False
1030         for dummy in range(2 * packet_count):
1031             p = wait_for_bfd_packet(self)
1032             self.assert_equal(len(self.vapi.collect_events()), 0,
1033                               "number of bfd events")
1034             if "P" in p.sprintf("%BFD.flags%"):
1035                 poll_no_2_started = True
1036                 if time.time() < poll_sequence_start + poll_sequence_length:
1037                     raise Exception("VPP started 2nd poll sequence too soon")
1038                 final = self.test_session.create_packet()
1039                 final[BFD].flags = "F"
1040                 self.test_session.send_packet(final)
1041                 break
1042             else:
1043                 self.test_session.send_packet()
1044         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1045         # finish 2nd with final
1046         final = self.test_session.create_packet()
1047         final[BFD].flags = "F"
1048         self.test_session.send_packet(final)
1049         p = wait_for_bfd_packet(self)
1050         # poll bit must not be set
1051         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1052                          "Poll bit set in BFD packet")
1053
1054     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1055     def test_poll_response(self):
1056         """ test correct response to control frame with poll bit set """
1057         bfd_session_up(self)
1058         poll = self.test_session.create_packet()
1059         poll[BFD].flags = "P"
1060         self.test_session.send_packet(poll)
1061         final = wait_for_bfd_packet(
1062             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1063         self.assertIn("F", final.sprintf("%BFD.flags%"))
1064
1065     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1066     def test_no_periodic_if_remote_demand(self):
1067         """ no periodic frames outside poll sequence if remote demand set """
1068         bfd_session_up(self)
1069         demand = self.test_session.create_packet()
1070         demand[BFD].flags = "D"
1071         self.test_session.send_packet(demand)
1072         transmit_time = 0.9 \
1073             * max(self.vpp_session.required_min_rx,
1074                   self.test_session.desired_min_tx) \
1075             / USEC_IN_SEC
1076         count = 0
1077         for dummy in range(self.test_session.detect_mult * 2):
1078             self.sleep(transmit_time)
1079             self.test_session.send_packet(demand)
1080             try:
1081                 p = wait_for_bfd_packet(self, timeout=0)
1082                 self.logger.error(ppp("Received unexpected packet:", p))
1083                 count += 1
1084             except CaptureTimeoutError:
1085                 pass
1086         events = self.vapi.collect_events()
1087         for e in events:
1088             self.logger.error("Received unexpected event: %s", e)
1089         self.assert_equal(count, 0, "number of packets received")
1090         self.assert_equal(len(events), 0, "number of events received")
1091
1092     def test_echo_looped_back(self):
1093         """ echo packets looped back """
1094         # don't need a session in this case..
1095         self.vpp_session.remove_vpp_config()
1096         self.pg0.enable_capture()
1097         echo_packet_count = 10
1098         # random source port low enough to increment a few times..
1099         udp_sport_tx = randint(1, 50000)
1100         udp_sport_rx = udp_sport_tx
1101         echo_packet = (Ether(src=self.pg0.remote_mac,
1102                              dst=self.pg0.local_mac) /
1103                        IP(src=self.pg0.remote_ip4,
1104                           dst=self.pg0.remote_ip4) /
1105                        UDP(dport=BFD.udp_dport_echo) /
1106                        Raw("this should be looped back"))
1107         for dummy in range(echo_packet_count):
1108             self.sleep(.01, "delay between echo packets")
1109             echo_packet[UDP].sport = udp_sport_tx
1110             udp_sport_tx += 1
1111             self.logger.debug(ppp("Sending packet:", echo_packet))
1112             self.pg0.add_stream(echo_packet)
1113             self.pg_start()
1114         for dummy in range(echo_packet_count):
1115             p = self.pg0.wait_for_packet(1)
1116             self.logger.debug(ppp("Got packet:", p))
1117             ether = p[Ether]
1118             self.assert_equal(self.pg0.remote_mac,
1119                               ether.dst, "Destination MAC")
1120             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1121             ip = p[IP]
1122             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1123             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1124             udp = p[UDP]
1125             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1126                               "UDP destination port")
1127             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1128             udp_sport_rx += 1
1129             # need to compare the hex payload here, otherwise BFD_vpp_echo
1130             # gets in way
1131             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1132                              scapy.compat.raw(echo_packet[UDP].payload),
1133                              "Received packet is not the echo packet sent")
1134         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1135                           "ECHO packet identifier for test purposes)")
1136
1137     def test_echo(self):
1138         """ echo function """
1139         bfd_session_up(self)
1140         self.test_session.update(required_min_echo_rx=150000)
1141         self.test_session.send_packet()
1142         detection_time = self.test_session.detect_mult *\
1143             self.vpp_session.required_min_rx / USEC_IN_SEC
1144         # echo shouldn't work without echo source set
1145         for dummy in range(10):
1146             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1147             self.sleep(sleep, "delay before sending bfd packet")
1148             self.test_session.send_packet()
1149         p = wait_for_bfd_packet(
1150             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1151         self.assert_equal(p[BFD].required_min_rx_interval,
1152                           self.vpp_session.required_min_rx,
1153                           "BFD required min rx interval")
1154         self.test_session.send_packet()
1155         self.vapi.bfd_udp_set_echo_source(
1156             sw_if_index=self.loopback0.sw_if_index)
1157         echo_seen = False
1158         # should be turned on - loopback echo packets
1159         for dummy in range(3):
1160             loop_until = time.time() + 0.75 * detection_time
1161             while time.time() < loop_until:
1162                 p = self.pg0.wait_for_packet(1)
1163                 self.logger.debug(ppp("Got packet:", p))
1164                 if p[UDP].dport == BFD.udp_dport_echo:
1165                     self.assert_equal(
1166                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1167                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1168                                         "BFD ECHO src IP equal to loopback IP")
1169                     self.logger.debug(ppp("Looping back packet:", p))
1170                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1171                                       "ECHO packet destination MAC address")
1172                     p[Ether].dst = self.pg0.local_mac
1173                     self.pg0.add_stream(p)
1174                     self.pg_start()
1175                     echo_seen = True
1176                 elif p.haslayer(BFD):
1177                     if echo_seen:
1178                         self.assertGreaterEqual(
1179                             p[BFD].required_min_rx_interval,
1180                             1000000)
1181                     if "P" in p.sprintf("%BFD.flags%"):
1182                         final = self.test_session.create_packet()
1183                         final[BFD].flags = "F"
1184                         self.test_session.send_packet(final)
1185                 else:
1186                     raise Exception(ppp("Received unknown packet:", p))
1187
1188                 self.assert_equal(len(self.vapi.collect_events()), 0,
1189                                   "number of bfd events")
1190             self.test_session.send_packet()
1191         self.assertTrue(echo_seen, "No echo packets received")
1192
1193     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1194     def test_echo_fail(self):
1195         """ session goes down if echo function fails """
1196         bfd_session_up(self)
1197         self.test_session.update(required_min_echo_rx=150000)
1198         self.test_session.send_packet()
1199         detection_time = self.test_session.detect_mult *\
1200             self.vpp_session.required_min_rx / USEC_IN_SEC
1201         self.vapi.bfd_udp_set_echo_source(
1202             sw_if_index=self.loopback0.sw_if_index)
1203         # echo function should be used now, but we will drop the echo packets
1204         verified_diag = False
1205         for dummy in range(3):
1206             loop_until = time.time() + 0.75 * detection_time
1207             while time.time() < loop_until:
1208                 p = self.pg0.wait_for_packet(1)
1209                 self.logger.debug(ppp("Got packet:", p))
1210                 if p[UDP].dport == BFD.udp_dport_echo:
1211                     # dropped
1212                     pass
1213                 elif p.haslayer(BFD):
1214                     if "P" in p.sprintf("%BFD.flags%"):
1215                         self.assertGreaterEqual(
1216                             p[BFD].required_min_rx_interval,
1217                             1000000)
1218                         final = self.test_session.create_packet()
1219                         final[BFD].flags = "F"
1220                         self.test_session.send_packet(final)
1221                     if p[BFD].state == BFDState.down:
1222                         self.assert_equal(p[BFD].diag,
1223                                           BFDDiagCode.echo_function_failed,
1224                                           BFDDiagCode)
1225                         verified_diag = True
1226                 else:
1227                     raise Exception(ppp("Received unknown packet:", p))
1228             self.test_session.send_packet()
1229         events = self.vapi.collect_events()
1230         self.assert_equal(len(events), 1, "number of bfd events")
1231         self.assert_equal(events[0].state, BFDState.down, BFDState)
1232         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1233
1234     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1235     def test_echo_stop(self):
1236         """ echo function stops if peer sets required min echo rx zero """
1237         bfd_session_up(self)
1238         self.test_session.update(required_min_echo_rx=150000)
1239         self.test_session.send_packet()
1240         self.vapi.bfd_udp_set_echo_source(
1241             sw_if_index=self.loopback0.sw_if_index)
1242         # wait for first echo packet
1243         while True:
1244             p = self.pg0.wait_for_packet(1)
1245             self.logger.debug(ppp("Got packet:", p))
1246             if p[UDP].dport == BFD.udp_dport_echo:
1247                 self.logger.debug(ppp("Looping back packet:", p))
1248                 p[Ether].dst = self.pg0.local_mac
1249                 self.pg0.add_stream(p)
1250                 self.pg_start()
1251                 break
1252             elif p.haslayer(BFD):
1253                 # ignore BFD
1254                 pass
1255             else:
1256                 raise Exception(ppp("Received unknown packet:", p))
1257         self.test_session.update(required_min_echo_rx=0)
1258         self.test_session.send_packet()
1259         # echo packets shouldn't arrive anymore
1260         for dummy in range(5):
1261             wait_for_bfd_packet(
1262                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1263             self.test_session.send_packet()
1264             events = self.vapi.collect_events()
1265             self.assert_equal(len(events), 0, "number of bfd events")
1266
1267     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1268     def test_echo_source_removed(self):
1269         """ echo function stops if echo source is removed """
1270         bfd_session_up(self)
1271         self.test_session.update(required_min_echo_rx=150000)
1272         self.test_session.send_packet()
1273         self.vapi.bfd_udp_set_echo_source(
1274             sw_if_index=self.loopback0.sw_if_index)
1275         # wait for first echo packet
1276         while True:
1277             p = self.pg0.wait_for_packet(1)
1278             self.logger.debug(ppp("Got packet:", p))
1279             if p[UDP].dport == BFD.udp_dport_echo:
1280                 self.logger.debug(ppp("Looping back packet:", p))
1281                 p[Ether].dst = self.pg0.local_mac
1282                 self.pg0.add_stream(p)
1283                 self.pg_start()
1284                 break
1285             elif p.haslayer(BFD):
1286                 # ignore BFD
1287                 pass
1288             else:
1289                 raise Exception(ppp("Received unknown packet:", p))
1290         self.vapi.bfd_udp_del_echo_source()
1291         self.test_session.send_packet()
1292         # echo packets shouldn't arrive anymore
1293         for dummy in range(5):
1294             wait_for_bfd_packet(
1295                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1296             self.test_session.send_packet()
1297             events = self.vapi.collect_events()
1298             self.assert_equal(len(events), 0, "number of bfd events")
1299
1300     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1301     def test_stale_echo(self):
1302         """ stale echo packets don't keep a session up """
1303         bfd_session_up(self)
1304         self.test_session.update(required_min_echo_rx=150000)
1305         self.vapi.bfd_udp_set_echo_source(
1306             sw_if_index=self.loopback0.sw_if_index)
1307         self.test_session.send_packet()
1308         # should be turned on - loopback echo packets
1309         echo_packet = None
1310         timeout_at = None
1311         timeout_ok = False
1312         for dummy in range(10 * self.vpp_session.detect_mult):
1313             p = self.pg0.wait_for_packet(1)
1314             if p[UDP].dport == BFD.udp_dport_echo:
1315                 if echo_packet is None:
1316                     self.logger.debug(ppp("Got first echo packet:", p))
1317                     echo_packet = p
1318                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1319                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1320                 else:
1321                     self.logger.debug(ppp("Got followup echo packet:", p))
1322                 self.logger.debug(ppp("Looping back first echo packet:", p))
1323                 echo_packet[Ether].dst = self.pg0.local_mac
1324                 self.pg0.add_stream(echo_packet)
1325                 self.pg_start()
1326             elif p.haslayer(BFD):
1327                 self.logger.debug(ppp("Got packet:", p))
1328                 if "P" in p.sprintf("%BFD.flags%"):
1329                     final = self.test_session.create_packet()
1330                     final[BFD].flags = "F"
1331                     self.test_session.send_packet(final)
1332                 if p[BFD].state == BFDState.down:
1333                     self.assertIsNotNone(
1334                         timeout_at,
1335                         "Session went down before first echo packet received")
1336                     now = time.time()
1337                     self.assertGreaterEqual(
1338                         now, timeout_at,
1339                         "Session timeout at %s, but is expected at %s" %
1340                         (now, timeout_at))
1341                     self.assert_equal(p[BFD].diag,
1342                                       BFDDiagCode.echo_function_failed,
1343                                       BFDDiagCode)
1344                     events = self.vapi.collect_events()
1345                     self.assert_equal(len(events), 1, "number of bfd events")
1346                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1347                     timeout_ok = True
1348                     break
1349             else:
1350                 raise Exception(ppp("Received unknown packet:", p))
1351             self.test_session.send_packet()
1352         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1353
1354     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1355     def test_invalid_echo_checksum(self):
1356         """ echo packets with invalid checksum don't keep a session up """
1357         bfd_session_up(self)
1358         self.test_session.update(required_min_echo_rx=150000)
1359         self.vapi.bfd_udp_set_echo_source(
1360             sw_if_index=self.loopback0.sw_if_index)
1361         self.test_session.send_packet()
1362         # should be turned on - loopback echo packets
1363         timeout_at = None
1364         timeout_ok = False
1365         for dummy in range(10 * self.vpp_session.detect_mult):
1366             p = self.pg0.wait_for_packet(1)
1367             if p[UDP].dport == BFD.udp_dport_echo:
1368                 self.logger.debug(ppp("Got echo packet:", p))
1369                 if timeout_at is None:
1370                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1371                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1372                 p[BFD_vpp_echo].checksum = getrandbits(64)
1373                 p[Ether].dst = self.pg0.local_mac
1374                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1375                 self.pg0.add_stream(p)
1376                 self.pg_start()
1377             elif p.haslayer(BFD):
1378                 self.logger.debug(ppp("Got packet:", p))
1379                 if "P" in p.sprintf("%BFD.flags%"):
1380                     final = self.test_session.create_packet()
1381                     final[BFD].flags = "F"
1382                     self.test_session.send_packet(final)
1383                 if p[BFD].state == BFDState.down:
1384                     self.assertIsNotNone(
1385                         timeout_at,
1386                         "Session went down before first echo packet received")
1387                     now = time.time()
1388                     self.assertGreaterEqual(
1389                         now, timeout_at,
1390                         "Session timeout at %s, but is expected at %s" %
1391                         (now, timeout_at))
1392                     self.assert_equal(p[BFD].diag,
1393                                       BFDDiagCode.echo_function_failed,
1394                                       BFDDiagCode)
1395                     events = self.vapi.collect_events()
1396                     self.assert_equal(len(events), 1, "number of bfd events")
1397                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1398                     timeout_ok = True
1399                     break
1400             else:
1401                 raise Exception(ppp("Received unknown packet:", p))
1402             self.test_session.send_packet()
1403         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1404
1405     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1406     def test_admin_up_down(self):
1407         """ put session admin-up and admin-down """
1408         bfd_session_up(self)
1409         self.vpp_session.admin_down()
1410         self.pg0.enable_capture()
1411         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1412         verify_event(self, e, expected_state=BFDState.admin_down)
1413         for dummy in range(2):
1414             p = wait_for_bfd_packet(self)
1415             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1416         # try to bring session up - shouldn't be possible
1417         self.test_session.update(state=BFDState.init)
1418         self.test_session.send_packet()
1419         for dummy in range(2):
1420             p = wait_for_bfd_packet(self)
1421             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1422         self.vpp_session.admin_up()
1423         self.test_session.update(state=BFDState.down)
1424         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1425         verify_event(self, e, expected_state=BFDState.down)
1426         p = wait_for_bfd_packet(
1427             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1428         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1429         self.test_session.send_packet()
1430         p = wait_for_bfd_packet(
1431             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1432         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1433         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1434         verify_event(self, e, expected_state=BFDState.init)
1435         self.test_session.update(state=BFDState.up)
1436         self.test_session.send_packet()
1437         p = wait_for_bfd_packet(
1438             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1439         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1440         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1441         verify_event(self, e, expected_state=BFDState.up)
1442
1443     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1444     def test_config_change_remote_demand(self):
1445         """ configuration change while peer in demand mode """
1446         bfd_session_up(self)
1447         demand = self.test_session.create_packet()
1448         demand[BFD].flags = "D"
1449         self.test_session.send_packet(demand)
1450         self.vpp_session.modify_parameters(
1451             required_min_rx=2 * self.vpp_session.required_min_rx)
1452         p = wait_for_bfd_packet(
1453             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1454         # poll bit must be set
1455         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1456         # terminate poll sequence
1457         final = self.test_session.create_packet()
1458         final[BFD].flags = "D+F"
1459         self.test_session.send_packet(final)
1460         # vpp should be quiet now again
1461         transmit_time = 0.9 \
1462             * max(self.vpp_session.required_min_rx,
1463                   self.test_session.desired_min_tx) \
1464             / USEC_IN_SEC
1465         count = 0
1466         for dummy in range(self.test_session.detect_mult * 2):
1467             self.sleep(transmit_time)
1468             self.test_session.send_packet(demand)
1469             try:
1470                 p = wait_for_bfd_packet(self, timeout=0)
1471                 self.logger.error(ppp("Received unexpected packet:", p))
1472                 count += 1
1473             except CaptureTimeoutError:
1474                 pass
1475         events = self.vapi.collect_events()
1476         for e in events:
1477             self.logger.error("Received unexpected event: %s", e)
1478         self.assert_equal(count, 0, "number of packets received")
1479         self.assert_equal(len(events), 0, "number of events received")
1480
1481     def test_intf_deleted(self):
1482         """ interface with bfd session deleted """
1483         intf = VppLoInterface(self)
1484         intf.config_ip4()
1485         intf.admin_up()
1486         sw_if_index = intf.sw_if_index
1487         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1488         vpp_session.add_vpp_config()
1489         vpp_session.admin_up()
1490         intf.remove_vpp_config()
1491         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1492         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1493         self.assertFalse(vpp_session.query_vpp_config())
1494
1495
1496 @unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level placeholders - real values are assigned during class/test
    # setup or by the helper functions used in the tests
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Configure pg0 and one loopback interface for IPv6.

        If anything fails, the parent class teardown is run before the
        exception is re-raised.
        """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent class."""
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """Register for BFD events and create the IPv6 VPP/test sessions.

        On any failure the event registration is undone before the
        exception is re-raised.
        """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """Unregister from BFD events (if VPP is alive) and drain events."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        # the first packet is sent before your_discriminator is learned
        # from VPP's reply below
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests, "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back

        Sends echo packets with increasing UDP source ports (used as a
        packet identifier) and verifies that each one comes back with MAC
        addresses swapped and with addresses, ports and payload unchanged.
        """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must match
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function

        Verifies that required_min_rx_interval advertised by VPP equals
        the session's required_min_rx while no echo source is set, and
        that after an echo source is configured echo packets are sent to
        pg0 and the advertised interval is at least 1000000.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for _ in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index, it is compared with the event after removal
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1709
1710
1711 @unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # assigned in test_session_with_fib
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Delegate class setup to the parent class."""
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent class."""
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """Create pg0, register for BFD events and configure IPv6."""
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip6()
            pg_if.configure_ipv6_neighbors()

    def tearDown(self):
        """Unregister from BFD events if VPP is still alive."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _expect_traffic_forwarded(self, packets):
        """Send packets on pg0 and check each one is seen again on pg0.

        Captured packets are matched to the sent ones, in order, by their
        IPv6 destination address; non-data packets are filtered out.
        """
        self.pg0.add_stream(packets)
        self.pg_start()
        for sent in packets:
            received = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(received[IPv6].dst, sent[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # one packet per route defined below
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst=dst_address) /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))
             for dst_address in ("2001::1", "2002::1")]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        via_pg0 = VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64, [via_pg0])
        via_next_hop_only = VppRoutePath(self.pg0.remote_ip6, 0xffffffff)
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64, [via_next_hop_only])
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # create the sessions now that the routes are present
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._expect_traffic_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._expect_traffic_forwarded(p)
1815
1816
1817 @unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # assigned in test_bfd_o_gre
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Delegate class setup to the parent class."""
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent class."""
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        """Create pg0, register for BFD events and configure IPv4."""
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()

    def tearDown(self):
        """Unregister from BFD events if VPP is still alive."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # A GRE interface over which to run a BFD session
        gre_if = VppGreInterface(
            self, self.pg0.local_ip4, self.pg0.remote_ip4)
        gre_if.add_vpp_config()
        gre_if.admin_up()
        gre_if.config_ip4()

        # VPP side of the session, running over the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, gre_if, gre_if.remote_ip4, is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side of the session - IP/GRE header built from pg0 addresses
        outer_header = IP(src=self.pg0.remote_ip4,
                          dst=self.pg0.local_ip4) / GRE()
        self.test_session = BFDTestSession(
            self, gre_if, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # data packet addressed to the remote end of the tunnel
        data_packet = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
            UDP(sport=1234, dport=1234) /
            Raw('\xa5' * 100))
        p = [data_packet]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, p, self.pg0)

        # bring session down
        bfd_session_down(self)
1896
1897
1898 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1899 class BFDSHA1TestCase(VppTestCase):
1900     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1901
1902     pg0 = None
1903     vpp_clock_offset = None
1904     vpp_session = None
1905     test_session = None
1906
1907     @classmethod
1908     def setUpClass(cls):
1909         super(BFDSHA1TestCase, cls).setUpClass()
1910         cls.vapi.cli("set log class bfd level debug")
1911         try:
1912             cls.create_pg_interfaces([0])
1913             cls.pg0.config_ip4()
1914             cls.pg0.admin_up()
1915             cls.pg0.resolve_arp()
1916
1917         except Exception:
1918             super(BFDSHA1TestCase, cls).tearDownClass()
1919             raise
1920
1921     @classmethod
1922     def tearDownClass(cls):
1923         super(BFDSHA1TestCase, cls).tearDownClass()
1924
1925     def setUp(self):
1926         super(BFDSHA1TestCase, self).setUp()
1927         self.factory = AuthKeyFactory()
1928         self.vapi.want_bfd_events()
1929         self.pg0.enable_capture()
1930
1931     def tearDown(self):
1932         if not self.vpp_dead:
1933             self.vapi.want_bfd_events(enable_disable=False)
1934         self.vapi.collect_events()  # clear the event queue
1935         super(BFDSHA1TestCase, self).tearDown()
1936
1937     def test_session_up(self):
1938         """ bring BFD session up """
1939         key = self.factory.create_random_key(self)
1940         key.add_vpp_config()
1941         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1942                                             self.pg0.remote_ip4,
1943                                             sha1_key=key)
1944         self.vpp_session.add_vpp_config()
1945         self.vpp_session.admin_up()
1946         self.test_session = BFDTestSession(
1947             self, self.pg0, AF_INET, sha1_key=key,
1948             bfd_key_id=self.vpp_session.bfd_key_id)
1949         bfd_session_up(self)
1950
1951     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1952     def test_hold_up(self):
1953         """ hold BFD session up """
1954         key = self.factory.create_random_key(self)
1955         key.add_vpp_config()
1956         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1957                                             self.pg0.remote_ip4,
1958                                             sha1_key=key)
1959         self.vpp_session.add_vpp_config()
1960         self.vpp_session.admin_up()
1961         self.test_session = BFDTestSession(
1962             self, self.pg0, AF_INET, sha1_key=key,
1963             bfd_key_id=self.vpp_session.bfd_key_id)
1964         bfd_session_up(self)
1965         for dummy in range(self.test_session.detect_mult * 2):
1966             wait_for_bfd_packet(self)
1967             self.test_session.send_packet()
1968         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1969
1970     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1971     def test_hold_up_meticulous(self):
1972         """ hold BFD session up - meticulous auth """
1973         key = self.factory.create_random_key(
1974             self, BFDAuthType.meticulous_keyed_sha1)
1975         key.add_vpp_config()
1976         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1977                                             self.pg0.remote_ip4, sha1_key=key)
1978         self.vpp_session.add_vpp_config()
1979         self.vpp_session.admin_up()
1980         # specify sequence number so that it wraps
1981         self.test_session = BFDTestSession(
1982             self, self.pg0, AF_INET, sha1_key=key,
1983             bfd_key_id=self.vpp_session.bfd_key_id,
1984             our_seq_number=0xFFFFFFFF - 4)
1985         bfd_session_up(self)
1986         for dummy in range(30):
1987             wait_for_bfd_packet(self)
1988             self.test_session.inc_seq_num()
1989             self.test_session.send_packet()
1990         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1991
1992     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1993     def test_send_bad_seq_number(self):
1994         """ session is not kept alive by msgs with bad sequence numbers"""
1995         key = self.factory.create_random_key(
1996             self, BFDAuthType.meticulous_keyed_sha1)
1997         key.add_vpp_config()
1998         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1999                                             self.pg0.remote_ip4, sha1_key=key)
2000         self.vpp_session.add_vpp_config()
2001         self.test_session = BFDTestSession(
2002             self, self.pg0, AF_INET, sha1_key=key,
2003             bfd_key_id=self.vpp_session.bfd_key_id)
2004         bfd_session_up(self)
2005         detection_time = self.test_session.detect_mult *\
2006             self.vpp_session.required_min_rx / USEC_IN_SEC
2007         send_until = time.time() + 2 * detection_time
2008         while time.time() < send_until:
2009             self.test_session.send_packet()
2010             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2011                        "time between bfd packets")
2012         e = self.vapi.collect_events()
2013         # session should be down now, because the sequence numbers weren't
2014         # updated
2015         self.assert_equal(len(e), 1, "number of bfd events")
2016         verify_event(self, e[0], expected_state=BFDState.down)
2017
2018     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2019                                        legitimate_test_session,
2020                                        rogue_test_session,
2021                                        rogue_bfd_values=None):
2022         """ execute a rogue session interaction scenario
2023
2024         1. create vpp session, add config
2025         2. bring the legitimate session up
2026         3. copy the bfd values from legitimate session to rogue session
2027         4. apply rogue_bfd_values to rogue session
2028         5. set rogue session state to down
2029         6. send message to take the session down from the rogue session
2030         7. assert that the legitimate session is unaffected
2031         """
2032
2033         self.vpp_session = vpp_bfd_udp_session
2034         self.vpp_session.add_vpp_config()
2035         self.test_session = legitimate_test_session
2036         # bring vpp session up
2037         bfd_session_up(self)
2038         # send packet from rogue session
2039         rogue_test_session.update(
2040             my_discriminator=self.test_session.my_discriminator,
2041             your_discriminator=self.test_session.your_discriminator,
2042             desired_min_tx=self.test_session.desired_min_tx,
2043             required_min_rx=self.test_session.required_min_rx,
2044             detect_mult=self.test_session.detect_mult,
2045             diag=self.test_session.diag,
2046             state=self.test_session.state,
2047             auth_type=self.test_session.auth_type)
2048         if rogue_bfd_values:
2049             rogue_test_session.update(**rogue_bfd_values)
2050         rogue_test_session.update(state=BFDState.down)
2051         rogue_test_session.send_packet()
2052         wait_for_bfd_packet(self)
2053         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2054
2055     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2056     def test_mismatch_auth(self):
2057         """ session is not brought down by unauthenticated msg """
2058         key = self.factory.create_random_key(self)
2059         key.add_vpp_config()
2060         vpp_session = VppBFDUDPSession(
2061             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2062         legitimate_test_session = BFDTestSession(
2063             self, self.pg0, AF_INET, sha1_key=key,
2064             bfd_key_id=vpp_session.bfd_key_id)
2065         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2066         self.execute_rogue_session_scenario(vpp_session,
2067                                             legitimate_test_session,
2068                                             rogue_test_session)
2069
2070     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2071     def test_mismatch_bfd_key_id(self):
2072         """ session is not brought down by msg with non-existent key-id """
2073         key = self.factory.create_random_key(self)
2074         key.add_vpp_config()
2075         vpp_session = VppBFDUDPSession(
2076             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2077         # pick a different random bfd key id
2078         x = randint(0, 255)
2079         while x == vpp_session.bfd_key_id:
2080             x = randint(0, 255)
2081         legitimate_test_session = BFDTestSession(
2082             self, self.pg0, AF_INET, sha1_key=key,
2083             bfd_key_id=vpp_session.bfd_key_id)
2084         rogue_test_session = BFDTestSession(
2085             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2086         self.execute_rogue_session_scenario(vpp_session,
2087                                             legitimate_test_session,
2088                                             rogue_test_session)
2089
2090     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2091     def test_mismatched_auth_type(self):
2092         """ session is not brought down by msg with wrong auth type """
2093         key = self.factory.create_random_key(self)
2094         key.add_vpp_config()
2095         vpp_session = VppBFDUDPSession(
2096             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2097         legitimate_test_session = BFDTestSession(
2098             self, self.pg0, AF_INET, sha1_key=key,
2099             bfd_key_id=vpp_session.bfd_key_id)
2100         rogue_test_session = BFDTestSession(
2101             self, self.pg0, AF_INET, sha1_key=key,
2102             bfd_key_id=vpp_session.bfd_key_id)
2103         self.execute_rogue_session_scenario(
2104             vpp_session, legitimate_test_session, rogue_test_session,
2105             {'auth_type': BFDAuthType.keyed_md5})
2106
2107     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2108     def test_restart(self):
2109         """ simulate remote peer restart and resynchronization """
2110         key = self.factory.create_random_key(
2111             self, BFDAuthType.meticulous_keyed_sha1)
2112         key.add_vpp_config()
2113         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2114                                             self.pg0.remote_ip4, sha1_key=key)
2115         self.vpp_session.add_vpp_config()
2116         self.test_session = BFDTestSession(
2117             self, self.pg0, AF_INET, sha1_key=key,
2118             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2119         bfd_session_up(self)
2120         # don't send any packets for 2*detection_time
2121         detection_time = self.test_session.detect_mult *\
2122             self.vpp_session.required_min_rx / USEC_IN_SEC
2123         self.sleep(2 * detection_time, "simulating peer restart")
2124         events = self.vapi.collect_events()
2125         self.assert_equal(len(events), 1, "number of bfd events")
2126         verify_event(self, events[0], expected_state=BFDState.down)
2127         self.test_session.update(state=BFDState.down)
2128         # reset sequence number
2129         self.test_session.our_seq_number = 0
2130         self.test_session.vpp_seq_number = None
2131         # now throw away any pending packets
2132         self.pg0.enable_capture()
2133         bfd_session_up(self)
2134
2135
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # class-level defaults; every test assigns vpp_session and test_session
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable BFD debug logging and prepare the pg0 interface (IPv4)

        If preparing the interface raises, the parent class teardown is run
        before the exception is re-raised.
        """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            iface = cls.pg0
            iface.config_ip4()
            iface.admin_up()
            iface.resolve_arp()
        except Exception:
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate to the parent class teardown """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a key factory, subscribe to BFD events, start capture """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (if vpp is alive), drain the queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _keep_alive(self, check_up=True, bump_seq=False):
        """ exchange detect_mult * 2 rounds of packets with vpp

        Every round waits for a BFD packet from vpp and then sends a packet
        from the test session.

        :param check_up: assert that each received packet carries up state
        :param bump_seq: increment the test session sequence number before
                         sending each packet
        """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            if check_up:
                self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            if bump_seq:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_undisturbed(self):
        """ assert that the vpp session is up and no BFD event was queued """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._keep_alive()
        # enable auth on the vpp side, then mirror it on the test session
        self.vpp_session.activate_auth(auth_key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = auth_key
        self._keep_alive()
        self._assert_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=auth_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keep_alive(bump_seq=True)
        # disable auth on the vpp side, then mirror it on the test session
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._keep_alive(bump_seq=True)
        self._assert_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        old_key = self.factory.create_random_key(self)
        old_key.add_vpp_config()
        new_key = self.factory.create_random_key(self)
        new_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=old_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keep_alive()
        # swap the key on the vpp side, then mirror it on the test session
        self.vpp_session.activate_auth(new_key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = new_key
        self._keep_alive()
        self._assert_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the state of received packets is not asserted in this first round
        self._keep_alive(check_up=False)
        self.vpp_session.activate_auth(auth_key, delayed=True)
        # keep talking unauthenticated after the delayed activation
        self._keep_alive()
        # now switch the test session over to authenticated packets
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = auth_key
        self.test_session.send_packet()
        self._keep_alive()
        self._assert_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=auth_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keep_alive()
        self.vpp_session.deactivate_auth(delayed=True)
        # keep talking authenticated after the delayed deactivation
        self._keep_alive()
        # now switch the test session over to unauthenticated packets
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._keep_alive()
        self._assert_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        old_key = self.factory.create_random_key(self)
        old_key.add_vpp_config()
        new_key = self.factory.create_random_key(self)
        new_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=old_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._keep_alive()
        self.vpp_session.activate_auth(new_key, delayed=True)
        # keep talking with the old key after the delayed key change
        self._keep_alive()
        # now switch the test session over to the new key
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = new_key
        self.test_session.send_packet()
        self._keep_alive()
        self._assert_undisturbed()
2347
2348
2349 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2350 class BFDCLITestCase(VppTestCase):
2351     """Bidirectional Forwarding Detection (BFD) (CLI) """
2352     pg0 = None
2353
2354     @classmethod
2355     def setUpClass(cls):
2356         super(BFDCLITestCase, cls).setUpClass()
2357         cls.vapi.cli("set log class bfd level debug")
2358         try:
2359             cls.create_pg_interfaces((0,))
2360             cls.pg0.config_ip4()
2361             cls.pg0.config_ip6()
2362             cls.pg0.resolve_arp()
2363             cls.pg0.resolve_ndp()
2364
2365         except Exception:
2366             super(BFDCLITestCase, cls).tearDownClass()
2367             raise
2368
    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the parent test case class """
        super(BFDCLITestCase, cls).tearDownClass()
2372
    def setUp(self):
        """ run the parent setUp, then prepare per-test state

        A new AuthKeyFactory is created for every test and capture is
        enabled on pg0. BFD events are not subscribed to here.
        """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()
2377
    def tearDown(self):
        """ unsubscribe from BFD events (best effort) and drain the queue

        UnexpectedApiReturnValueError raised by the unsubscribe call is
        deliberately ignored, then the parent tearDown is run.
        """
        try:
            self.vapi.want_bfd_events(enable_disable=False)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()
2386
2387     def cli_verify_no_response(self, cli):
2388         """ execute a CLI, asserting that the response is empty """
2389         self.assert_equal(self.vapi.cli(cli),
2390                           b"",
2391                           "CLI command response")
2392
2393     def cli_verify_response(self, cli, expected):
2394         """ execute a CLI, asserting that the response matches expectation """
2395         try:
2396             reply = self.vapi.cli(cli)
2397         except CliFailedCommandError as cli_error:
2398             reply = str(cli_error)
2399         self.assert_equal(reply.strip(),
2400                           expected,
2401                           "CLI command response")
2402
2403     def test_show(self):
2404         """ show commands """
2405         k1 = self.factory.create_random_key(self)
2406         k1.add_vpp_config()
2407         k2 = self.factory.create_random_key(
2408             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2409         k2.add_vpp_config()
2410         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2411         s1.add_vpp_config()
2412         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2413                               sha1_key=k2)
2414         s2.add_vpp_config()
2415         self.logger.info(self.vapi.ppcli("show bfd keys"))
2416         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2417         self.logger.info(self.vapi.ppcli("show bfd"))
2418
2419     def test_set_del_sha1_key(self):
2420         """ set/delete SHA1 auth key """
2421         k = self.factory.create_random_key(self)
2422         self.registry.register(k, self.logger)
2423         self.cli_verify_no_response(
2424             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2425             (k.conf_key_id,
2426                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2427         self.assertTrue(k.query_vpp_config())
2428         self.vpp_session = VppBFDUDPSession(
2429             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2430         self.vpp_session.add_vpp_config()
2431         self.test_session = \
2432             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2433                            bfd_key_id=self.vpp_session.bfd_key_id)
2434         self.vapi.want_bfd_events()
2435         bfd_session_up(self)
2436         bfd_session_down(self)
2437         # try to replace the secret for the key - should fail because the key
2438         # is in-use
2439         k2 = self.factory.create_random_key(self)
2440         self.cli_verify_response(
2441             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2442             (k.conf_key_id,
2443                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2444             "bfd key set: `bfd_auth_set_key' API call failed, "
2445             "rv=-103:BFD object in use")
2446         # manipulating the session using old secret should still work
2447         bfd_session_up(self)
2448         bfd_session_down(self)
2449         self.vpp_session.remove_vpp_config()
2450         self.cli_verify_no_response(
2451             "bfd key del conf-key-id %s" % k.conf_key_id)
2452         self.assertFalse(k.query_vpp_config())
2453
2454     def test_set_del_meticulous_sha1_key(self):
2455         """ set/delete meticulous SHA1 auth key """
2456         k = self.factory.create_random_key(
2457             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2458         self.registry.register(k, self.logger)
2459         self.cli_verify_no_response(
2460             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2461             (k.conf_key_id,
2462                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2463         self.assertTrue(k.query_vpp_config())
2464         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2465                                             self.pg0.remote_ip6, af=AF_INET6,
2466                                             sha1_key=k)
2467         self.vpp_session.add_vpp_config()
2468         self.vpp_session.admin_up()
2469         self.test_session = \
2470             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2471                            bfd_key_id=self.vpp_session.bfd_key_id)
2472         self.vapi.want_bfd_events()
2473         bfd_session_up(self)
2474         bfd_session_down(self)
2475         # try to replace the secret for the key - should fail because the key
2476         # is in-use
2477         k2 = self.factory.create_random_key(self)
2478         self.cli_verify_response(
2479             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2480             (k.conf_key_id,
2481                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2482             "bfd key set: `bfd_auth_set_key' API call failed, "
2483             "rv=-103:BFD object in use")
2484         # manipulating the session using old secret should still work
2485         bfd_session_up(self)
2486         bfd_session_down(self)
2487         self.vpp_session.remove_vpp_config()
2488         self.cli_verify_no_response(
2489             "bfd key del conf-key-id %s" % k.conf_key_id)
2490         self.assertFalse(k.query_vpp_config())
2491
2492     def test_add_mod_del_bfd_udp(self):
2493         """ create/modify/delete IPv4 BFD UDP session """
2494         vpp_session = VppBFDUDPSession(
2495             self, self.pg0, self.pg0.remote_ip4)
2496         self.registry.register(vpp_session, self.logger)
2497         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2498             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2499             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2500                                 self.pg0.remote_ip4,
2501                                 vpp_session.desired_min_tx,
2502                                 vpp_session.required_min_rx,
2503                                 vpp_session.detect_mult)
2504         self.cli_verify_no_response(cli_add_cmd)
2505         # 2nd add should fail
2506         self.cli_verify_response(
2507             cli_add_cmd,
2508             "bfd udp session add: `bfd_add_add_session' API call"
2509             " failed, rv=-101:Duplicate BFD object")
2510         verify_bfd_session_config(self, vpp_session)
2511         mod_session = VppBFDUDPSession(
2512             self, self.pg0, self.pg0.remote_ip4,
2513             required_min_rx=2 * vpp_session.required_min_rx,
2514             desired_min_tx=3 * vpp_session.desired_min_tx,
2515             detect_mult=4 * vpp_session.detect_mult)
2516         self.cli_verify_no_response(
2517             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2518             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2519             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2520              mod_session.desired_min_tx, mod_session.required_min_rx,
2521              mod_session.detect_mult))
2522         verify_bfd_session_config(self, mod_session)
2523         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2524             "peer-addr %s" % (self.pg0.name,
2525                               self.pg0.local_ip4, self.pg0.remote_ip4)
2526         self.cli_verify_no_response(cli_del_cmd)
2527         # 2nd del is expected to fail
2528         self.cli_verify_response(
2529             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2530             " failed, rv=-102:No such BFD object")
2531         self.assertFalse(vpp_session.query_vpp_config())
2532
2533     def test_add_mod_del_bfd_udp6(self):
2534         """ create/modify/delete IPv6 BFD UDP session """
2535         vpp_session = VppBFDUDPSession(
2536             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2537         self.registry.register(vpp_session, self.logger)
2538         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2539             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2540             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2541                                 self.pg0.remote_ip6,
2542                                 vpp_session.desired_min_tx,
2543                                 vpp_session.required_min_rx,
2544                                 vpp_session.detect_mult)
2545         self.cli_verify_no_response(cli_add_cmd)
2546         # 2nd add should fail
2547         self.cli_verify_response(
2548             cli_add_cmd,
2549             "bfd udp session add: `bfd_add_add_session' API call"
2550             " failed, rv=-101:Duplicate BFD object")
2551         verify_bfd_session_config(self, vpp_session)
2552         mod_session = VppBFDUDPSession(
2553             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2554             required_min_rx=2 * vpp_session.required_min_rx,
2555             desired_min_tx=3 * vpp_session.desired_min_tx,
2556             detect_mult=4 * vpp_session.detect_mult)
2557         self.cli_verify_no_response(
2558             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2559             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2560             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2561              mod_session.desired_min_tx,
2562              mod_session.required_min_rx, mod_session.detect_mult))
2563         verify_bfd_session_config(self, mod_session)
2564         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2565             "peer-addr %s" % (self.pg0.name,
2566                               self.pg0.local_ip6, self.pg0.remote_ip6)
2567         self.cli_verify_no_response(cli_del_cmd)
2568         # 2nd del is expected to fail
2569         self.cli_verify_response(
2570             cli_del_cmd,
2571             "bfd udp session del: `bfd_udp_del_session' API call"
2572             " failed, rv=-102:No such BFD object")
2573         self.assertFalse(vpp_session.query_vpp_config())
2574
2575     def test_add_mod_del_bfd_udp_auth(self):
2576         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2577         key = self.factory.create_random_key(self)
2578         key.add_vpp_config()
2579         vpp_session = VppBFDUDPSession(
2580             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2581         self.registry.register(vpp_session, self.logger)
2582         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2583             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2584             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2585             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2586                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2587                vpp_session.detect_mult, key.conf_key_id,
2588                vpp_session.bfd_key_id)
2589         self.cli_verify_no_response(cli_add_cmd)
2590         # 2nd add should fail
2591         self.cli_verify_response(
2592             cli_add_cmd,
2593             "bfd udp session add: `bfd_add_add_session' API call"
2594             " failed, rv=-101:Duplicate BFD object")
2595         verify_bfd_session_config(self, vpp_session)
2596         mod_session = VppBFDUDPSession(
2597             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2598             bfd_key_id=vpp_session.bfd_key_id,
2599             required_min_rx=2 * vpp_session.required_min_rx,
2600             desired_min_tx=3 * vpp_session.desired_min_tx,
2601             detect_mult=4 * vpp_session.detect_mult)
2602         self.cli_verify_no_response(
2603             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2604             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2605             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2606              mod_session.desired_min_tx,
2607              mod_session.required_min_rx, mod_session.detect_mult))
2608         verify_bfd_session_config(self, mod_session)
2609         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2610             "peer-addr %s" % (self.pg0.name,
2611                               self.pg0.local_ip4, self.pg0.remote_ip4)
2612         self.cli_verify_no_response(cli_del_cmd)
2613         # 2nd del is expected to fail
2614         self.cli_verify_response(
2615             cli_del_cmd,
2616             "bfd udp session del: `bfd_udp_del_session' API call"
2617             " failed, rv=-102:No such BFD object")
2618         self.assertFalse(vpp_session.query_vpp_config())
2619
2620     def test_add_mod_del_bfd_udp6_auth(self):
2621         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2622         key = self.factory.create_random_key(
2623             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2624         key.add_vpp_config()
2625         vpp_session = VppBFDUDPSession(
2626             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2627         self.registry.register(vpp_session, self.logger)
2628         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2629             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2630             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2631             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2632                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2633                vpp_session.detect_mult, key.conf_key_id,
2634                vpp_session.bfd_key_id)
2635         self.cli_verify_no_response(cli_add_cmd)
2636         # 2nd add should fail
2637         self.cli_verify_response(
2638             cli_add_cmd,
2639             "bfd udp session add: `bfd_add_add_session' API call"
2640             " failed, rv=-101:Duplicate BFD object")
2641         verify_bfd_session_config(self, vpp_session)
2642         mod_session = VppBFDUDPSession(
2643             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2644             bfd_key_id=vpp_session.bfd_key_id,
2645             required_min_rx=2 * vpp_session.required_min_rx,
2646             desired_min_tx=3 * vpp_session.desired_min_tx,
2647             detect_mult=4 * vpp_session.detect_mult)
2648         self.cli_verify_no_response(
2649             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2650             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2651             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2652              mod_session.desired_min_tx,
2653              mod_session.required_min_rx, mod_session.detect_mult))
2654         verify_bfd_session_config(self, mod_session)
2655         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2656             "peer-addr %s" % (self.pg0.name,
2657                               self.pg0.local_ip6, self.pg0.remote_ip6)
2658         self.cli_verify_no_response(cli_del_cmd)
2659         # 2nd del is expected to fail
2660         self.cli_verify_response(
2661             cli_del_cmd,
2662             "bfd udp session del: `bfd_udp_del_session' API call"
2663             " failed, rv=-102:No such BFD object")
2664         self.assertFalse(vpp_session.query_vpp_config())
2665
2666     def test_auth_on_off(self):
2667         """ turn authentication on and off """
2668         key = self.factory.create_random_key(
2669             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2670         key.add_vpp_config()
2671         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2672         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2673                                         sha1_key=key)
2674         session.add_vpp_config()
2675         cli_activate = \
2676             "bfd udp session auth activate interface %s local-addr %s "\
2677             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2678             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2679                key.conf_key_id, auth_session.bfd_key_id)
2680         self.cli_verify_no_response(cli_activate)
2681         verify_bfd_session_config(self, auth_session)
2682         self.cli_verify_no_response(cli_activate)
2683         verify_bfd_session_config(self, auth_session)
2684         cli_deactivate = \
2685             "bfd udp session auth deactivate interface %s local-addr %s "\
2686             "peer-addr %s "\
2687             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2688         self.cli_verify_no_response(cli_deactivate)
2689         verify_bfd_session_config(self, session)
2690         self.cli_verify_no_response(cli_deactivate)
2691         verify_bfd_session_config(self, session)
2692
2693     def test_auth_on_off_delayed(self):
2694         """ turn authentication on and off (delayed) """
2695         key = self.factory.create_random_key(
2696             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2697         key.add_vpp_config()
2698         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2699         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2700                                         sha1_key=key)
2701         session.add_vpp_config()
2702         cli_activate = \
2703             "bfd udp session auth activate interface %s local-addr %s "\
2704             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2705             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2706                key.conf_key_id, auth_session.bfd_key_id)
2707         self.cli_verify_no_response(cli_activate)
2708         verify_bfd_session_config(self, auth_session)
2709         self.cli_verify_no_response(cli_activate)
2710         verify_bfd_session_config(self, auth_session)
2711         cli_deactivate = \
2712             "bfd udp session auth deactivate interface %s local-addr %s "\
2713             "peer-addr %s delayed yes"\
2714             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2715         self.cli_verify_no_response(cli_deactivate)
2716         verify_bfd_session_config(self, session)
2717         self.cli_verify_no_response(cli_deactivate)
2718         verify_bfd_session_config(self, session)
2719
2720     def test_admin_up_down(self):
2721         """ put session admin-up and admin-down """
2722         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2723         session.add_vpp_config()
2724         cli_down = \
2725             "bfd udp session set-flags admin down interface %s local-addr %s "\
2726             "peer-addr %s "\
2727             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2728         cli_up = \
2729             "bfd udp session set-flags admin up interface %s local-addr %s "\
2730             "peer-addr %s "\
2731             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2732         self.cli_verify_no_response(cli_down)
2733         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2734         self.cli_verify_no_response(cli_up)
2735         verify_bfd_session_config(self, session, state=BFDState.down)
2736
2737     def test_set_del_udp_echo_source(self):
2738         """ set/del udp echo source """
2739         self.create_loopback_interfaces(1)
2740         self.loopback0 = self.lo_interfaces[0]
2741         self.loopback0.admin_up()
2742         self.cli_verify_response("show bfd echo-source",
2743                                  "UDP echo source is not set.")
2744         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2745         self.cli_verify_no_response(cli_set)
2746         self.cli_verify_response("show bfd echo-source",
2747                                  "UDP echo source is: %s\n"
2748                                  "IPv4 address usable as echo source: none\n"
2749                                  "IPv6 address usable as echo source: none" %
2750                                  self.loopback0.name)
2751         self.loopback0.config_ip4()
2752         unpacked = unpack("!L", self.loopback0.local_ip4n)
2753         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2754         self.cli_verify_response("show bfd echo-source",
2755                                  "UDP echo source is: %s\n"
2756                                  "IPv4 address usable as echo source: %s\n"
2757                                  "IPv6 address usable as echo source: none" %
2758                                  (self.loopback0.name, echo_ip4))
2759         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2760         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2761                                             unpacked[2], unpacked[3] ^ 1))
2762         self.loopback0.config_ip6()
2763         self.cli_verify_response("show bfd echo-source",
2764                                  "UDP echo source is: %s\n"
2765                                  "IPv4 address usable as echo source: %s\n"
2766                                  "IPv6 address usable as echo source: %s" %
2767                                  (self.loopback0.name, echo_ip4, echo_ip6))
2768         cli_del = "bfd udp echo-source del"
2769         self.cli_verify_no_response(cli_del)
2770         self.cli_verify_response("show bfd echo-source",
2771                                  "UDP echo source is not set.")
2772
2773
# Run the tests with VppTestRunner when this file is executed as a script.
if "__main__" == __name__:
    unittest.main(testRunner=VppTestRunner)