4dca11247ce29e8d05da608b5884dc270137fb7e
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError
29 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
30 from vpp_gre_interface import VppGreInterface
31
32 USEC_IN_SEC = 1000000
33
34
35 class AuthKeyFactory(object):
36     """Factory class for creating auth keys with unique conf key ID"""
37
38     def __init__(self):
39         self._conf_key_ids = {}
40
41     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
42         """ create a random key with unique conf key id """
43         conf_key_id = randint(0, 0xFFFFFFFF)
44         while conf_key_id in self._conf_key_ids:
45             conf_key_id = randint(0, 0xFFFFFFFF)
46         self._conf_key_ids[conf_key_id] = 1
47         key = scapy.compat.raw(
48             bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
49         return VppBFDAuthKey(test=test, auth_type=auth_type,
50                              conf_key_id=conf_key_id, key=key)
51
52
53 @unittest.skipUnless(running_extended_tests, "part of extended tests")
54 class BFDAPITestCase(VppTestCase):
55     """Bidirectional Forwarding Detection (BFD) - API"""
56
57     pg0 = None
58     pg1 = None
59
60     @classmethod
61     def setUpClass(cls):
62         super(BFDAPITestCase, cls).setUpClass()
63         cls.vapi.cli("set log class bfd level debug")
64         try:
65             cls.create_pg_interfaces(range(2))
66             for i in cls.pg_interfaces:
67                 i.config_ip4()
68                 i.config_ip6()
69                 i.resolve_arp()
70
71         except Exception:
72             super(BFDAPITestCase, cls).tearDownClass()
73             raise
74
75     @classmethod
76     def tearDownClass(cls):
77         super(BFDAPITestCase, cls).tearDownClass()
78
79     def setUp(self):
80         super(BFDAPITestCase, self).setUp()
81         self.factory = AuthKeyFactory()
82
83     def test_add_bfd(self):
84         """ create a BFD session """
85         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
86         session.add_vpp_config()
87         self.logger.debug("Session state is %s", session.state)
88         session.remove_vpp_config()
89         session.add_vpp_config()
90         self.logger.debug("Session state is %s", session.state)
91         session.remove_vpp_config()
92
93     def test_double_add(self):
94         """ create the same BFD session twice (negative case) """
95         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
96         session.add_vpp_config()
97
98         with self.vapi.assert_negative_api_retval():
99             session.add_vpp_config()
100
101         session.remove_vpp_config()
102
103     def test_add_bfd6(self):
104         """ create IPv6 BFD session """
105         session = VppBFDUDPSession(
106             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
107         session.add_vpp_config()
108         self.logger.debug("Session state is %s", session.state)
109         session.remove_vpp_config()
110         session.add_vpp_config()
111         self.logger.debug("Session state is %s", session.state)
112         session.remove_vpp_config()
113
114     def test_mod_bfd(self):
115         """ modify BFD session parameters """
116         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
117                                    desired_min_tx=50000,
118                                    required_min_rx=10000,
119                                    detect_mult=1)
120         session.add_vpp_config()
121         s = session.get_bfd_udp_session_dump_entry()
122         self.assert_equal(session.desired_min_tx,
123                           s.desired_min_tx,
124                           "desired min transmit interval")
125         self.assert_equal(session.required_min_rx,
126                           s.required_min_rx,
127                           "required min receive interval")
128         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
129         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
130                                   required_min_rx=session.required_min_rx * 2,
131                                   detect_mult=session.detect_mult * 2)
132         s = session.get_bfd_udp_session_dump_entry()
133         self.assert_equal(session.desired_min_tx,
134                           s.desired_min_tx,
135                           "desired min transmit interval")
136         self.assert_equal(session.required_min_rx,
137                           s.required_min_rx,
138                           "required min receive interval")
139         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
140
141     def test_add_sha1_keys(self):
142         """ add SHA1 keys """
143         key_count = 10
144         keys = [self.factory.create_random_key(
145             self) for i in range(0, key_count)]
146         for key in keys:
147             self.assertFalse(key.query_vpp_config())
148         for key in keys:
149             key.add_vpp_config()
150         for key in keys:
151             self.assertTrue(key.query_vpp_config())
152         # remove randomly
153         indexes = range(key_count)
154         shuffle(indexes)
155         removed = []
156         for i in indexes:
157             key = keys[i]
158             key.remove_vpp_config()
159             removed.append(i)
160             for j in range(key_count):
161                 key = keys[j]
162                 if j in removed:
163                     self.assertFalse(key.query_vpp_config())
164                 else:
165                     self.assertTrue(key.query_vpp_config())
166         # should be removed now
167         for key in keys:
168             self.assertFalse(key.query_vpp_config())
169         # add back and remove again
170         for key in keys:
171             key.add_vpp_config()
172         for key in keys:
173             self.assertTrue(key.query_vpp_config())
174         for key in keys:
175             key.remove_vpp_config()
176         for key in keys:
177             self.assertFalse(key.query_vpp_config())
178
179     def test_add_bfd_sha1(self):
180         """ create a BFD session (SHA1) """
181         key = self.factory.create_random_key(self)
182         key.add_vpp_config()
183         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
184                                    sha1_key=key)
185         session.add_vpp_config()
186         self.logger.debug("Session state is %s", session.state)
187         session.remove_vpp_config()
188         session.add_vpp_config()
189         self.logger.debug("Session state is %s", session.state)
190         session.remove_vpp_config()
191
192     def test_double_add_sha1(self):
193         """ create the same BFD session twice (negative case) (SHA1) """
194         key = self.factory.create_random_key(self)
195         key.add_vpp_config()
196         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
197                                    sha1_key=key)
198         session.add_vpp_config()
199         with self.assertRaises(Exception):
200             session.add_vpp_config()
201
202     def test_add_auth_nonexistent_key(self):
203         """ create BFD session using non-existent SHA1 (negative case) """
204         session = VppBFDUDPSession(
205             self, self.pg0, self.pg0.remote_ip4,
206             sha1_key=self.factory.create_random_key(self))
207         with self.assertRaises(Exception):
208             session.add_vpp_config()
209
210     def test_shared_sha1_key(self):
211         """ share single SHA1 key between multiple BFD sessions """
212         key = self.factory.create_random_key(self)
213         key.add_vpp_config()
214         sessions = [
215             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
216                              sha1_key=key),
217             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
218                              sha1_key=key, af=AF_INET6),
219             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
220                              sha1_key=key),
221             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
222                              sha1_key=key, af=AF_INET6)]
223         for s in sessions:
224             s.add_vpp_config()
225         removed = 0
226         for s in sessions:
227             e = key.get_bfd_auth_keys_dump_entry()
228             self.assert_equal(e.use_count, len(sessions) - removed,
229                               "Use count for shared key")
230             s.remove_vpp_config()
231             removed += 1
232         e = key.get_bfd_auth_keys_dump_entry()
233         self.assert_equal(e.use_count, len(sessions) - removed,
234                           "Use count for shared key")
235
236     def test_activate_auth(self):
237         """ activate SHA1 authentication """
238         key = self.factory.create_random_key(self)
239         key.add_vpp_config()
240         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
241         session.add_vpp_config()
242         session.activate_auth(key)
243
244     def test_deactivate_auth(self):
245         """ deactivate SHA1 authentication """
246         key = self.factory.create_random_key(self)
247         key.add_vpp_config()
248         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
249         session.add_vpp_config()
250         session.activate_auth(key)
251         session.deactivate_auth()
252
253     def test_change_key(self):
254         """ change SHA1 key """
255         key1 = self.factory.create_random_key(self)
256         key2 = self.factory.create_random_key(self)
257         while key2.conf_key_id == key1.conf_key_id:
258             key2 = self.factory.create_random_key(self)
259         key1.add_vpp_config()
260         key2.add_vpp_config()
261         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
262                                    sha1_key=key1)
263         session.add_vpp_config()
264         session.activate_auth(key2)
265
266     def test_set_del_udp_echo_source(self):
267         """ set/del udp echo source """
268         self.create_loopback_interfaces(1)
269         self.loopback0 = self.lo_interfaces[0]
270         self.loopback0.admin_up()
271         echo_source = self.vapi.bfd_udp_get_echo_source()
272         self.assertFalse(echo_source.is_set)
273         self.assertFalse(echo_source.have_usable_ip4)
274         self.assertFalse(echo_source.have_usable_ip6)
275
276         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
277         echo_source = self.vapi.bfd_udp_get_echo_source()
278         self.assertTrue(echo_source.is_set)
279         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
280         self.assertFalse(echo_source.have_usable_ip4)
281         self.assertFalse(echo_source.have_usable_ip6)
282
283         self.loopback0.config_ip4()
284         unpacked = unpack("!L", self.loopback0.local_ip4n)
285         echo_ip4 = pack("!L", unpacked[0] ^ 1)
286         echo_source = self.vapi.bfd_udp_get_echo_source()
287         self.assertTrue(echo_source.is_set)
288         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
289         self.assertTrue(echo_source.have_usable_ip4)
290         self.assertEqual(echo_source.ip4_addr, echo_ip4)
291         self.assertFalse(echo_source.have_usable_ip6)
292
293         self.loopback0.config_ip6()
294         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
295         echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
296                         unpacked[3] ^ 1)
297         echo_source = self.vapi.bfd_udp_get_echo_source()
298         self.assertTrue(echo_source.is_set)
299         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
300         self.assertTrue(echo_source.have_usable_ip4)
301         self.assertEqual(echo_source.ip4_addr, echo_ip4)
302         self.assertTrue(echo_source.have_usable_ip6)
303         self.assertEqual(echo_source.ip6_addr, echo_ip6)
304
305         self.vapi.bfd_udp_del_echo_source()
306         echo_source = self.vapi.bfd_udp_get_echo_source()
307         self.assertFalse(echo_source.is_set)
308         self.assertFalse(echo_source.have_usable_ip4)
309         self.assertFalse(echo_source.have_usable_ip6)
310
311
312 class BFDTestSession(object):
313     """ BFD session as seen from test framework side """
314
315     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
316                  bfd_key_id=None, our_seq_number=None,
317                  tunnel_header=None, phy_interface=None):
318         self.test = test
319         self.af = af
320         self.sha1_key = sha1_key
321         self.bfd_key_id = bfd_key_id
322         self.interface = interface
323         if phy_interface:
324             self.phy_interface = phy_interface
325         else:
326             self.phy_interface = self.interface
327         self.udp_sport = randint(49152, 65535)
328         if our_seq_number is None:
329             self.our_seq_number = randint(0, 40000000)
330         else:
331             self.our_seq_number = our_seq_number
332         self.vpp_seq_number = None
333         self.my_discriminator = 0
334         self.desired_min_tx = 300000
335         self.required_min_rx = 300000
336         self.required_min_echo_rx = None
337         self.detect_mult = detect_mult
338         self.diag = BFDDiagCode.no_diagnostic
339         self.your_discriminator = None
340         self.state = BFDState.down
341         self.auth_type = BFDAuthType.no_auth
342         self.tunnel_header = tunnel_header
343
344     def inc_seq_num(self):
345         """ increment sequence number, wrapping if needed """
346         if self.our_seq_number == 0xFFFFFFFF:
347             self.our_seq_number = 0
348         else:
349             self.our_seq_number += 1
350
351     def update(self, my_discriminator=None, your_discriminator=None,
352                desired_min_tx=None, required_min_rx=None,
353                required_min_echo_rx=None, detect_mult=None,
354                diag=None, state=None, auth_type=None):
355         """ update BFD parameters associated with session """
356         if my_discriminator is not None:
357             self.my_discriminator = my_discriminator
358         if your_discriminator is not None:
359             self.your_discriminator = your_discriminator
360         if required_min_rx is not None:
361             self.required_min_rx = required_min_rx
362         if required_min_echo_rx is not None:
363             self.required_min_echo_rx = required_min_echo_rx
364         if desired_min_tx is not None:
365             self.desired_min_tx = desired_min_tx
366         if detect_mult is not None:
367             self.detect_mult = detect_mult
368         if diag is not None:
369             self.diag = diag
370         if state is not None:
371             self.state = state
372         if auth_type is not None:
373             self.auth_type = auth_type
374
375     def fill_packet_fields(self, packet):
376         """ set packet fields with known values in packet """
377         bfd = packet[BFD]
378         if self.my_discriminator:
379             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
380                                    self.my_discriminator)
381             bfd.my_discriminator = self.my_discriminator
382         if self.your_discriminator:
383             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
384                                    self.your_discriminator)
385             bfd.your_discriminator = self.your_discriminator
386         if self.required_min_rx:
387             self.test.logger.debug(
388                 "BFD: setting packet.required_min_rx_interval=%s",
389                 self.required_min_rx)
390             bfd.required_min_rx_interval = self.required_min_rx
391         if self.required_min_echo_rx:
392             self.test.logger.debug(
393                 "BFD: setting packet.required_min_echo_rx=%s",
394                 self.required_min_echo_rx)
395             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
396         if self.desired_min_tx:
397             self.test.logger.debug(
398                 "BFD: setting packet.desired_min_tx_interval=%s",
399                 self.desired_min_tx)
400             bfd.desired_min_tx_interval = self.desired_min_tx
401         if self.detect_mult:
402             self.test.logger.debug(
403                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
404             bfd.detect_mult = self.detect_mult
405         if self.diag:
406             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
407             bfd.diag = self.diag
408         if self.state:
409             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
410             bfd.state = self.state
411         if self.auth_type:
412             # this is used by a negative test-case
413             self.test.logger.debug("BFD: setting packet.auth_type=%s",
414                                    self.auth_type)
415             bfd.auth_type = self.auth_type
416
417     def create_packet(self):
418         """ create a BFD packet, reflecting the current state of session """
419         if self.sha1_key:
420             bfd = BFD(flags="A")
421             bfd.auth_type = self.sha1_key.auth_type
422             bfd.auth_len = BFD.sha1_auth_len
423             bfd.auth_key_id = self.bfd_key_id
424             bfd.auth_seq_num = self.our_seq_number
425             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
426         else:
427             bfd = BFD()
428         packet = Ether(src=self.phy_interface.remote_mac,
429                        dst=self.phy_interface.local_mac)
430         if self.tunnel_header:
431             packet = packet / self.tunnel_header
432         if self.af == AF_INET6:
433             packet = (packet /
434                       IPv6(src=self.interface.remote_ip6,
435                            dst=self.interface.local_ip6,
436                            hlim=255) /
437                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
438                       bfd)
439         else:
440             packet = (packet /
441                       IP(src=self.interface.remote_ip4,
442                          dst=self.interface.local_ip4,
443                          ttl=255) /
444                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
445                       bfd)
446         self.test.logger.debug("BFD: Creating packet")
447         self.fill_packet_fields(packet)
448         if self.sha1_key:
449             hash_material = scapy.compat.raw(
450                 packet[BFD])[:32] + self.sha1_key.key + \
451                 "\0" * (20 - len(self.sha1_key.key))
452             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
453                                    hashlib.sha1(hash_material).hexdigest())
454             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
455         return packet
456
457     def send_packet(self, packet=None, interface=None):
458         """ send packet on interface, creating the packet if needed """
459         if packet is None:
460             packet = self.create_packet()
461         if interface is None:
462             interface = self.phy_interface
463         self.test.logger.debug(ppp("Sending packet:", packet))
464         interface.add_stream(packet)
465         self.test.pg_start()
466
467     def verify_sha1_auth(self, packet):
468         """ Verify correctness of authentication in BFD layer. """
469         bfd = packet[BFD]
470         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
471         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
472                                BFDAuthType)
473         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
474         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
475         if self.vpp_seq_number is None:
476             self.vpp_seq_number = bfd.auth_seq_num
477             self.test.logger.debug("Received initial sequence number: %s" %
478                                    self.vpp_seq_number)
479         else:
480             recvd_seq_num = bfd.auth_seq_num
481             self.test.logger.debug("Received followup sequence number: %s" %
482                                    recvd_seq_num)
483             if self.vpp_seq_number < 0xffffffff:
484                 if self.sha1_key.auth_type == \
485                         BFDAuthType.meticulous_keyed_sha1:
486                     self.test.assert_equal(recvd_seq_num,
487                                            self.vpp_seq_number + 1,
488                                            "BFD sequence number")
489                 else:
490                     self.test.assert_in_range(recvd_seq_num,
491                                               self.vpp_seq_number,
492                                               self.vpp_seq_number + 1,
493                                               "BFD sequence number")
494             else:
495                 if self.sha1_key.auth_type == \
496                         BFDAuthType.meticulous_keyed_sha1:
497                     self.test.assert_equal(recvd_seq_num, 0,
498                                            "BFD sequence number")
499                 else:
500                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
501                                        "BFD sequence number not one of "
502                                        "(%s, 0)" % self.vpp_seq_number)
503             self.vpp_seq_number = recvd_seq_num
504         # last 20 bytes represent the hash - so replace them with the key,
505         # pad the result with zeros and hash the result
506         hash_material = bfd.original[:-20] + self.sha1_key.key + \
507             b"\0" * (20 - len(self.sha1_key.key))
508         expected_hash = hashlib.sha1(hash_material).hexdigest()
509         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
510                                expected_hash, "Auth key hash")
511
512     def verify_bfd(self, packet):
513         """ Verify correctness of BFD layer. """
514         bfd = packet[BFD]
515         self.test.assert_equal(bfd.version, 1, "BFD version")
516         self.test.assert_equal(bfd.your_discriminator,
517                                self.my_discriminator,
518                                "BFD - your discriminator")
519         if self.sha1_key:
520             self.verify_sha1_auth(packet)
521
522
523 def bfd_session_up(test):
524     """ Bring BFD session up """
525     test.logger.info("BFD: Waiting for slow hello")
526     p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
527     old_offset = None
528     if hasattr(test, 'vpp_clock_offset'):
529         old_offset = test.vpp_clock_offset
530     test.vpp_clock_offset = time.time() - p.time
531     test.logger.debug("BFD: Calculated vpp clock offset: %s",
532                       test.vpp_clock_offset)
533     if old_offset:
534         test.assertAlmostEqual(
535             old_offset, test.vpp_clock_offset, delta=0.5,
536             msg="vpp clock offset not stable (new: %s, old: %s)" %
537             (test.vpp_clock_offset, old_offset))
538     test.logger.info("BFD: Sending Init")
539     test.test_session.update(my_discriminator=randint(0, 40000000),
540                              your_discriminator=p[BFD].my_discriminator,
541                              state=BFDState.init)
542     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
543             BFDAuthType.meticulous_keyed_sha1:
544         test.test_session.inc_seq_num()
545     test.test_session.send_packet()
546     test.logger.info("BFD: Waiting for event")
547     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
548     verify_event(test, e, expected_state=BFDState.up)
549     test.logger.info("BFD: Session is Up")
550     test.test_session.update(state=BFDState.up)
551     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
552             BFDAuthType.meticulous_keyed_sha1:
553         test.test_session.inc_seq_num()
554     test.test_session.send_packet()
555     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
556
557
558 def bfd_session_down(test):
559     """ Bring BFD session down """
560     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
561     test.test_session.update(state=BFDState.down)
562     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
563             BFDAuthType.meticulous_keyed_sha1:
564         test.test_session.inc_seq_num()
565     test.test_session.send_packet()
566     test.logger.info("BFD: Waiting for event")
567     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
568     verify_event(test, e, expected_state=BFDState.down)
569     test.logger.info("BFD: Session is Down")
570     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
571
572
573 def verify_bfd_session_config(test, session, state=None):
574     dump = session.get_bfd_udp_session_dump_entry()
575     test.assertIsNotNone(dump)
576     # since dump is not none, we have verified that sw_if_index and addresses
577     # are valid (in get_bfd_udp_session_dump_entry)
578     if state:
579         test.assert_equal(dump.state, state, "session state")
580     test.assert_equal(dump.required_min_rx, session.required_min_rx,
581                       "required min rx interval")
582     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
583                       "desired min tx interval")
584     test.assert_equal(dump.detect_mult, session.detect_mult,
585                       "detect multiplier")
586     if session.sha1_key is None:
587         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
588     else:
589         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
590         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
591                           "bfd key id")
592         test.assert_equal(dump.conf_key_id,
593                           session.sha1_key.conf_key_id,
594                           "config key id")
595
596
597 def verify_ip(test, packet):
598     """ Verify correctness of IP layer. """
599     if test.vpp_session.af == AF_INET6:
600         ip = packet[IPv6]
601         local_ip = test.vpp_session.interface.local_ip6
602         remote_ip = test.vpp_session.interface.remote_ip6
603         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
604     else:
605         ip = packet[IP]
606         local_ip = test.vpp_session.interface.local_ip4
607         remote_ip = test.vpp_session.interface.remote_ip4
608         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
609     test.assert_equal(ip.src, local_ip, "IP source address")
610     test.assert_equal(ip.dst, remote_ip, "IP destination address")
611
612
613 def verify_udp(test, packet):
614     """ Verify correctness of UDP layer. """
615     udp = packet[UDP]
616     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
617     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
618                          "UDP source port")
619
620
621 def verify_event(test, event, expected_state):
622     """ Verify correctness of event values. """
623     e = event
624     test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
625     test.assert_equal(e.sw_if_index,
626                       test.vpp_session.interface.sw_if_index,
627                       "BFD interface index")
628     is_ipv6 = 0
629     if test.vpp_session.af == AF_INET6:
630         is_ipv6 = 1
631     test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
632     if test.vpp_session.af == AF_INET:
633         test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
634                           "Local IPv4 address")
635         test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
636                           "Peer IPv4 address")
637     else:
638         test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
639                           "Local IPv6 address")
640         test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
641                           "Peer IPv6 address")
642     test.assert_equal(e.state, expected_state, BFDState)
643
644
645 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
646     """ wait for BFD packet and verify its correctness
647
648     :param timeout: how long to wait
649     :param pcap_time_min: ignore packets with pcap timestamp lower than this
650
651     :returns: tuple (packet, time spent waiting for packet)
652     """
653     test.logger.info("BFD: Waiting for BFD packet")
654     deadline = time.time() + timeout
655     counter = 0
656     while True:
657         counter += 1
658         # sanity check
659         test.assert_in_range(counter, 0, 100, "number of packets ignored")
660         time_left = deadline - time.time()
661         if time_left < 0:
662             raise CaptureTimeoutError("Packet did not arrive within timeout")
663         p = test.pg0.wait_for_packet(timeout=time_left)
664         test.logger.debug(ppp("BFD: Got packet:", p))
665         if pcap_time_min is not None and p.time < pcap_time_min:
666             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
667                                   "pcap time min %s):" %
668                                   (p.time, pcap_time_min), p))
669         else:
670             break
671     if is_tunnel:
672         # strip an IP layer and move to the next
673         p = p[IP].payload
674
675     bfd = p[BFD]
676     if bfd is None:
677         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
678     if bfd.payload:
679         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
680     verify_ip(test, p)
681     verify_udp(test, p)
682     test.test_session.verify_bfd(p)
683     return p
684
685
686 @unittest.skipUnless(running_extended_tests, "part of extended tests")
687 class BFD4TestCase(VppTestCase):
688     """Bidirectional Forwarding Detection (BFD)"""
689
690     pg0 = None
691     vpp_clock_offset = None
692     vpp_session = None
693     test_session = None
694
695     @classmethod
696     def setUpClass(cls):
697         super(BFD4TestCase, cls).setUpClass()
698         cls.vapi.cli("set log class bfd level debug")
699         try:
700             cls.create_pg_interfaces([0])
701             cls.create_loopback_interfaces(1)
702             cls.loopback0 = cls.lo_interfaces[0]
703             cls.loopback0.config_ip4()
704             cls.loopback0.admin_up()
705             cls.pg0.config_ip4()
706             cls.pg0.configure_ipv4_neighbors()
707             cls.pg0.admin_up()
708             cls.pg0.resolve_arp()
709
710         except Exception:
711             super(BFD4TestCase, cls).tearDownClass()
712             raise
713
714     @classmethod
715     def tearDownClass(cls):
716         super(BFD4TestCase, cls).tearDownClass()
717
718     def setUp(self):
719         super(BFD4TestCase, self).setUp()
720         self.factory = AuthKeyFactory()
721         self.vapi.want_bfd_events()
722         self.pg0.enable_capture()
723         try:
724             self.vpp_session = VppBFDUDPSession(self, self.pg0,
725                                                 self.pg0.remote_ip4)
726             self.vpp_session.add_vpp_config()
727             self.vpp_session.admin_up()
728             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
729         except:
730             self.vapi.want_bfd_events(enable_disable=0)
731             raise
732
733     def tearDown(self):
734         if not self.vpp_dead:
735             self.vapi.want_bfd_events(enable_disable=0)
736         self.vapi.collect_events()  # clear the event queue
737         super(BFD4TestCase, self).tearDown()
738
739     def test_session_up(self):
740         """ bring BFD session up """
741         bfd_session_up(self)
742
743     def test_session_up_by_ip(self):
744         """ bring BFD session up - first frame looked up by address pair """
745         self.logger.info("BFD: Sending Slow control frame")
746         self.test_session.update(my_discriminator=randint(0, 40000000))
747         self.test_session.send_packet()
748         self.pg0.enable_capture()
749         p = self.pg0.wait_for_packet(1)
750         self.assert_equal(p[BFD].your_discriminator,
751                           self.test_session.my_discriminator,
752                           "BFD - your discriminator")
753         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
754         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
755                                  state=BFDState.up)
756         self.logger.info("BFD: Waiting for event")
757         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758         verify_event(self, e, expected_state=BFDState.init)
759         self.logger.info("BFD: Sending Up")
760         self.test_session.send_packet()
761         self.logger.info("BFD: Waiting for event")
762         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
763         verify_event(self, e, expected_state=BFDState.up)
764         self.logger.info("BFD: Session is Up")
765         self.test_session.update(state=BFDState.up)
766         self.test_session.send_packet()
767         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
768
769     def test_session_down(self):
770         """ bring BFD session down """
771         bfd_session_up(self)
772         bfd_session_down(self)
773
774     @unittest.skipUnless(running_extended_tests, "part of extended tests")
775     def test_hold_up(self):
776         """ hold BFD session up """
777         bfd_session_up(self)
778         for dummy in range(self.test_session.detect_mult * 2):
779             wait_for_bfd_packet(self)
780             self.test_session.send_packet()
781         self.assert_equal(len(self.vapi.collect_events()), 0,
782                           "number of bfd events")
783
784     @unittest.skipUnless(running_extended_tests, "part of extended tests")
785     def test_slow_timer(self):
786         """ verify slow periodic control frames while session down """
787         packet_count = 3
788         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
789         prev_packet = wait_for_bfd_packet(self, 2)
790         for dummy in range(packet_count):
791             next_packet = wait_for_bfd_packet(self, 2)
792             time_diff = next_packet.time - prev_packet.time
793             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
794             # to work around timing issues
795             self.assert_in_range(
796                 time_diff, 0.70, 1.05, "time between slow packets")
797             prev_packet = next_packet
798
799     @unittest.skipUnless(running_extended_tests, "part of extended tests")
800     def test_zero_remote_min_rx(self):
801         """ no packets when zero remote required min rx interval """
802         bfd_session_up(self)
803         self.test_session.update(required_min_rx=0)
804         self.test_session.send_packet()
805         for dummy in range(self.test_session.detect_mult):
806             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
807                        "sleep before transmitting bfd packet")
808             self.test_session.send_packet()
809             try:
810                 p = wait_for_bfd_packet(self, timeout=0)
811                 self.logger.error(ppp("Received unexpected packet:", p))
812             except CaptureTimeoutError:
813                 pass
814         self.assert_equal(
815             len(self.vapi.collect_events()), 0, "number of bfd events")
816         self.test_session.update(required_min_rx=300000)
817         for dummy in range(3):
818             self.test_session.send_packet()
819             wait_for_bfd_packet(
820                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
821         self.assert_equal(
822             len(self.vapi.collect_events()), 0, "number of bfd events")
823
824     @unittest.skipUnless(running_extended_tests, "part of extended tests")
825     def test_conn_down(self):
826         """ verify session goes down after inactivity """
827         bfd_session_up(self)
828         detection_time = self.test_session.detect_mult *\
829             self.vpp_session.required_min_rx / USEC_IN_SEC
830         self.sleep(detection_time, "waiting for BFD session time-out")
831         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
832         verify_event(self, e, expected_state=BFDState.down)
833
834     @unittest.skipUnless(running_extended_tests, "part of extended tests")
835     def test_large_required_min_rx(self):
836         """ large remote required min rx interval """
837         bfd_session_up(self)
838         p = wait_for_bfd_packet(self)
839         interval = 3000000
840         self.test_session.update(required_min_rx=interval)
841         self.test_session.send_packet()
842         time_mark = time.time()
843         count = 0
844         # busy wait here, trying to collect a packet or event, vpp is not
845         # allowed to send packets and the session will timeout first - so the
846         # Up->Down event must arrive before any packets do
847         while time.time() < time_mark + interval / USEC_IN_SEC:
848             try:
849                 p = wait_for_bfd_packet(self, timeout=0)
850                 # if vpp managed to send a packet before we did the session
851                 # session update, then that's fine, ignore it
852                 if p.time < time_mark - self.vpp_clock_offset:
853                     continue
854                 self.logger.error(ppp("Received unexpected packet:", p))
855                 count += 1
856             except CaptureTimeoutError:
857                 pass
858             events = self.vapi.collect_events()
859             if len(events) > 0:
860                 verify_event(self, events[0], BFDState.down)
861                 break
862         self.assert_equal(count, 0, "number of packets received")
863
864     @unittest.skipUnless(running_extended_tests, "part of extended tests")
865     def test_immediate_remote_min_rx_reduction(self):
866         """ immediately honor remote required min rx reduction """
867         self.vpp_session.remove_vpp_config()
868         self.vpp_session = VppBFDUDPSession(
869             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
870         self.pg0.enable_capture()
871         self.vpp_session.add_vpp_config()
872         self.test_session.update(desired_min_tx=1000000,
873                                  required_min_rx=1000000)
874         bfd_session_up(self)
875         reference_packet = wait_for_bfd_packet(self)
876         time_mark = time.time()
877         interval = 300000
878         self.test_session.update(required_min_rx=interval)
879         self.test_session.send_packet()
880         extra_time = time.time() - time_mark
881         p = wait_for_bfd_packet(self)
882         # first packet is allowed to be late by time we spent doing the update
883         # calculated in extra_time
884         self.assert_in_range(p.time - reference_packet.time,
885                              .95 * 0.75 * interval / USEC_IN_SEC,
886                              1.05 * interval / USEC_IN_SEC + extra_time,
887                              "time between BFD packets")
888         reference_packet = p
889         for dummy in range(3):
890             p = wait_for_bfd_packet(self)
891             diff = p.time - reference_packet.time
892             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
893                                  1.05 * interval / USEC_IN_SEC,
894                                  "time between BFD packets")
895             reference_packet = p
896
897     @unittest.skipUnless(running_extended_tests, "part of extended tests")
898     def test_modify_req_min_rx_double(self):
899         """ modify session - double required min rx """
900         bfd_session_up(self)
901         p = wait_for_bfd_packet(self)
902         self.test_session.update(desired_min_tx=10000,
903                                  required_min_rx=10000)
904         self.test_session.send_packet()
905         # double required min rx
906         self.vpp_session.modify_parameters(
907             required_min_rx=2 * self.vpp_session.required_min_rx)
908         p = wait_for_bfd_packet(
909             self, pcap_time_min=time.time() - self.vpp_clock_offset)
910         # poll bit needs to be set
911         self.assertIn("P", p.sprintf("%BFD.flags%"),
912                       "Poll bit not set in BFD packet")
913         # finish poll sequence with final packet
914         final = self.test_session.create_packet()
915         final[BFD].flags = "F"
916         timeout = self.test_session.detect_mult * \
917             max(self.test_session.desired_min_tx,
918                 self.vpp_session.required_min_rx) / USEC_IN_SEC
919         self.test_session.send_packet(final)
920         time_mark = time.time()
921         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
922         verify_event(self, e, expected_state=BFDState.down)
923         time_to_event = time.time() - time_mark
924         self.assert_in_range(time_to_event, .9 * timeout,
925                              1.1 * timeout, "session timeout")
926
927     @unittest.skipUnless(running_extended_tests, "part of extended tests")
928     def test_modify_req_min_rx_halve(self):
929         """ modify session - halve required min rx """
930         self.vpp_session.modify_parameters(
931             required_min_rx=2 * self.vpp_session.required_min_rx)
932         bfd_session_up(self)
933         p = wait_for_bfd_packet(self)
934         self.test_session.update(desired_min_tx=10000,
935                                  required_min_rx=10000)
936         self.test_session.send_packet()
937         p = wait_for_bfd_packet(
938             self, pcap_time_min=time.time() - self.vpp_clock_offset)
939         # halve required min rx
940         old_required_min_rx = self.vpp_session.required_min_rx
941         self.vpp_session.modify_parameters(
942             required_min_rx=self.vpp_session.required_min_rx // 2)
943         # now we wait 0.8*3*old-req-min-rx and the session should still be up
944         self.sleep(0.8 * self.vpp_session.detect_mult *
945                    old_required_min_rx / USEC_IN_SEC,
946                    "wait before finishing poll sequence")
947         self.assert_equal(len(self.vapi.collect_events()), 0,
948                           "number of bfd events")
949         p = wait_for_bfd_packet(self)
950         # poll bit needs to be set
951         self.assertIn("P", p.sprintf("%BFD.flags%"),
952                       "Poll bit not set in BFD packet")
953         # finish poll sequence with final packet
954         final = self.test_session.create_packet()
955         final[BFD].flags = "F"
956         self.test_session.send_packet(final)
957         # now the session should time out under new conditions
958         detection_time = self.test_session.detect_mult *\
959             self.vpp_session.required_min_rx / USEC_IN_SEC
960         before = time.time()
961         e = self.vapi.wait_for_event(
962             2 * detection_time, "bfd_udp_session_details")
963         after = time.time()
964         self.assert_in_range(after - before,
965                              0.9 * detection_time,
966                              1.1 * detection_time,
967                              "time before bfd session goes down")
968         verify_event(self, e, expected_state=BFDState.down)
969
970     @unittest.skipUnless(running_extended_tests, "part of extended tests")
971     def test_modify_detect_mult(self):
972         """ modify detect multiplier """
973         bfd_session_up(self)
974         p = wait_for_bfd_packet(self)
975         self.vpp_session.modify_parameters(detect_mult=1)
976         p = wait_for_bfd_packet(
977             self, pcap_time_min=time.time() - self.vpp_clock_offset)
978         self.assert_equal(self.vpp_session.detect_mult,
979                           p[BFD].detect_mult,
980                           "detect mult")
981         # poll bit must not be set
982         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
983                          "Poll bit not set in BFD packet")
984         self.vpp_session.modify_parameters(detect_mult=10)
985         p = wait_for_bfd_packet(
986             self, pcap_time_min=time.time() - self.vpp_clock_offset)
987         self.assert_equal(self.vpp_session.detect_mult,
988                           p[BFD].detect_mult,
989                           "detect mult")
990         # poll bit must not be set
991         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
992                          "Poll bit not set in BFD packet")
993
994     @unittest.skipUnless(running_extended_tests, "part of extended tests")
995     def test_queued_poll(self):
996         """ test poll sequence queueing """
997         bfd_session_up(self)
998         p = wait_for_bfd_packet(self)
999         self.vpp_session.modify_parameters(
1000             required_min_rx=2 * self.vpp_session.required_min_rx)
1001         p = wait_for_bfd_packet(self)
1002         poll_sequence_start = time.time()
1003         poll_sequence_length_min = 0.5
1004         send_final_after = time.time() + poll_sequence_length_min
1005         # poll bit needs to be set
1006         self.assertIn("P", p.sprintf("%BFD.flags%"),
1007                       "Poll bit not set in BFD packet")
1008         self.assert_equal(p[BFD].required_min_rx_interval,
1009                           self.vpp_session.required_min_rx,
1010                           "BFD required min rx interval")
1011         self.vpp_session.modify_parameters(
1012             required_min_rx=2 * self.vpp_session.required_min_rx)
1013         # 2nd poll sequence should be queued now
1014         # don't send the reply back yet, wait for some time to emulate
1015         # longer round-trip time
1016         packet_count = 0
1017         while time.time() < send_final_after:
1018             self.test_session.send_packet()
1019             p = wait_for_bfd_packet(self)
1020             self.assert_equal(len(self.vapi.collect_events()), 0,
1021                               "number of bfd events")
1022             self.assert_equal(p[BFD].required_min_rx_interval,
1023                               self.vpp_session.required_min_rx,
1024                               "BFD required min rx interval")
1025             packet_count += 1
1026             # poll bit must be set
1027             self.assertIn("P", p.sprintf("%BFD.flags%"),
1028                           "Poll bit not set in BFD packet")
1029         final = self.test_session.create_packet()
1030         final[BFD].flags = "F"
1031         self.test_session.send_packet(final)
1032         # finish 1st with final
1033         poll_sequence_length = time.time() - poll_sequence_start
1034         # vpp must wait for some time before starting new poll sequence
1035         poll_no_2_started = False
1036         for dummy in range(2 * packet_count):
1037             p = wait_for_bfd_packet(self)
1038             self.assert_equal(len(self.vapi.collect_events()), 0,
1039                               "number of bfd events")
1040             if "P" in p.sprintf("%BFD.flags%"):
1041                 poll_no_2_started = True
1042                 if time.time() < poll_sequence_start + poll_sequence_length:
1043                     raise Exception("VPP started 2nd poll sequence too soon")
1044                 final = self.test_session.create_packet()
1045                 final[BFD].flags = "F"
1046                 self.test_session.send_packet(final)
1047                 break
1048             else:
1049                 self.test_session.send_packet()
1050         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1051         # finish 2nd with final
1052         final = self.test_session.create_packet()
1053         final[BFD].flags = "F"
1054         self.test_session.send_packet(final)
1055         p = wait_for_bfd_packet(self)
1056         # poll bit must not be set
1057         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1058                          "Poll bit set in BFD packet")
1059
1060     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1061     def test_poll_response(self):
1062         """ test correct response to control frame with poll bit set """
1063         bfd_session_up(self)
1064         poll = self.test_session.create_packet()
1065         poll[BFD].flags = "P"
1066         self.test_session.send_packet(poll)
1067         final = wait_for_bfd_packet(
1068             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1069         self.assertIn("F", final.sprintf("%BFD.flags%"))
1070
1071     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1072     def test_no_periodic_if_remote_demand(self):
1073         """ no periodic frames outside poll sequence if remote demand set """
1074         bfd_session_up(self)
1075         demand = self.test_session.create_packet()
1076         demand[BFD].flags = "D"
1077         self.test_session.send_packet(demand)
1078         transmit_time = 0.9 \
1079             * max(self.vpp_session.required_min_rx,
1080                   self.test_session.desired_min_tx) \
1081             / USEC_IN_SEC
1082         count = 0
1083         for dummy in range(self.test_session.detect_mult * 2):
1084             self.sleep(transmit_time)
1085             self.test_session.send_packet(demand)
1086             try:
1087                 p = wait_for_bfd_packet(self, timeout=0)
1088                 self.logger.error(ppp("Received unexpected packet:", p))
1089                 count += 1
1090             except CaptureTimeoutError:
1091                 pass
1092         events = self.vapi.collect_events()
1093         for e in events:
1094             self.logger.error("Received unexpected event: %s", e)
1095         self.assert_equal(count, 0, "number of packets received")
1096         self.assert_equal(len(events), 0, "number of events received")
1097
1098     def test_echo_looped_back(self):
1099         """ echo packets looped back """
1100         # don't need a session in this case..
1101         self.vpp_session.remove_vpp_config()
1102         self.pg0.enable_capture()
1103         echo_packet_count = 10
1104         # random source port low enough to increment a few times..
1105         udp_sport_tx = randint(1, 50000)
1106         udp_sport_rx = udp_sport_tx
1107         echo_packet = (Ether(src=self.pg0.remote_mac,
1108                              dst=self.pg0.local_mac) /
1109                        IP(src=self.pg0.remote_ip4,
1110                           dst=self.pg0.remote_ip4) /
1111                        UDP(dport=BFD.udp_dport_echo) /
1112                        Raw("this should be looped back"))
1113         for dummy in range(echo_packet_count):
1114             self.sleep(.01, "delay between echo packets")
1115             echo_packet[UDP].sport = udp_sport_tx
1116             udp_sport_tx += 1
1117             self.logger.debug(ppp("Sending packet:", echo_packet))
1118             self.pg0.add_stream(echo_packet)
1119             self.pg_start()
1120         for dummy in range(echo_packet_count):
1121             p = self.pg0.wait_for_packet(1)
1122             self.logger.debug(ppp("Got packet:", p))
1123             ether = p[Ether]
1124             self.assert_equal(self.pg0.remote_mac,
1125                               ether.dst, "Destination MAC")
1126             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1127             ip = p[IP]
1128             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1129             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1130             udp = p[UDP]
1131             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1132                               "UDP destination port")
1133             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1134             udp_sport_rx += 1
1135             # need to compare the hex payload here, otherwise BFD_vpp_echo
1136             # gets in way
1137             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1138                              scapy.compat.raw(echo_packet[UDP].payload),
1139                              "Received packet is not the echo packet sent")
1140         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1141                           "ECHO packet identifier for test purposes)")
1142
1143     def test_echo(self):
1144         """ echo function """
1145         bfd_session_up(self)
1146         self.test_session.update(required_min_echo_rx=150000)
1147         self.test_session.send_packet()
1148         detection_time = self.test_session.detect_mult *\
1149             self.vpp_session.required_min_rx / USEC_IN_SEC
1150         # echo shouldn't work without echo source set
1151         for dummy in range(10):
1152             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1153             self.sleep(sleep, "delay before sending bfd packet")
1154             self.test_session.send_packet()
1155         p = wait_for_bfd_packet(
1156             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1157         self.assert_equal(p[BFD].required_min_rx_interval,
1158                           self.vpp_session.required_min_rx,
1159                           "BFD required min rx interval")
1160         self.test_session.send_packet()
1161         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1162         echo_seen = False
1163         # should be turned on - loopback echo packets
1164         for dummy in range(3):
1165             loop_until = time.time() + 0.75 * detection_time
1166             while time.time() < loop_until:
1167                 p = self.pg0.wait_for_packet(1)
1168                 self.logger.debug(ppp("Got packet:", p))
1169                 if p[UDP].dport == BFD.udp_dport_echo:
1170                     self.assert_equal(
1171                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1172                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1173                                         "BFD ECHO src IP equal to loopback IP")
1174                     self.logger.debug(ppp("Looping back packet:", p))
1175                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1176                                       "ECHO packet destination MAC address")
1177                     p[Ether].dst = self.pg0.local_mac
1178                     self.pg0.add_stream(p)
1179                     self.pg_start()
1180                     echo_seen = True
1181                 elif p.haslayer(BFD):
1182                     if echo_seen:
1183                         self.assertGreaterEqual(
1184                             p[BFD].required_min_rx_interval,
1185                             1000000)
1186                     if "P" in p.sprintf("%BFD.flags%"):
1187                         final = self.test_session.create_packet()
1188                         final[BFD].flags = "F"
1189                         self.test_session.send_packet(final)
1190                 else:
1191                     raise Exception(ppp("Received unknown packet:", p))
1192
1193                 self.assert_equal(len(self.vapi.collect_events()), 0,
1194                                   "number of bfd events")
1195             self.test_session.send_packet()
1196         self.assertTrue(echo_seen, "No echo packets received")
1197
1198     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1199     def test_echo_fail(self):
1200         """ session goes down if echo function fails """
1201         bfd_session_up(self)
1202         self.test_session.update(required_min_echo_rx=150000)
1203         self.test_session.send_packet()
1204         detection_time = self.test_session.detect_mult *\
1205             self.vpp_session.required_min_rx / USEC_IN_SEC
1206         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1207         # echo function should be used now, but we will drop the echo packets
1208         verified_diag = False
1209         for dummy in range(3):
1210             loop_until = time.time() + 0.75 * detection_time
1211             while time.time() < loop_until:
1212                 p = self.pg0.wait_for_packet(1)
1213                 self.logger.debug(ppp("Got packet:", p))
1214                 if p[UDP].dport == BFD.udp_dport_echo:
1215                     # dropped
1216                     pass
1217                 elif p.haslayer(BFD):
1218                     if "P" in p.sprintf("%BFD.flags%"):
1219                         self.assertGreaterEqual(
1220                             p[BFD].required_min_rx_interval,
1221                             1000000)
1222                         final = self.test_session.create_packet()
1223                         final[BFD].flags = "F"
1224                         self.test_session.send_packet(final)
1225                     if p[BFD].state == BFDState.down:
1226                         self.assert_equal(p[BFD].diag,
1227                                           BFDDiagCode.echo_function_failed,
1228                                           BFDDiagCode)
1229                         verified_diag = True
1230                 else:
1231                     raise Exception(ppp("Received unknown packet:", p))
1232             self.test_session.send_packet()
1233         events = self.vapi.collect_events()
1234         self.assert_equal(len(events), 1, "number of bfd events")
1235         self.assert_equal(events[0].state, BFDState.down, BFDState)
1236         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1237
1238     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1239     def test_echo_stop(self):
1240         """ echo function stops if peer sets required min echo rx zero """
1241         bfd_session_up(self)
1242         self.test_session.update(required_min_echo_rx=150000)
1243         self.test_session.send_packet()
1244         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1245         # wait for first echo packet
1246         while True:
1247             p = self.pg0.wait_for_packet(1)
1248             self.logger.debug(ppp("Got packet:", p))
1249             if p[UDP].dport == BFD.udp_dport_echo:
1250                 self.logger.debug(ppp("Looping back packet:", p))
1251                 p[Ether].dst = self.pg0.local_mac
1252                 self.pg0.add_stream(p)
1253                 self.pg_start()
1254                 break
1255             elif p.haslayer(BFD):
1256                 # ignore BFD
1257                 pass
1258             else:
1259                 raise Exception(ppp("Received unknown packet:", p))
1260         self.test_session.update(required_min_echo_rx=0)
1261         self.test_session.send_packet()
1262         # echo packets shouldn't arrive anymore
1263         for dummy in range(5):
1264             wait_for_bfd_packet(
1265                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1266             self.test_session.send_packet()
1267             events = self.vapi.collect_events()
1268             self.assert_equal(len(events), 0, "number of bfd events")
1269
1270     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1271     def test_echo_source_removed(self):
1272         """ echo function stops if echo source is removed """
1273         bfd_session_up(self)
1274         self.test_session.update(required_min_echo_rx=150000)
1275         self.test_session.send_packet()
1276         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1277         # wait for first echo packet
1278         while True:
1279             p = self.pg0.wait_for_packet(1)
1280             self.logger.debug(ppp("Got packet:", p))
1281             if p[UDP].dport == BFD.udp_dport_echo:
1282                 self.logger.debug(ppp("Looping back packet:", p))
1283                 p[Ether].dst = self.pg0.local_mac
1284                 self.pg0.add_stream(p)
1285                 self.pg_start()
1286                 break
1287             elif p.haslayer(BFD):
1288                 # ignore BFD
1289                 pass
1290             else:
1291                 raise Exception(ppp("Received unknown packet:", p))
1292         self.vapi.bfd_udp_del_echo_source()
1293         self.test_session.send_packet()
1294         # echo packets shouldn't arrive anymore
1295         for dummy in range(5):
1296             wait_for_bfd_packet(
1297                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1298             self.test_session.send_packet()
1299             events = self.vapi.collect_events()
1300             self.assert_equal(len(events), 0, "number of bfd events")
1301
1302     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1303     def test_stale_echo(self):
1304         """ stale echo packets don't keep a session up """
1305         bfd_session_up(self)
1306         self.test_session.update(required_min_echo_rx=150000)
1307         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1308         self.test_session.send_packet()
1309         # should be turned on - loopback echo packets
1310         echo_packet = None
1311         timeout_at = None
1312         timeout_ok = False
1313         for dummy in range(10 * self.vpp_session.detect_mult):
1314             p = self.pg0.wait_for_packet(1)
1315             if p[UDP].dport == BFD.udp_dport_echo:
1316                 if echo_packet is None:
1317                     self.logger.debug(ppp("Got first echo packet:", p))
1318                     echo_packet = p
1319                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1320                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1321                 else:
1322                     self.logger.debug(ppp("Got followup echo packet:", p))
1323                 self.logger.debug(ppp("Looping back first echo packet:", p))
1324                 echo_packet[Ether].dst = self.pg0.local_mac
1325                 self.pg0.add_stream(echo_packet)
1326                 self.pg_start()
1327             elif p.haslayer(BFD):
1328                 self.logger.debug(ppp("Got packet:", p))
1329                 if "P" in p.sprintf("%BFD.flags%"):
1330                     final = self.test_session.create_packet()
1331                     final[BFD].flags = "F"
1332                     self.test_session.send_packet(final)
1333                 if p[BFD].state == BFDState.down:
1334                     self.assertIsNotNone(
1335                         timeout_at,
1336                         "Session went down before first echo packet received")
1337                     now = time.time()
1338                     self.assertGreaterEqual(
1339                         now, timeout_at,
1340                         "Session timeout at %s, but is expected at %s" %
1341                         (now, timeout_at))
1342                     self.assert_equal(p[BFD].diag,
1343                                       BFDDiagCode.echo_function_failed,
1344                                       BFDDiagCode)
1345                     events = self.vapi.collect_events()
1346                     self.assert_equal(len(events), 1, "number of bfd events")
1347                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1348                     timeout_ok = True
1349                     break
1350             else:
1351                 raise Exception(ppp("Received unknown packet:", p))
1352             self.test_session.send_packet()
1353         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1354
1355     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1356     def test_invalid_echo_checksum(self):
1357         """ echo packets with invalid checksum don't keep a session up """
1358         bfd_session_up(self)
1359         self.test_session.update(required_min_echo_rx=150000)
1360         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1361         self.test_session.send_packet()
1362         # should be turned on - loopback echo packets
1363         timeout_at = None
1364         timeout_ok = False
1365         for dummy in range(10 * self.vpp_session.detect_mult):
1366             p = self.pg0.wait_for_packet(1)
1367             if p[UDP].dport == BFD.udp_dport_echo:
1368                 self.logger.debug(ppp("Got echo packet:", p))
1369                 if timeout_at is None:
1370                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1371                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1372                 p[BFD_vpp_echo].checksum = getrandbits(64)
1373                 p[Ether].dst = self.pg0.local_mac
1374                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1375                 self.pg0.add_stream(p)
1376                 self.pg_start()
1377             elif p.haslayer(BFD):
1378                 self.logger.debug(ppp("Got packet:", p))
1379                 if "P" in p.sprintf("%BFD.flags%"):
1380                     final = self.test_session.create_packet()
1381                     final[BFD].flags = "F"
1382                     self.test_session.send_packet(final)
1383                 if p[BFD].state == BFDState.down:
1384                     self.assertIsNotNone(
1385                         timeout_at,
1386                         "Session went down before first echo packet received")
1387                     now = time.time()
1388                     self.assertGreaterEqual(
1389                         now, timeout_at,
1390                         "Session timeout at %s, but is expected at %s" %
1391                         (now, timeout_at))
1392                     self.assert_equal(p[BFD].diag,
1393                                       BFDDiagCode.echo_function_failed,
1394                                       BFDDiagCode)
1395                     events = self.vapi.collect_events()
1396                     self.assert_equal(len(events), 1, "number of bfd events")
1397                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1398                     timeout_ok = True
1399                     break
1400             else:
1401                 raise Exception(ppp("Received unknown packet:", p))
1402             self.test_session.send_packet()
1403         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1404
1405     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1406     def test_admin_up_down(self):
1407         """ put session admin-up and admin-down """
1408         bfd_session_up(self)
1409         self.vpp_session.admin_down()
1410         self.pg0.enable_capture()
1411         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1412         verify_event(self, e, expected_state=BFDState.admin_down)
1413         for dummy in range(2):
1414             p = wait_for_bfd_packet(self)
1415             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1416         # try to bring session up - shouldn't be possible
1417         self.test_session.update(state=BFDState.init)
1418         self.test_session.send_packet()
1419         for dummy in range(2):
1420             p = wait_for_bfd_packet(self)
1421             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1422         self.vpp_session.admin_up()
1423         self.test_session.update(state=BFDState.down)
1424         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1425         verify_event(self, e, expected_state=BFDState.down)
1426         p = wait_for_bfd_packet(
1427             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1428         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1429         self.test_session.send_packet()
1430         p = wait_for_bfd_packet(
1431             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1432         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1433         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1434         verify_event(self, e, expected_state=BFDState.init)
1435         self.test_session.update(state=BFDState.up)
1436         self.test_session.send_packet()
1437         p = wait_for_bfd_packet(
1438             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1439         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1440         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1441         verify_event(self, e, expected_state=BFDState.up)
1442
1443     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1444     def test_config_change_remote_demand(self):
1445         """ configuration change while peer in demand mode """
1446         bfd_session_up(self)
1447         demand = self.test_session.create_packet()
1448         demand[BFD].flags = "D"
1449         self.test_session.send_packet(demand)
1450         self.vpp_session.modify_parameters(
1451             required_min_rx=2 * self.vpp_session.required_min_rx)
1452         p = wait_for_bfd_packet(
1453             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1454         # poll bit must be set
1455         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1456         # terminate poll sequence
1457         final = self.test_session.create_packet()
1458         final[BFD].flags = "D+F"
1459         self.test_session.send_packet(final)
1460         # vpp should be quiet now again
1461         transmit_time = 0.9 \
1462             * max(self.vpp_session.required_min_rx,
1463                   self.test_session.desired_min_tx) \
1464             / USEC_IN_SEC
1465         count = 0
1466         for dummy in range(self.test_session.detect_mult * 2):
1467             self.sleep(transmit_time)
1468             self.test_session.send_packet(demand)
1469             try:
1470                 p = wait_for_bfd_packet(self, timeout=0)
1471                 self.logger.error(ppp("Received unexpected packet:", p))
1472                 count += 1
1473             except CaptureTimeoutError:
1474                 pass
1475         events = self.vapi.collect_events()
1476         for e in events:
1477             self.logger.error("Received unexpected event: %s", e)
1478         self.assert_equal(count, 0, "number of packets received")
1479         self.assert_equal(len(events), 0, "number of events received")
1480
1481     def test_intf_deleted(self):
1482         """ interface with bfd session deleted """
1483         intf = VppLoInterface(self)
1484         intf.config_ip4()
1485         intf.admin_up()
1486         sw_if_index = intf.sw_if_index
1487         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1488         vpp_session.add_vpp_config()
1489         vpp_session.admin_up()
1490         intf.remove_vpp_config()
1491         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1492         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1493         self.assertFalse(vpp_session.query_vpp_config())
1494
1495
1496 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1497 class BFD6TestCase(VppTestCase):
1498     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1499
1500     pg0 = None
1501     vpp_clock_offset = None
1502     vpp_session = None
1503     test_session = None
1504
1505     @classmethod
1506     def setUpClass(cls):
1507         super(BFD6TestCase, cls).setUpClass()
1508         cls.vapi.cli("set log class bfd level debug")
1509         try:
1510             cls.create_pg_interfaces([0])
1511             cls.pg0.config_ip6()
1512             cls.pg0.configure_ipv6_neighbors()
1513             cls.pg0.admin_up()
1514             cls.pg0.resolve_ndp()
1515             cls.create_loopback_interfaces(1)
1516             cls.loopback0 = cls.lo_interfaces[0]
1517             cls.loopback0.config_ip6()
1518             cls.loopback0.admin_up()
1519
1520         except Exception:
1521             super(BFD6TestCase, cls).tearDownClass()
1522             raise
1523
1524     @classmethod
1525     def tearDownClass(cls):
1526         super(BFD6TestCase, cls).tearDownClass()
1527
1528     def setUp(self):
1529         super(BFD6TestCase, self).setUp()
1530         self.factory = AuthKeyFactory()
1531         self.vapi.want_bfd_events()
1532         self.pg0.enable_capture()
1533         try:
1534             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1535                                                 self.pg0.remote_ip6,
1536                                                 af=AF_INET6)
1537             self.vpp_session.add_vpp_config()
1538             self.vpp_session.admin_up()
1539             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1540             self.logger.debug(self.vapi.cli("show adj nbr"))
1541         except:
1542             self.vapi.want_bfd_events(enable_disable=0)
1543             raise
1544
1545     def tearDown(self):
1546         if not self.vpp_dead:
1547             self.vapi.want_bfd_events(enable_disable=0)
1548         self.vapi.collect_events()  # clear the event queue
1549         super(BFD6TestCase, self).tearDown()
1550
1551     def test_session_up(self):
1552         """ bring BFD session up """
1553         bfd_session_up(self)
1554
1555     def test_session_up_by_ip(self):
1556         """ bring BFD session up - first frame looked up by address pair """
1557         self.logger.info("BFD: Sending Slow control frame")
1558         self.test_session.update(my_discriminator=randint(0, 40000000))
1559         self.test_session.send_packet()
1560         self.pg0.enable_capture()
1561         p = self.pg0.wait_for_packet(1)
1562         self.assert_equal(p[BFD].your_discriminator,
1563                           self.test_session.my_discriminator,
1564                           "BFD - your discriminator")
1565         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1566         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1567                                  state=BFDState.up)
1568         self.logger.info("BFD: Waiting for event")
1569         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1570         verify_event(self, e, expected_state=BFDState.init)
1571         self.logger.info("BFD: Sending Up")
1572         self.test_session.send_packet()
1573         self.logger.info("BFD: Waiting for event")
1574         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1575         verify_event(self, e, expected_state=BFDState.up)
1576         self.logger.info("BFD: Session is Up")
1577         self.test_session.update(state=BFDState.up)
1578         self.test_session.send_packet()
1579         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1580
1581     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1582     def test_hold_up(self):
1583         """ hold BFD session up """
1584         bfd_session_up(self)
1585         for dummy in range(self.test_session.detect_mult * 2):
1586             wait_for_bfd_packet(self)
1587             self.test_session.send_packet()
1588         self.assert_equal(len(self.vapi.collect_events()), 0,
1589                           "number of bfd events")
1590         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1591
1592     def test_echo_looped_back(self):
1593         """ echo packets looped back """
1594         # don't need a session in this case..
1595         self.vpp_session.remove_vpp_config()
1596         self.pg0.enable_capture()
1597         echo_packet_count = 10
1598         # random source port low enough to increment a few times..
1599         udp_sport_tx = randint(1, 50000)
1600         udp_sport_rx = udp_sport_tx
1601         echo_packet = (Ether(src=self.pg0.remote_mac,
1602                              dst=self.pg0.local_mac) /
1603                        IPv6(src=self.pg0.remote_ip6,
1604                             dst=self.pg0.remote_ip6) /
1605                        UDP(dport=BFD.udp_dport_echo) /
1606                        Raw("this should be looped back"))
1607         for dummy in range(echo_packet_count):
1608             self.sleep(.01, "delay between echo packets")
1609             echo_packet[UDP].sport = udp_sport_tx
1610             udp_sport_tx += 1
1611             self.logger.debug(ppp("Sending packet:", echo_packet))
1612             self.pg0.add_stream(echo_packet)
1613             self.pg_start()
1614         for dummy in range(echo_packet_count):
1615             p = self.pg0.wait_for_packet(1)
1616             self.logger.debug(ppp("Got packet:", p))
1617             ether = p[Ether]
1618             self.assert_equal(self.pg0.remote_mac,
1619                               ether.dst, "Destination MAC")
1620             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1621             ip = p[IPv6]
1622             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1623             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1624             udp = p[UDP]
1625             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1626                               "UDP destination port")
1627             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1628             udp_sport_rx += 1
1629             # need to compare the hex payload here, otherwise BFD_vpp_echo
1630             # gets in way
1631             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1632                              scapy.compat.raw(echo_packet[UDP].payload),
1633                              "Received packet is not the echo packet sent")
1634         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1635                           "ECHO packet identifier for test purposes)")
1636         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1637                           "ECHO packet identifier for test purposes)")
1638
1639     def test_echo(self):
1640         """ echo function """
1641         bfd_session_up(self)
1642         self.test_session.update(required_min_echo_rx=150000)
1643         self.test_session.send_packet()
1644         detection_time = self.test_session.detect_mult *\
1645             self.vpp_session.required_min_rx / USEC_IN_SEC
1646         # echo shouldn't work without echo source set
1647         for dummy in range(10):
1648             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1649             self.sleep(sleep, "delay before sending bfd packet")
1650             self.test_session.send_packet()
1651         p = wait_for_bfd_packet(
1652             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1653         self.assert_equal(p[BFD].required_min_rx_interval,
1654                           self.vpp_session.required_min_rx,
1655                           "BFD required min rx interval")
1656         self.test_session.send_packet()
1657         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1658         echo_seen = False
1659         # should be turned on - loopback echo packets
1660         for dummy in range(3):
1661             loop_until = time.time() + 0.75 * detection_time
1662             while time.time() < loop_until:
1663                 p = self.pg0.wait_for_packet(1)
1664                 self.logger.debug(ppp("Got packet:", p))
1665                 if p[UDP].dport == BFD.udp_dport_echo:
1666                     self.assert_equal(
1667                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1668                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1669                                         "BFD ECHO src IP equal to loopback IP")
1670                     self.logger.debug(ppp("Looping back packet:", p))
1671                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1672                                       "ECHO packet destination MAC address")
1673                     p[Ether].dst = self.pg0.local_mac
1674                     self.pg0.add_stream(p)
1675                     self.pg_start()
1676                     echo_seen = True
1677                 elif p.haslayer(BFD):
1678                     if echo_seen:
1679                         self.assertGreaterEqual(
1680                             p[BFD].required_min_rx_interval,
1681                             1000000)
1682                     if "P" in p.sprintf("%BFD.flags%"):
1683                         final = self.test_session.create_packet()
1684                         final[BFD].flags = "F"
1685                         self.test_session.send_packet(final)
1686                 else:
1687                     raise Exception(ppp("Received unknown packet:", p))
1688
1689                 self.assert_equal(len(self.vapi.collect_events()), 0,
1690                                   "number of bfd events")
1691             self.test_session.send_packet()
1692         self.assertTrue(echo_seen, "No echo packets received")
1693
1694     def test_intf_deleted(self):
1695         """ interface with bfd session deleted """
1696         intf = VppLoInterface(self)
1697         intf.config_ip6()
1698         intf.admin_up()
1699         sw_if_index = intf.sw_if_index
1700         vpp_session = VppBFDUDPSession(
1701             self, intf, intf.remote_ip6, af=AF_INET6)
1702         vpp_session.add_vpp_config()
1703         vpp_session.admin_up()
1704         intf.remove_vpp_config()
1705         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1706         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1707         self.assertFalse(vpp_session.query_vpp_config())
1708
1709
1710 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1711 class BFDFIBTestCase(VppTestCase):
1712     """ BFD-FIB interactions (IPv6) """
1713
1714     vpp_session = None
1715     test_session = None
1716
1717     @classmethod
1718     def setUpClass(cls):
1719         super(BFDFIBTestCase, cls).setUpClass()
1720
1721     @classmethod
1722     def tearDownClass(cls):
1723         super(BFDFIBTestCase, cls).tearDownClass()
1724
1725     def setUp(self):
1726         super(BFDFIBTestCase, self).setUp()
1727         self.create_pg_interfaces(range(1))
1728
1729         self.vapi.want_bfd_events()
1730         self.pg0.enable_capture()
1731
1732         for i in self.pg_interfaces:
1733             i.admin_up()
1734             i.config_ip6()
1735             i.configure_ipv6_neighbors()
1736
1737     def tearDown(self):
1738         if not self.vpp_dead:
1739             self.vapi.want_bfd_events(enable_disable=0)
1740
1741         super(BFDFIBTestCase, self).tearDown()
1742
1743     @staticmethod
1744     def pkt_is_not_data_traffic(p):
1745         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1746         if p.haslayer(BFD) or is_ipv6_misc(p):
1747             return True
1748         return False
1749
1750     def test_session_with_fib(self):
1751         """ BFD-FIB interactions """
1752
1753         # packets to match against both of the routes
1754         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1755               IPv6(src="3001::1", dst="2001::1") /
1756               UDP(sport=1234, dport=1234) /
1757               Raw('\xa5' * 100)),
1758              (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1759               IPv6(src="3001::1", dst="2002::1") /
1760               UDP(sport=1234, dport=1234) /
1761               Raw('\xa5' * 100))]
1762
1763         # A recursive and a non-recursive route via a next-hop that
1764         # will have a BFD session
1765         ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1766                                   [VppRoutePath(self.pg0.remote_ip6,
1767                                                 self.pg0.sw_if_index,
1768                                                 proto=DpoProto.DPO_PROTO_IP6)],
1769                                   is_ip6=1)
1770         ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1771                                   [VppRoutePath(self.pg0.remote_ip6,
1772                                                 0xffffffff,
1773                                                 proto=DpoProto.DPO_PROTO_IP6)],
1774                                   is_ip6=1)
1775         ip_2001_s_64.add_vpp_config()
1776         ip_2002_s_64.add_vpp_config()
1777
1778         # bring the session up now the routes are present
1779         self.vpp_session = VppBFDUDPSession(self,
1780                                             self.pg0,
1781                                             self.pg0.remote_ip6,
1782                                             af=AF_INET6)
1783         self.vpp_session.add_vpp_config()
1784         self.vpp_session.admin_up()
1785         self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1786
1787         # session is up - traffic passes
1788         bfd_session_up(self)
1789
1790         self.pg0.add_stream(p)
1791         self.pg_start()
1792         for packet in p:
1793             captured = self.pg0.wait_for_packet(
1794                 1,
1795                 filter_out_fn=self.pkt_is_not_data_traffic)
1796             self.assertEqual(captured[IPv6].dst,
1797                              packet[IPv6].dst)
1798
1799         # session is up - traffic is dropped
1800         bfd_session_down(self)
1801
1802         self.pg0.add_stream(p)
1803         self.pg_start()
1804         with self.assertRaises(CaptureTimeoutError):
1805             self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1806
1807         # session is up - traffic passes
1808         bfd_session_up(self)
1809
1810         self.pg0.add_stream(p)
1811         self.pg_start()
1812         for packet in p:
1813             captured = self.pg0.wait_for_packet(
1814                 1,
1815                 filter_out_fn=self.pkt_is_not_data_traffic)
1816             self.assertEqual(captured[IPv6].dst,
1817                              packet[IPv6].dst)
1818
1819
1820 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1821 class BFDTunTestCase(VppTestCase):
1822     """ BFD over GRE tunnel """
1823
1824     vpp_session = None
1825     test_session = None
1826
1827     @classmethod
1828     def setUpClass(cls):
1829         super(BFDTunTestCase, cls).setUpClass()
1830
1831     @classmethod
1832     def tearDownClass(cls):
1833         super(BFDTunTestCase, cls).tearDownClass()
1834
1835     def setUp(self):
1836         super(BFDTunTestCase, self).setUp()
1837         self.create_pg_interfaces(range(1))
1838
1839         self.vapi.want_bfd_events()
1840         self.pg0.enable_capture()
1841
1842         for i in self.pg_interfaces:
1843             i.admin_up()
1844             i.config_ip4()
1845             i.resolve_arp()
1846
1847     def tearDown(self):
1848         if not self.vpp_dead:
1849             self.vapi.want_bfd_events(enable_disable=0)
1850
1851         super(BFDTunTestCase, self).tearDown()
1852
1853     @staticmethod
1854     def pkt_is_not_data_traffic(p):
1855         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1856         if p.haslayer(BFD) or is_ipv6_misc(p):
1857             return True
1858         return False
1859
1860     def test_bfd_o_gre(self):
1861         """ BFD-o-GRE  """
1862
1863         # A GRE interface over which to run a BFD session
1864         gre_if = VppGreInterface(self,
1865                                  self.pg0.local_ip4,
1866                                  self.pg0.remote_ip4)
1867         gre_if.add_vpp_config()
1868         gre_if.admin_up()
1869         gre_if.config_ip4()
1870
1871         # bring the session up now the routes are present
1872         self.vpp_session = VppBFDUDPSession(self,
1873                                             gre_if,
1874                                             gre_if.remote_ip4,
1875                                             is_tunnel=True)
1876         self.vpp_session.add_vpp_config()
1877         self.vpp_session.admin_up()
1878
1879         self.test_session = BFDTestSession(
1880             self, gre_if, AF_INET,
1881             tunnel_header=(IP(src=self.pg0.remote_ip4,
1882                               dst=self.pg0.local_ip4) /
1883                            GRE()),
1884             phy_interface=self.pg0)
1885
1886         # packets to match against both of the routes
1887         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1888               IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1889               UDP(sport=1234, dport=1234) /
1890               Raw('\xa5' * 100))]
1891
1892         # session is up - traffic passes
1893         bfd_session_up(self)
1894
1895         self.send_and_expect(self.pg0, p, self.pg0)
1896
1897         # bring session down
1898         bfd_session_down(self)
1899
1900
1901 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1902 class BFDSHA1TestCase(VppTestCase):
1903     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1904
1905     pg0 = None
1906     vpp_clock_offset = None
1907     vpp_session = None
1908     test_session = None
1909
1910     @classmethod
1911     def setUpClass(cls):
1912         super(BFDSHA1TestCase, cls).setUpClass()
1913         cls.vapi.cli("set log class bfd level debug")
1914         try:
1915             cls.create_pg_interfaces([0])
1916             cls.pg0.config_ip4()
1917             cls.pg0.admin_up()
1918             cls.pg0.resolve_arp()
1919
1920         except Exception:
1921             super(BFDSHA1TestCase, cls).tearDownClass()
1922             raise
1923
1924     @classmethod
1925     def tearDownClass(cls):
1926         super(BFDSHA1TestCase, cls).tearDownClass()
1927
1928     def setUp(self):
1929         super(BFDSHA1TestCase, self).setUp()
1930         self.factory = AuthKeyFactory()
1931         self.vapi.want_bfd_events()
1932         self.pg0.enable_capture()
1933
1934     def tearDown(self):
1935         if not self.vpp_dead:
1936             self.vapi.want_bfd_events(enable_disable=0)
1937         self.vapi.collect_events()  # clear the event queue
1938         super(BFDSHA1TestCase, self).tearDown()
1939
1940     def test_session_up(self):
1941         """ bring BFD session up """
1942         key = self.factory.create_random_key(self)
1943         key.add_vpp_config()
1944         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1945                                             self.pg0.remote_ip4,
1946                                             sha1_key=key)
1947         self.vpp_session.add_vpp_config()
1948         self.vpp_session.admin_up()
1949         self.test_session = BFDTestSession(
1950             self, self.pg0, AF_INET, sha1_key=key,
1951             bfd_key_id=self.vpp_session.bfd_key_id)
1952         bfd_session_up(self)
1953
1954     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1955     def test_hold_up(self):
1956         """ hold BFD session up """
1957         key = self.factory.create_random_key(self)
1958         key.add_vpp_config()
1959         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1960                                             self.pg0.remote_ip4,
1961                                             sha1_key=key)
1962         self.vpp_session.add_vpp_config()
1963         self.vpp_session.admin_up()
1964         self.test_session = BFDTestSession(
1965             self, self.pg0, AF_INET, sha1_key=key,
1966             bfd_key_id=self.vpp_session.bfd_key_id)
1967         bfd_session_up(self)
1968         for dummy in range(self.test_session.detect_mult * 2):
1969             wait_for_bfd_packet(self)
1970             self.test_session.send_packet()
1971         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1972
1973     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1974     def test_hold_up_meticulous(self):
1975         """ hold BFD session up - meticulous auth """
1976         key = self.factory.create_random_key(
1977             self, BFDAuthType.meticulous_keyed_sha1)
1978         key.add_vpp_config()
1979         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1980                                             self.pg0.remote_ip4, sha1_key=key)
1981         self.vpp_session.add_vpp_config()
1982         self.vpp_session.admin_up()
1983         # specify sequence number so that it wraps
1984         self.test_session = BFDTestSession(
1985             self, self.pg0, AF_INET, sha1_key=key,
1986             bfd_key_id=self.vpp_session.bfd_key_id,
1987             our_seq_number=0xFFFFFFFF - 4)
1988         bfd_session_up(self)
1989         for dummy in range(30):
1990             wait_for_bfd_packet(self)
1991             self.test_session.inc_seq_num()
1992             self.test_session.send_packet()
1993         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1994
1995     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1996     def test_send_bad_seq_number(self):
1997         """ session is not kept alive by msgs with bad sequence numbers"""
1998         key = self.factory.create_random_key(
1999             self, BFDAuthType.meticulous_keyed_sha1)
2000         key.add_vpp_config()
2001         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2002                                             self.pg0.remote_ip4, sha1_key=key)
2003         self.vpp_session.add_vpp_config()
2004         self.test_session = BFDTestSession(
2005             self, self.pg0, AF_INET, sha1_key=key,
2006             bfd_key_id=self.vpp_session.bfd_key_id)
2007         bfd_session_up(self)
2008         detection_time = self.test_session.detect_mult *\
2009             self.vpp_session.required_min_rx / USEC_IN_SEC
2010         send_until = time.time() + 2 * detection_time
2011         while time.time() < send_until:
2012             self.test_session.send_packet()
2013             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2014                        "time between bfd packets")
2015         e = self.vapi.collect_events()
2016         # session should be down now, because the sequence numbers weren't
2017         # updated
2018         self.assert_equal(len(e), 1, "number of bfd events")
2019         verify_event(self, e[0], expected_state=BFDState.down)
2020
2021     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2022                                        legitimate_test_session,
2023                                        rogue_test_session,
2024                                        rogue_bfd_values=None):
2025         """ execute a rogue session interaction scenario
2026
2027         1. create vpp session, add config
2028         2. bring the legitimate session up
2029         3. copy the bfd values from legitimate session to rogue session
2030         4. apply rogue_bfd_values to rogue session
2031         5. set rogue session state to down
2032         6. send message to take the session down from the rogue session
2033         7. assert that the legitimate session is unaffected
2034         """
2035
2036         self.vpp_session = vpp_bfd_udp_session
2037         self.vpp_session.add_vpp_config()
2038         self.test_session = legitimate_test_session
2039         # bring vpp session up
2040         bfd_session_up(self)
2041         # send packet from rogue session
2042         rogue_test_session.update(
2043             my_discriminator=self.test_session.my_discriminator,
2044             your_discriminator=self.test_session.your_discriminator,
2045             desired_min_tx=self.test_session.desired_min_tx,
2046             required_min_rx=self.test_session.required_min_rx,
2047             detect_mult=self.test_session.detect_mult,
2048             diag=self.test_session.diag,
2049             state=self.test_session.state,
2050             auth_type=self.test_session.auth_type)
2051         if rogue_bfd_values:
2052             rogue_test_session.update(**rogue_bfd_values)
2053         rogue_test_session.update(state=BFDState.down)
2054         rogue_test_session.send_packet()
2055         wait_for_bfd_packet(self)
2056         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2057
2058     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2059     def test_mismatch_auth(self):
2060         """ session is not brought down by unauthenticated msg """
2061         key = self.factory.create_random_key(self)
2062         key.add_vpp_config()
2063         vpp_session = VppBFDUDPSession(
2064             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2065         legitimate_test_session = BFDTestSession(
2066             self, self.pg0, AF_INET, sha1_key=key,
2067             bfd_key_id=vpp_session.bfd_key_id)
2068         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2069         self.execute_rogue_session_scenario(vpp_session,
2070                                             legitimate_test_session,
2071                                             rogue_test_session)
2072
2073     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2074     def test_mismatch_bfd_key_id(self):
2075         """ session is not brought down by msg with non-existent key-id """
2076         key = self.factory.create_random_key(self)
2077         key.add_vpp_config()
2078         vpp_session = VppBFDUDPSession(
2079             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2080         # pick a different random bfd key id
2081         x = randint(0, 255)
2082         while x == vpp_session.bfd_key_id:
2083             x = randint(0, 255)
2084         legitimate_test_session = BFDTestSession(
2085             self, self.pg0, AF_INET, sha1_key=key,
2086             bfd_key_id=vpp_session.bfd_key_id)
2087         rogue_test_session = BFDTestSession(
2088             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2089         self.execute_rogue_session_scenario(vpp_session,
2090                                             legitimate_test_session,
2091                                             rogue_test_session)
2092
2093     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2094     def test_mismatched_auth_type(self):
2095         """ session is not brought down by msg with wrong auth type """
2096         key = self.factory.create_random_key(self)
2097         key.add_vpp_config()
2098         vpp_session = VppBFDUDPSession(
2099             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2100         legitimate_test_session = BFDTestSession(
2101             self, self.pg0, AF_INET, sha1_key=key,
2102             bfd_key_id=vpp_session.bfd_key_id)
2103         rogue_test_session = BFDTestSession(
2104             self, self.pg0, AF_INET, sha1_key=key,
2105             bfd_key_id=vpp_session.bfd_key_id)
2106         self.execute_rogue_session_scenario(
2107             vpp_session, legitimate_test_session, rogue_test_session,
2108             {'auth_type': BFDAuthType.keyed_md5})
2109
2110     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2111     def test_restart(self):
2112         """ simulate remote peer restart and resynchronization """
2113         key = self.factory.create_random_key(
2114             self, BFDAuthType.meticulous_keyed_sha1)
2115         key.add_vpp_config()
2116         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2117                                             self.pg0.remote_ip4, sha1_key=key)
2118         self.vpp_session.add_vpp_config()
2119         self.test_session = BFDTestSession(
2120             self, self.pg0, AF_INET, sha1_key=key,
2121             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2122         bfd_session_up(self)
2123         # don't send any packets for 2*detection_time
2124         detection_time = self.test_session.detect_mult *\
2125             self.vpp_session.required_min_rx / USEC_IN_SEC
2126         self.sleep(2 * detection_time, "simulating peer restart")
2127         events = self.vapi.collect_events()
2128         self.assert_equal(len(events), 1, "number of bfd events")
2129         verify_event(self, events[0], expected_state=BFDState.down)
2130         self.test_session.update(state=BFDState.down)
2131         # reset sequence number
2132         self.test_session.our_seq_number = 0
2133         self.test_session.vpp_seq_number = None
2134         # now throw away any pending packets
2135         self.pg0.enable_capture()
2136         bfd_session_up(self)
2137
2138
2139 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2140 class BFDAuthOnOffTestCase(VppTestCase):
2141     """Bidirectional Forwarding Detection (BFD) (changing auth) """
2142
2143     pg0 = None
2144     vpp_session = None
2145     test_session = None
2146
2147     @classmethod
2148     def setUpClass(cls):
2149         super(BFDAuthOnOffTestCase, cls).setUpClass()
2150         cls.vapi.cli("set log class bfd level debug")
2151         try:
2152             cls.create_pg_interfaces([0])
2153             cls.pg0.config_ip4()
2154             cls.pg0.admin_up()
2155             cls.pg0.resolve_arp()
2156
2157         except Exception:
2158             super(BFDAuthOnOffTestCase, cls).tearDownClass()
2159             raise
2160
2161     @classmethod
2162     def tearDownClass(cls):
2163         super(BFDAuthOnOffTestCase, cls).tearDownClass()
2164
2165     def setUp(self):
2166         super(BFDAuthOnOffTestCase, self).setUp()
2167         self.factory = AuthKeyFactory()
2168         self.vapi.want_bfd_events()
2169         self.pg0.enable_capture()
2170
2171     def tearDown(self):
2172         if not self.vpp_dead:
2173             self.vapi.want_bfd_events(enable_disable=0)
2174         self.vapi.collect_events()  # clear the event queue
2175         super(BFDAuthOnOffTestCase, self).tearDown()
2176
2177     def test_auth_on_immediate(self):
2178         """ turn auth on without disturbing session state (immediate) """
2179         key = self.factory.create_random_key(self)
2180         key.add_vpp_config()
2181         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2182                                             self.pg0.remote_ip4)
2183         self.vpp_session.add_vpp_config()
2184         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2185         bfd_session_up(self)
2186         for dummy in range(self.test_session.detect_mult * 2):
2187             p = wait_for_bfd_packet(self)
2188             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2189             self.test_session.send_packet()
2190         self.vpp_session.activate_auth(key)
2191         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2192         self.test_session.sha1_key = key
2193         for dummy in range(self.test_session.detect_mult * 2):
2194             p = wait_for_bfd_packet(self)
2195             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2196             self.test_session.send_packet()
2197         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2198         self.assert_equal(len(self.vapi.collect_events()), 0,
2199                           "number of bfd events")
2200
2201     def test_auth_off_immediate(self):
2202         """ turn auth off without disturbing session state (immediate) """
2203         key = self.factory.create_random_key(self)
2204         key.add_vpp_config()
2205         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2206                                             self.pg0.remote_ip4, sha1_key=key)
2207         self.vpp_session.add_vpp_config()
2208         self.test_session = BFDTestSession(
2209             self, self.pg0, AF_INET, sha1_key=key,
2210             bfd_key_id=self.vpp_session.bfd_key_id)
2211         bfd_session_up(self)
2212         # self.vapi.want_bfd_events(enable_disable=0)
2213         for dummy in range(self.test_session.detect_mult * 2):
2214             p = wait_for_bfd_packet(self)
2215             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2216             self.test_session.inc_seq_num()
2217             self.test_session.send_packet()
2218         self.vpp_session.deactivate_auth()
2219         self.test_session.bfd_key_id = None
2220         self.test_session.sha1_key = None
2221         for dummy in range(self.test_session.detect_mult * 2):
2222             p = wait_for_bfd_packet(self)
2223             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2224             self.test_session.inc_seq_num()
2225             self.test_session.send_packet()
2226         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2227         self.assert_equal(len(self.vapi.collect_events()), 0,
2228                           "number of bfd events")
2229
2230     def test_auth_change_key_immediate(self):
2231         """ change auth key without disturbing session state (immediate) """
2232         key1 = self.factory.create_random_key(self)
2233         key1.add_vpp_config()
2234         key2 = self.factory.create_random_key(self)
2235         key2.add_vpp_config()
2236         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2237                                             self.pg0.remote_ip4, sha1_key=key1)
2238         self.vpp_session.add_vpp_config()
2239         self.test_session = BFDTestSession(
2240             self, self.pg0, AF_INET, sha1_key=key1,
2241             bfd_key_id=self.vpp_session.bfd_key_id)
2242         bfd_session_up(self)
2243         for dummy in range(self.test_session.detect_mult * 2):
2244             p = wait_for_bfd_packet(self)
2245             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2246             self.test_session.send_packet()
2247         self.vpp_session.activate_auth(key2)
2248         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2249         self.test_session.sha1_key = key2
2250         for dummy in range(self.test_session.detect_mult * 2):
2251             p = wait_for_bfd_packet(self)
2252             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2253             self.test_session.send_packet()
2254         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2255         self.assert_equal(len(self.vapi.collect_events()), 0,
2256                           "number of bfd events")
2257
2258     def test_auth_on_delayed(self):
2259         """ turn auth on without disturbing session state (delayed) """
2260         key = self.factory.create_random_key(self)
2261         key.add_vpp_config()
2262         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2263                                             self.pg0.remote_ip4)
2264         self.vpp_session.add_vpp_config()
2265         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2266         bfd_session_up(self)
2267         for dummy in range(self.test_session.detect_mult * 2):
2268             wait_for_bfd_packet(self)
2269             self.test_session.send_packet()
2270         self.vpp_session.activate_auth(key, delayed=True)
2271         for dummy in range(self.test_session.detect_mult * 2):
2272             p = wait_for_bfd_packet(self)
2273             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2274             self.test_session.send_packet()
2275         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2276         self.test_session.sha1_key = key
2277         self.test_session.send_packet()
2278         for dummy in range(self.test_session.detect_mult * 2):
2279             p = wait_for_bfd_packet(self)
2280             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2281             self.test_session.send_packet()
2282         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2283         self.assert_equal(len(self.vapi.collect_events()), 0,
2284                           "number of bfd events")
2285
2286     def test_auth_off_delayed(self):
2287         """ turn auth off without disturbing session state (delayed) """
2288         key = self.factory.create_random_key(self)
2289         key.add_vpp_config()
2290         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2291                                             self.pg0.remote_ip4, sha1_key=key)
2292         self.vpp_session.add_vpp_config()
2293         self.test_session = BFDTestSession(
2294             self, self.pg0, AF_INET, sha1_key=key,
2295             bfd_key_id=self.vpp_session.bfd_key_id)
2296         bfd_session_up(self)
2297         for dummy in range(self.test_session.detect_mult * 2):
2298             p = wait_for_bfd_packet(self)
2299             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2300             self.test_session.send_packet()
2301         self.vpp_session.deactivate_auth(delayed=True)
2302         for dummy in range(self.test_session.detect_mult * 2):
2303             p = wait_for_bfd_packet(self)
2304             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2305             self.test_session.send_packet()
2306         self.test_session.bfd_key_id = None
2307         self.test_session.sha1_key = None
2308         self.test_session.send_packet()
2309         for dummy in range(self.test_session.detect_mult * 2):
2310             p = wait_for_bfd_packet(self)
2311             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2312             self.test_session.send_packet()
2313         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2314         self.assert_equal(len(self.vapi.collect_events()), 0,
2315                           "number of bfd events")
2316
2317     def test_auth_change_key_delayed(self):
2318         """ change auth key without disturbing session state (delayed) """
2319         key1 = self.factory.create_random_key(self)
2320         key1.add_vpp_config()
2321         key2 = self.factory.create_random_key(self)
2322         key2.add_vpp_config()
2323         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2324                                             self.pg0.remote_ip4, sha1_key=key1)
2325         self.vpp_session.add_vpp_config()
2326         self.vpp_session.admin_up()
2327         self.test_session = BFDTestSession(
2328             self, self.pg0, AF_INET, sha1_key=key1,
2329             bfd_key_id=self.vpp_session.bfd_key_id)
2330         bfd_session_up(self)
2331         for dummy in range(self.test_session.detect_mult * 2):
2332             p = wait_for_bfd_packet(self)
2333             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2334             self.test_session.send_packet()
2335         self.vpp_session.activate_auth(key2, delayed=True)
2336         for dummy in range(self.test_session.detect_mult * 2):
2337             p = wait_for_bfd_packet(self)
2338             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2339             self.test_session.send_packet()
2340         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2341         self.test_session.sha1_key = key2
2342         self.test_session.send_packet()
2343         for dummy in range(self.test_session.detect_mult * 2):
2344             p = wait_for_bfd_packet(self)
2345             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2346             self.test_session.send_packet()
2347         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2348         self.assert_equal(len(self.vapi.collect_events()), 0,
2349                           "number of bfd events")
2350
2351
2352 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2353 class BFDCLITestCase(VppTestCase):
2354     """Bidirectional Forwarding Detection (BFD) (CLI) """
2355     pg0 = None
2356
2357     @classmethod
2358     def setUpClass(cls):
2359         super(BFDCLITestCase, cls).setUpClass()
2360         cls.vapi.cli("set log class bfd level debug")
2361         try:
2362             cls.create_pg_interfaces((0,))
2363             cls.pg0.config_ip4()
2364             cls.pg0.config_ip6()
2365             cls.pg0.resolve_arp()
2366             cls.pg0.resolve_ndp()
2367
2368         except Exception:
2369             super(BFDCLITestCase, cls).tearDownClass()
2370             raise
2371
2372     @classmethod
2373     def tearDownClass(cls):
2374         super(BFDCLITestCase, cls).tearDownClass()
2375
2376     def setUp(self):
2377         super(BFDCLITestCase, self).setUp()
2378         self.factory = AuthKeyFactory()
2379         self.pg0.enable_capture()
2380
2381     def tearDown(self):
2382         try:
2383             self.vapi.want_bfd_events(enable_disable=0)
2384         except UnexpectedApiReturnValueError:
2385             # some tests aren't subscribed, so this is not an issue
2386             pass
2387         self.vapi.collect_events()  # clear the event queue
2388         super(BFDCLITestCase, self).tearDown()
2389
2390     def cli_verify_no_response(self, cli):
2391         """ execute a CLI, asserting that the response is empty """
2392         self.assert_equal(self.vapi.cli(cli),
2393                           b"",
2394                           "CLI command response")
2395
2396     def cli_verify_response(self, cli, expected):
2397         """ execute a CLI, asserting that the response matches expectation """
2398         self.assert_equal(self.vapi.cli(cli).strip(),
2399                           expected,
2400                           "CLI command response")
2401
2402     def test_show(self):
2403         """ show commands """
2404         k1 = self.factory.create_random_key(self)
2405         k1.add_vpp_config()
2406         k2 = self.factory.create_random_key(
2407             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2408         k2.add_vpp_config()
2409         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2410         s1.add_vpp_config()
2411         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2412                               sha1_key=k2)
2413         s2.add_vpp_config()
2414         self.logger.info(self.vapi.ppcli("show bfd keys"))
2415         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2416         self.logger.info(self.vapi.ppcli("show bfd"))
2417
2418     def test_set_del_sha1_key(self):
2419         """ set/delete SHA1 auth key """
2420         k = self.factory.create_random_key(self)
2421         self.registry.register(k, self.logger)
2422         self.cli_verify_no_response(
2423             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2424             (k.conf_key_id,
2425                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2426         self.assertTrue(k.query_vpp_config())
2427         self.vpp_session = VppBFDUDPSession(
2428             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2429         self.vpp_session.add_vpp_config()
2430         self.test_session = \
2431             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2432                            bfd_key_id=self.vpp_session.bfd_key_id)
2433         self.vapi.want_bfd_events()
2434         bfd_session_up(self)
2435         bfd_session_down(self)
2436         # try to replace the secret for the key - should fail because the key
2437         # is in-use
2438         k2 = self.factory.create_random_key(self)
2439         self.cli_verify_response(
2440             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2441             (k.conf_key_id,
2442                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2443             "bfd key set: `bfd_auth_set_key' API call failed, "
2444             "rv=-103:BFD object in use")
2445         # manipulating the session using old secret should still work
2446         bfd_session_up(self)
2447         bfd_session_down(self)
2448         self.vpp_session.remove_vpp_config()
2449         self.cli_verify_no_response(
2450             "bfd key del conf-key-id %s" % k.conf_key_id)
2451         self.assertFalse(k.query_vpp_config())
2452
2453     def test_set_del_meticulous_sha1_key(self):
2454         """ set/delete meticulous SHA1 auth key """
2455         k = self.factory.create_random_key(
2456             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2457         self.registry.register(k, self.logger)
2458         self.cli_verify_no_response(
2459             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2460             (k.conf_key_id,
2461                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2462         self.assertTrue(k.query_vpp_config())
2463         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2464                                             self.pg0.remote_ip6, af=AF_INET6,
2465                                             sha1_key=k)
2466         self.vpp_session.add_vpp_config()
2467         self.vpp_session.admin_up()
2468         self.test_session = \
2469             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2470                            bfd_key_id=self.vpp_session.bfd_key_id)
2471         self.vapi.want_bfd_events()
2472         bfd_session_up(self)
2473         bfd_session_down(self)
2474         # try to replace the secret for the key - should fail because the key
2475         # is in-use
2476         k2 = self.factory.create_random_key(self)
2477         self.cli_verify_response(
2478             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2479             (k.conf_key_id,
2480                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2481             "bfd key set: `bfd_auth_set_key' API call failed, "
2482             "rv=-103:BFD object in use")
2483         # manipulating the session using old secret should still work
2484         bfd_session_up(self)
2485         bfd_session_down(self)
2486         self.vpp_session.remove_vpp_config()
2487         self.cli_verify_no_response(
2488             "bfd key del conf-key-id %s" % k.conf_key_id)
2489         self.assertFalse(k.query_vpp_config())
2490
2491     def test_add_mod_del_bfd_udp(self):
2492         """ create/modify/delete IPv4 BFD UDP session """
2493         vpp_session = VppBFDUDPSession(
2494             self, self.pg0, self.pg0.remote_ip4)
2495         self.registry.register(vpp_session, self.logger)
2496         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2497             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2498             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2499                                 self.pg0.remote_ip4,
2500                                 vpp_session.desired_min_tx,
2501                                 vpp_session.required_min_rx,
2502                                 vpp_session.detect_mult)
2503         self.cli_verify_no_response(cli_add_cmd)
2504         # 2nd add should fail
2505         self.cli_verify_response(
2506             cli_add_cmd,
2507             "bfd udp session add: `bfd_add_add_session' API call"
2508             " failed, rv=-101:Duplicate BFD object")
2509         verify_bfd_session_config(self, vpp_session)
2510         mod_session = VppBFDUDPSession(
2511             self, self.pg0, self.pg0.remote_ip4,
2512             required_min_rx=2 * vpp_session.required_min_rx,
2513             desired_min_tx=3 * vpp_session.desired_min_tx,
2514             detect_mult=4 * vpp_session.detect_mult)
2515         self.cli_verify_no_response(
2516             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2517             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2518             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2519              mod_session.desired_min_tx, mod_session.required_min_rx,
2520              mod_session.detect_mult))
2521         verify_bfd_session_config(self, mod_session)
2522         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2523             "peer-addr %s" % (self.pg0.name,
2524                               self.pg0.local_ip4, self.pg0.remote_ip4)
2525         self.cli_verify_no_response(cli_del_cmd)
2526         # 2nd del is expected to fail
2527         self.cli_verify_response(
2528             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2529             " failed, rv=-102:No such BFD object")
2530         self.assertFalse(vpp_session.query_vpp_config())
2531
2532     def test_add_mod_del_bfd_udp6(self):
2533         """ create/modify/delete IPv6 BFD UDP session """
2534         vpp_session = VppBFDUDPSession(
2535             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2536         self.registry.register(vpp_session, self.logger)
2537         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2538             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2539             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2540                                 self.pg0.remote_ip6,
2541                                 vpp_session.desired_min_tx,
2542                                 vpp_session.required_min_rx,
2543                                 vpp_session.detect_mult)
2544         self.cli_verify_no_response(cli_add_cmd)
2545         # 2nd add should fail
2546         self.cli_verify_response(
2547             cli_add_cmd,
2548             "bfd udp session add: `bfd_add_add_session' API call"
2549             " failed, rv=-101:Duplicate BFD object")
2550         verify_bfd_session_config(self, vpp_session)
2551         mod_session = VppBFDUDPSession(
2552             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2553             required_min_rx=2 * vpp_session.required_min_rx,
2554             desired_min_tx=3 * vpp_session.desired_min_tx,
2555             detect_mult=4 * vpp_session.detect_mult)
2556         self.cli_verify_no_response(
2557             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2558             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2559             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2560              mod_session.desired_min_tx,
2561              mod_session.required_min_rx, mod_session.detect_mult))
2562         verify_bfd_session_config(self, mod_session)
2563         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2564             "peer-addr %s" % (self.pg0.name,
2565                               self.pg0.local_ip6, self.pg0.remote_ip6)
2566         self.cli_verify_no_response(cli_del_cmd)
2567         # 2nd del is expected to fail
2568         self.cli_verify_response(
2569             cli_del_cmd,
2570             "bfd udp session del: `bfd_udp_del_session' API call"
2571             " failed, rv=-102:No such BFD object")
2572         self.assertFalse(vpp_session.query_vpp_config())
2573
2574     def test_add_mod_del_bfd_udp_auth(self):
2575         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2576         key = self.factory.create_random_key(self)
2577         key.add_vpp_config()
2578         vpp_session = VppBFDUDPSession(
2579             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2580         self.registry.register(vpp_session, self.logger)
2581         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2582             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2583             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2584             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2585                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2586                vpp_session.detect_mult, key.conf_key_id,
2587                vpp_session.bfd_key_id)
2588         self.cli_verify_no_response(cli_add_cmd)
2589         # 2nd add should fail
2590         self.cli_verify_response(
2591             cli_add_cmd,
2592             "bfd udp session add: `bfd_add_add_session' API call"
2593             " failed, rv=-101:Duplicate BFD object")
2594         verify_bfd_session_config(self, vpp_session)
2595         mod_session = VppBFDUDPSession(
2596             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2597             bfd_key_id=vpp_session.bfd_key_id,
2598             required_min_rx=2 * vpp_session.required_min_rx,
2599             desired_min_tx=3 * vpp_session.desired_min_tx,
2600             detect_mult=4 * vpp_session.detect_mult)
2601         self.cli_verify_no_response(
2602             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2603             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2604             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2605              mod_session.desired_min_tx,
2606              mod_session.required_min_rx, mod_session.detect_mult))
2607         verify_bfd_session_config(self, mod_session)
2608         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2609             "peer-addr %s" % (self.pg0.name,
2610                               self.pg0.local_ip4, self.pg0.remote_ip4)
2611         self.cli_verify_no_response(cli_del_cmd)
2612         # 2nd del is expected to fail
2613         self.cli_verify_response(
2614             cli_del_cmd,
2615             "bfd udp session del: `bfd_udp_del_session' API call"
2616             " failed, rv=-102:No such BFD object")
2617         self.assertFalse(vpp_session.query_vpp_config())
2618
2619     def test_add_mod_del_bfd_udp6_auth(self):
2620         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2621         key = self.factory.create_random_key(
2622             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2623         key.add_vpp_config()
2624         vpp_session = VppBFDUDPSession(
2625             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2626         self.registry.register(vpp_session, self.logger)
2627         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2628             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2629             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2630             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2631                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2632                vpp_session.detect_mult, key.conf_key_id,
2633                vpp_session.bfd_key_id)
2634         self.cli_verify_no_response(cli_add_cmd)
2635         # 2nd add should fail
2636         self.cli_verify_response(
2637             cli_add_cmd,
2638             "bfd udp session add: `bfd_add_add_session' API call"
2639             " failed, rv=-101:Duplicate BFD object")
2640         verify_bfd_session_config(self, vpp_session)
2641         mod_session = VppBFDUDPSession(
2642             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2643             bfd_key_id=vpp_session.bfd_key_id,
2644             required_min_rx=2 * vpp_session.required_min_rx,
2645             desired_min_tx=3 * vpp_session.desired_min_tx,
2646             detect_mult=4 * vpp_session.detect_mult)
2647         self.cli_verify_no_response(
2648             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2649             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2650             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2651              mod_session.desired_min_tx,
2652              mod_session.required_min_rx, mod_session.detect_mult))
2653         verify_bfd_session_config(self, mod_session)
2654         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2655             "peer-addr %s" % (self.pg0.name,
2656                               self.pg0.local_ip6, self.pg0.remote_ip6)
2657         self.cli_verify_no_response(cli_del_cmd)
2658         # 2nd del is expected to fail
2659         self.cli_verify_response(
2660             cli_del_cmd,
2661             "bfd udp session del: `bfd_udp_del_session' API call"
2662             " failed, rv=-102:No such BFD object")
2663         self.assertFalse(vpp_session.query_vpp_config())
2664
2665     def test_auth_on_off(self):
2666         """ turn authentication on and off """
2667         key = self.factory.create_random_key(
2668             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2669         key.add_vpp_config()
2670         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2671         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2672                                         sha1_key=key)
2673         session.add_vpp_config()
2674         cli_activate = \
2675             "bfd udp session auth activate interface %s local-addr %s "\
2676             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2677             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2678                key.conf_key_id, auth_session.bfd_key_id)
2679         self.cli_verify_no_response(cli_activate)
2680         verify_bfd_session_config(self, auth_session)
2681         self.cli_verify_no_response(cli_activate)
2682         verify_bfd_session_config(self, auth_session)
2683         cli_deactivate = \
2684             "bfd udp session auth deactivate interface %s local-addr %s "\
2685             "peer-addr %s "\
2686             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2687         self.cli_verify_no_response(cli_deactivate)
2688         verify_bfd_session_config(self, session)
2689         self.cli_verify_no_response(cli_deactivate)
2690         verify_bfd_session_config(self, session)
2691
2692     def test_auth_on_off_delayed(self):
2693         """ turn authentication on and off (delayed) """
2694         key = self.factory.create_random_key(
2695             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2696         key.add_vpp_config()
2697         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2698         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2699                                         sha1_key=key)
2700         session.add_vpp_config()
2701         cli_activate = \
2702             "bfd udp session auth activate interface %s local-addr %s "\
2703             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2704             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2705                key.conf_key_id, auth_session.bfd_key_id)
2706         self.cli_verify_no_response(cli_activate)
2707         verify_bfd_session_config(self, auth_session)
2708         self.cli_verify_no_response(cli_activate)
2709         verify_bfd_session_config(self, auth_session)
2710         cli_deactivate = \
2711             "bfd udp session auth deactivate interface %s local-addr %s "\
2712             "peer-addr %s delayed yes"\
2713             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2714         self.cli_verify_no_response(cli_deactivate)
2715         verify_bfd_session_config(self, session)
2716         self.cli_verify_no_response(cli_deactivate)
2717         verify_bfd_session_config(self, session)
2718
2719     def test_admin_up_down(self):
2720         """ put session admin-up and admin-down """
2721         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2722         session.add_vpp_config()
2723         cli_down = \
2724             "bfd udp session set-flags admin down interface %s local-addr %s "\
2725             "peer-addr %s "\
2726             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2727         cli_up = \
2728             "bfd udp session set-flags admin up interface %s local-addr %s "\
2729             "peer-addr %s "\
2730             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2731         self.cli_verify_no_response(cli_down)
2732         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2733         self.cli_verify_no_response(cli_up)
2734         verify_bfd_session_config(self, session, state=BFDState.down)
2735
2736     def test_set_del_udp_echo_source(self):
2737         """ set/del udp echo source """
2738         self.create_loopback_interfaces(1)
2739         self.loopback0 = self.lo_interfaces[0]
2740         self.loopback0.admin_up()
2741         self.cli_verify_response("show bfd echo-source",
2742                                  "UDP echo source is not set.")
2743         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2744         self.cli_verify_no_response(cli_set)
2745         self.cli_verify_response("show bfd echo-source",
2746                                  "UDP echo source is: %s\n"
2747                                  "IPv4 address usable as echo source: none\n"
2748                                  "IPv6 address usable as echo source: none" %
2749                                  self.loopback0.name)
2750         self.loopback0.config_ip4()
2751         unpacked = unpack("!L", self.loopback0.local_ip4n)
2752         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2753         self.cli_verify_response("show bfd echo-source",
2754                                  "UDP echo source is: %s\n"
2755                                  "IPv4 address usable as echo source: %s\n"
2756                                  "IPv6 address usable as echo source: none" %
2757                                  (self.loopback0.name, echo_ip4))
2758         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2759         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2760                                             unpacked[2], unpacked[3] ^ 1))
2761         self.loopback0.config_ip6()
2762         self.cli_verify_response("show bfd echo-source",
2763                                  "UDP echo source is: %s\n"
2764                                  "IPv4 address usable as echo source: %s\n"
2765                                  "IPv6 address usable as echo source: %s" %
2766                                  (self.loopback0.name, echo_ip4, echo_ip6))
2767         cli_del = "bfd udp echo-source del"
2768         self.cli_verify_no_response(cli_del)
2769         self.cli_verify_response("show bfd echo-source",
2770                                  "UDP echo source is not set.")
2771
2772 if __name__ == '__main__':
2773     unittest.main(testRunner=VppTestRunner)