tests: make tests less make dependent
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 from collections import namedtuple
8 import hashlib
9 import ipaddress
10 import reprlib
11 import time
12 import unittest
13 from random import randint, shuffle, getrandbits
14 from socket import AF_INET, AF_INET6, inet_ntop
15 from struct import pack, unpack
16
17 import scapy.compat
18 from scapy.layers.inet import UDP, IP
19 from scapy.layers.inet6 import IPv6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22
23 from config import config
24 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
25     BFDDiagCode, BFDState, BFD_vpp_echo
26 from framework import tag_fixme_vpp_workers
27 from framework import VppTestCase, VppTestRunner
28 from framework import tag_run_solo
29 from util import ppp
30 from vpp_ip import DpoProto
31 from vpp_ip_route import VppIpRoute, VppRoutePath
32 from vpp_lo_interface import VppLoInterface
33 from vpp_papi_provider import UnexpectedApiReturnValueError, \
34     CliFailedCommandError
35 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
36 from vpp_gre_interface import VppGreInterface
37 from vpp_papi import VppEnum
38
# Number of microseconds in one second.
USEC_IN_SEC = 1000000
40
41
class AuthKeyFactory(object):
    """Produce BFD authentication keys, each with a distinct conf key ID"""

    def __init__(self):
        # conf key IDs issued so far; the dict is used only for membership
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ build a key with random content and a not-yet-issued conf key id

        :param test: test case handed through to VppBFDAuthKey
        :param auth_type: authentication type handed through to VppBFDAuthKey

        :returns: newly constructed VppBFDAuthKey
        """
        # keep drawing 32-bit IDs until one this factory has not issued yet
        while True:
            candidate = randint(0, 0xFFFFFFFF)
            if candidate not in self._conf_key_ids:
                break
        self._conf_key_ids[candidate] = 1
        # key material is between 1 and 20 random bytes
        key_length = randint(1, 20)
        key_bytes = bytearray(randint(0, 255) for _ in range(key_length))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=candidate,
                             key=scapy.compat.raw(key_bytes))
58
59
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # Placeholders for the two interfaces used by the tests below.
    # NOTE(review): presumably filled in by create_pg_interfaces() - confirm.
    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        super(BFDAPITestCase, cls).setUpClass()
        # make BFD log at debug level for all tests in this class
        cls.vapi.cli("set log class bfd level debug")
        try:
            # two pg interfaces, each configured for both IPv4 and IPv6
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            # setup failed part-way: run the class teardown explicitly and
            # re-raise the original error
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDAPITestCase, self).setUp()
        # fresh factory per test, so conf key IDs are unique within a test
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add and remove the same session twice, i.e. verify that it can be
        # created again after having been removed
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of the identical session is done inside the
        # negative-retval context
        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # same add/remove/add/remove sequence as the IPv4 variant
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        # the dump entry must agree with the values held by the session object
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
        # double all three parameters, then compare a fresh dump entry with
        # the session object again
        # NOTE(review): relies on modify_parameters() updating the session
        # object's own attributes - confirm
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_upd_bfd(self):
        """ Create/Modify w/ Update BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        # first upd_vpp_config() call is made while the session has not been
        # added by this test, i.e. it is used to create the session
        session.upd_vpp_config()
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")

        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
        # second upd_vpp_config() call doubles all three parameters
        session.upd_vpp_config(desired_min_tx=session.desired_min_tx * 2,
                               required_min_rx=session.required_min_rx * 2,
                               detect_mult=session.detect_mult * 2)
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(
            self) for i in range(0, key_count)]
        # none of the keys is configured in VPP before being added
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = []
        for i in indexes:
            key = keys[i]
            key.remove_vpp_config()
            removed.append(i)
            # after each removal check every key: removed ones must be gone,
            # all others must still be present
            for j in range(key_count):
                key = keys[j]
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        # the key is added to VPP before the session that refers to it
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        # the second add of the identical session must raise
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key object is created but never added to VPP
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ single SHA1 key shared by multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # four sessions using the same key: IPv4 and IPv6 on each of pg0, pg1
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        removed = 0
        # the key's use count must equal the number of sessions still
        # configured, checked before every removal ...
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        # ... and once more after the last session is gone
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # session starts without a key; the key is attached afterwards
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        # attach the key first so that there is something to deactivate
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        # make sure the two keys differ in conf key ID
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        # session is created with key1, then switched over to key2
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        # step 1: nothing configured - echo source not set, no usable address
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # step 2: echo source set to a loopback without addresses - reported
        # as set, but still without usable IPv4/IPv6 address
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # step 3: IPv4 configured on the loopback - the expected echo address
        # is the loopback's local address with the lowest bit flipped
        self.loopback0.config_ip4()
        echo_ip4 = ipaddress.IPv4Address(int(ipaddress.IPv4Address(
            self.loopback0.local_ip4)) ^ 1).packed
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # step 4: IPv6 configured as well - same lowest-bit-flipped rule
        self.loopback0.config_ip6()
        echo_ip6 = ipaddress.IPv6Address(int(ipaddress.IPv6Address(
            self.loopback0.local_ip6)) ^ 1).packed

        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
        self.assertTrue(echo_source.have_usable_ip6)
        self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)

        # step 5: echo source deleted - back to the state seen in step 1
        self.vapi.bfd_udp_del_echo_source()
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)
345
346
class BFDTestSession(object):
    """ Test-framework-side model of one end of a BFD session """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        # packets are sent on phy_interface; it defaults to interface
        self.phy_interface = phy_interface if phy_interface else self.interface
        self.udp_sport = randint(49152, 65535)
        # random starting sequence number unless the caller supplied one
        self.our_seq_number = (randint(0, 40000000)
                               if our_seq_number is None else our_seq_number)
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header
        # packet counters, all starting from zero
        self.tx_packets = 0
        self.rx_packets = 0
        self.tx_packets_echo = 0
        self.rx_packets_echo = 0

    def inc_seq_num(self):
        """ advance our sequence number by one, wrapping 0xFFFFFFFF to 0 """
        at_max = self.our_seq_number == 0xFFFFFFFF
        self.our_seq_number = 0 if at_max else self.our_seq_number + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ overwrite session parameters; arguments left as None are kept """
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attr, value in requested:
            if value is not None:
                setattr(self, attr, value)

    def fill_packet_fields(self, packet):
        """ copy every truthy session value into the packet's BFD layer """
        bfd = packet[BFD]
        # (session attribute, BFD layer field, debug log format)
        mapping = (
            ("my_discriminator", "my_discriminator",
             "BFD: setting packet.my_discriminator=%s"),
            ("your_discriminator", "your_discriminator",
             "BFD: setting packet.your_discriminator=%s"),
            ("required_min_rx", "required_min_rx_interval",
             "BFD: setting packet.required_min_rx_interval=%s"),
            ("required_min_echo_rx", "required_min_echo_rx_interval",
             "BFD: setting packet.required_min_echo_rx=%s"),
            ("desired_min_tx", "desired_min_tx_interval",
             "BFD: setting packet.desired_min_tx_interval=%s"),
            ("detect_mult", "detect_mult",
             "BFD: setting packet.detect_mult=%s"),
            ("diag", "diag", "BFD: setting packet.diag=%s"),
            ("state", "state", "BFD: setting packet.state=%s"),
            # auth_type set here is used by a negative test-case
            ("auth_type", "auth_type", "BFD: setting packet.auth_type=%s"),
        )
        for attr, field, log_format in mapping:
            value = getattr(self, attr)
            if value:
                self.test.logger.debug(log_format, value)
                setattr(bfd, field, value)

    def create_packet(self):
        """ build a BFD packet from the session's current values """
        if self.sha1_key:
            # authenticated packet: "A" flag plus the SHA1 auth section
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        outer = Ether(src=self.phy_interface.remote_mac,
                      dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            outer = outer / self.tunnel_header
        # hop limit / TTL is 255 for both address families
        if self.af == AF_INET6:
            network = IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255)
        else:
            network = IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255)
        transport = UDP(sport=self.udp_sport, dport=BFD.udp_dport)
        packet = outer / network / transport / bfd
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash input: first 32 bytes of the BFD layer followed by the
            # key, zero-padded to 20 bytes
            padding = b"\0" * (20 - len(self.sha1_key.key))
            hash_material = (scapy.compat.raw(packet[BFD])[:32] +
                             self.sha1_key.key + padding)
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send a packet; build it and/or pick phy_interface if not given """
        to_send = self.create_packet() if packet is None else packet
        out_if = self.phy_interface if interface is None else interface
        self.test.logger.debug(ppp("Sending packet:", to_send))
        out_if.add_stream(to_send)
        self.tx_packets += 1
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ check the SHA1 authentication section of a received packet """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet: just remember its sequence number
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            previous = self.vpp_seq_number
            meticulous = (self.sha1_key.auth_type ==
                          BFDAuthType.meticulous_keyed_sha1)
            if previous < 0xffffffff and meticulous:
                # meticulous: exactly one higher than the previous number
                self.test.assert_equal(recvd_seq_num,
                                       previous + 1,
                                       "BFD sequence number")
            elif previous < 0xffffffff:
                # non-meticulous: unchanged or one higher
                self.test.assert_in_range(recvd_seq_num,
                                          previous,
                                          previous + 1,
                                          "BFD sequence number")
            elif meticulous:
                # previous number was the maximum, so 0 must follow
                self.test.assert_equal(recvd_seq_num, 0,
                                       "BFD sequence number")
            else:
                self.test.assertIn(recvd_seq_num, (previous, 0),
                                   "BFD sequence number not one of "
                                   "(%s, 0)" % previous)
            self.vpp_seq_number = recvd_seq_num
        # the trailing 20 bytes carry the hash; swap them for the key padded
        # with zeros to 20 bytes and hash that to get the expected value
        padded_key = self.sha1_key.key + \
            b"\0" * (20 - len(self.sha1_key.key))
        expected_hash = hashlib.sha1(
            bfd.original[:-20] + padded_key).hexdigest()
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash.encode(), "Auth key hash")

    def verify_bfd(self, packet):
        """ check version, discriminator and (if keyed) auth of BFD layer """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
561
562
def bfd_session_up(test):
    """ Take the BFD session of the given test case to the Up state """

    def _bump_seq_if_meticulous():
        # the sequence number is advanced before a send only when the
        # session key is of meticulous keyed SHA1 type
        if test.test_session.sha1_key and \
                test.test_session.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1:
            test.test_session.inc_seq_num()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2,
                                is_tunnel=test.vpp_session.is_tunnel)
    # remember the offset from a previous call (if any) for comparison
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(hello.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    # answer the hello with Init, using a random discriminator of our own
    test.logger.info("BFD: Sending Init")
    test.test_session.update(my_discriminator=randint(0, 40000000),
                             your_discriminator=hello[BFD].my_discriminator,
                             state=BFDState.init)
    _bump_seq_if_meticulous()
    test.test_session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")

    # finally report Up from our side as well
    test.test_session.update(state=BFDState.up)
    _bump_seq_if_meticulous()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
596
597
def bfd_session_down(test):
    """ Take the BFD session of the given test case from Up to Down """
    session = test.test_session
    # the session must be Up on the VPP side before it is brought down
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session.update(state=BFDState.down)
    key = session.sha1_key
    # sequence number is advanced only for meticulous keyed SHA1 keys
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
611
612
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the dumped BFD session matches the session object

    :param test: test case providing assertIsNotNone() and assert_equal()
    :param session: session object providing
        get_bfd_udp_session_dump_entry() and the expected required_min_rx,
        desired_min_tx, detect_mult, sha1_key and - when sha1_key is set -
        bfd_key_id values
    :param state: expected session state; None (default) skips the check
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare with None explicitly: an expected state whose numeric value is
    # 0 is falsy and would otherwise be silently left unverified
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        # authenticated session: both key identifiers must match as well
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
635
636
def verify_ip(test, packet):
    """ Check hop limit/TTL and addresses of the packet's IP layer """
    if test.vpp_session.af == AF_INET6:
        header = packet[IPv6]
        iface = test.vpp_session.interface
        expected_src, expected_dst = iface.local_ip6, iface.remote_ip6
        test.assert_equal(header.hlim, 255, "IPv6 hop limit")
    else:
        header = packet[IP]
        iface = test.vpp_session.interface
        expected_src, expected_dst = iface.local_ip4, iface.remote_ip4
        test.assert_equal(header.ttl, 255, "IPv4 TTL")
    # the source is the session interface's local address, the destination
    # its remote address
    test.assert_equal(header.src, expected_src, "IP source address")
    test.assert_equal(header.dst, expected_dst, "IP destination address")
651
652
def verify_udp(test, packet):
    """ Check destination port and source port range of the UDP layer """
    header = packet[UDP]
    sport_low, sport_high = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_equal(header.dport, BFD.udp_dport, "UDP destination port")
    test.assert_in_range(header.sport, sport_low, sport_high,
                         "UDP source port")
659
660
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case providing logger, assert_equal() and vpp_session
    :param event: received event, compared with test.vpp_session
    :param expected_state: session state the event must report
    """
    test.logger.debug("BFD: Event: %s" % reprlib.repr(event))
    test.assert_equal(event.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # labels are address-family neutral, since this check runs for IPv4
    # sessions as well as for IPv6 ones
    test.assert_equal(str(event.local_addr), test.vpp_session.local_addr,
                      "Local IP address")
    test.assert_equal(str(event.peer_addr), test.vpp_session.peer_addr,
                      "Peer IP address")
    test.assert_equal(event.state, expected_state, BFDState)
674
675
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing pg0, test_session, vpp_session, logger
    :param timeout: how long to wait (in seconds)
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if True, strip the outer IP layer before verification

    :returns: received packet (the payload of the outer IP layer if
        is_tunnel is set)
    :raises CaptureTimeoutError: if the deadline passes before an acceptable
        packet is obtained
    :raises Exception: if the packet has no BFD layer or the BFD layer
        carries a payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.test_session.rx_packets += 1
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() yields None for a missing layer, whereas p[BFD] raises
    # IndexError and would never reach the check below
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
716
717
# Immutable record of the four BFD packet counters handled by
# bfd_grab_stats_snapshot() and bfd_stats_diff().
BFDStats = namedtuple("BFDStats", ["rx", "rx_echo", "tx", "tx_echo"])
719
720
def bfd_grab_stats_snapshot(test, bs_idx=0, thread_index=None):
    """ Read the current BFD session packet counters.

    :param test: test case exposing the ``statistics`` accessor
    :param bs_idx: second index used for every counter lookup
    :param thread_index: first index used for every counter lookup; when
                         None a full slice is used instead

    :returns: BFDStats holding sum_packets() of each of the four counters
    """
    stats = test.statistics
    # slice(None) is exactly what the literal ``[:, bs_idx]`` subscript passes
    first_index = slice(None) if thread_index is None else thread_index

    def packets(counter_name):
        return stats[counter_name][first_index, bs_idx].sum_packets()

    return BFDStats(
        packets('/bfd/rx-session-counters'),
        packets('/bfd/rx-session-echo-counters'),
        packets('/bfd/tx-session-counters'),
        packets('/bfd/tx-session-echo-counters'))
735
736
def bfd_stats_diff(stats_before, stats_after):
    """ Subtract an earlier counter snapshot from a later one.

    :param stats_before: snapshot taken first
    :param stats_after: snapshot taken second

    :returns: BFDStats with ``after - before`` for every counter
    """
    return BFDStats(
        rx=stats_after.rx - stats_before.rx,
        rx_echo=stats_after.rx_echo - stats_before.rx_echo,
        tx=stats_after.tx - stats_before.tx,
        tx_echo=stats_after.tx_echo - stats_before.tx_echo)
743
744
745 @tag_run_solo
746 class BFD4TestCase(VppTestCase):
747     """Bidirectional Forwarding Detection (BFD)"""
748
749     pg0 = None
750     vpp_clock_offset = None
751     vpp_session = None
752     test_session = None
753
754     @classmethod
755     def setUpClass(cls):
756         super(BFD4TestCase, cls).setUpClass()
757         cls.vapi.cli("set log class bfd level debug")
758         try:
759             cls.create_pg_interfaces([0])
760             cls.create_loopback_interfaces(1)
761             cls.loopback0 = cls.lo_interfaces[0]
762             cls.loopback0.config_ip4()
763             cls.loopback0.admin_up()
764             cls.pg0.config_ip4()
765             cls.pg0.configure_ipv4_neighbors()
766             cls.pg0.admin_up()
767             cls.pg0.resolve_arp()
768
769         except Exception:
770             super(BFD4TestCase, cls).tearDownClass()
771             raise
772
773     @classmethod
774     def tearDownClass(cls):
775         super(BFD4TestCase, cls).tearDownClass()
776
777     def setUp(self):
778         super(BFD4TestCase, self).setUp()
779         self.factory = AuthKeyFactory()
780         self.vapi.want_bfd_events()
781         self.pg0.enable_capture()
782         try:
783             self.bfd_udp4_sessions = self.statistics['/bfd/udp4/sessions']
784             self.bfd_udp6_sessions = self.statistics['/bfd/udp6/sessions']
785             self.vpp_session = VppBFDUDPSession(self, self.pg0,
786                                                 self.pg0.remote_ip4)
787             self.vpp_session.add_vpp_config()
788             self.vpp_session.admin_up()
789             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
790         except BaseException:
791             self.vapi.want_bfd_events(enable_disable=0)
792             raise
793
794     def tearDown(self):
795         if not self.vpp_dead:
796             self.vapi.want_bfd_events(enable_disable=0)
797         self.vapi.collect_events()  # clear the event queue
798         super(BFD4TestCase, self).tearDown()
799
800     def test_session_up(self):
801         """ bring BFD session up """
802         bfd_session_up(self)
803         bfd_udp4_sessions = self.statistics['/bfd/udp4/sessions']
804         bfd_udp6_sessions = self.statistics['/bfd/udp6/sessions']
805         self.assert_equal(bfd_udp4_sessions - self.bfd_udp4_sessions, 1)
806         self.assert_equal(bfd_udp6_sessions, self.bfd_udp6_sessions)
807
    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        # pick a random local discriminator for the first frame
        # NOTE(review): presumably your_discriminator is still unset here, so
        # VPP has to find the session by addresses (per the docstring) -
        # confirm against BFDTestSession
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # the reply must carry our discriminator and report Init state
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        # learn VPP's discriminator from the reply and move our side to Up
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        # state was already set to Up above; this repeats the update before
        # one more frame is sent
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
833
    def test_session_down(self):
        """ bring BFD session down """
        # the session is brought up first, then taken down again; both steps
        # are delegated to the module-level helpers
        bfd_session_up(self)
        bfd_session_down(self)
838
839     def test_hold_up(self):
840         """ hold BFD session up """
841         bfd_session_up(self)
842         for dummy in range(self.test_session.detect_mult * 2):
843             wait_for_bfd_packet(self)
844             self.test_session.send_packet()
845         self.assert_equal(len(self.vapi.collect_events()), 0,
846                           "number of bfd events")
847
848     def test_slow_timer(self):
849         """ verify slow periodic control frames while session down """
850         packet_count = 3
851         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
852         prev_packet = wait_for_bfd_packet(self, 2)
853         for dummy in range(packet_count):
854             next_packet = wait_for_bfd_packet(self, 2)
855             time_diff = next_packet.time - prev_packet.time
856             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
857             # to work around timing issues
858             self.assert_in_range(
859                 time_diff, 0.70, 1.05, "time between slow packets")
860             prev_packet = next_packet
861
862     def test_zero_remote_min_rx(self):
863         """ no packets when zero remote required min rx interval """
864         bfd_session_up(self)
865         self.test_session.update(required_min_rx=0)
866         self.test_session.send_packet()
867         for dummy in range(self.test_session.detect_mult):
868             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
869                        "sleep before transmitting bfd packet")
870             self.test_session.send_packet()
871             try:
872                 p = wait_for_bfd_packet(self, timeout=0)
873                 self.logger.error(ppp("Received unexpected packet:", p))
874             except CaptureTimeoutError:
875                 pass
876         self.assert_equal(
877             len(self.vapi.collect_events()), 0, "number of bfd events")
878         self.test_session.update(required_min_rx=300000)
879         for dummy in range(3):
880             self.test_session.send_packet()
881             wait_for_bfd_packet(
882                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
883         self.assert_equal(
884             len(self.vapi.collect_events()), 0, "number of bfd events")
885
886     def test_conn_down(self):
887         """ verify session goes down after inactivity """
888         bfd_session_up(self)
889         detection_time = self.test_session.detect_mult *\
890             self.vpp_session.required_min_rx / USEC_IN_SEC
891         self.sleep(detection_time, "waiting for BFD session time-out")
892         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
893         verify_event(self, e, expected_state=BFDState.down)
894
895     def test_peer_discr_reset_sess_down(self):
896         """ peer discriminator reset after session goes down """
897         bfd_session_up(self)
898         detection_time = self.test_session.detect_mult *\
899             self.vpp_session.required_min_rx / USEC_IN_SEC
900         self.sleep(detection_time, "waiting for BFD session time-out")
901         self.test_session.my_discriminator = 0
902         wait_for_bfd_packet(self,
903                             pcap_time_min=time.time() - self.vpp_clock_offset)
904
905     def test_large_required_min_rx(self):
906         """ large remote required min rx interval """
907         bfd_session_up(self)
908         p = wait_for_bfd_packet(self)
909         interval = 3000000
910         self.test_session.update(required_min_rx=interval)
911         self.test_session.send_packet()
912         time_mark = time.time()
913         count = 0
914         # busy wait here, trying to collect a packet or event, vpp is not
915         # allowed to send packets and the session will timeout first - so the
916         # Up->Down event must arrive before any packets do
917         while time.time() < time_mark + interval / USEC_IN_SEC:
918             try:
919                 p = wait_for_bfd_packet(self, timeout=0)
920                 # if vpp managed to send a packet before we did the session
921                 # session update, then that's fine, ignore it
922                 if p.time < time_mark - self.vpp_clock_offset:
923                     continue
924                 self.logger.error(ppp("Received unexpected packet:", p))
925                 count += 1
926             except CaptureTimeoutError:
927                 pass
928             events = self.vapi.collect_events()
929             if len(events) > 0:
930                 verify_event(self, events[0], BFDState.down)
931                 break
932         self.assert_equal(count, 0, "number of packets received")
933
    def test_immediate_remote_min_rx_reduction(self):
        """ immediately honor remote required min rx reduction """
        # replace the session created in setUp with one using
        # desired_min_tx=10000
        self.vpp_session.remove_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
        self.pg0.enable_capture()
        self.vpp_session.add_vpp_config()
        # the test side starts out with 1000000 for both intervals
        self.test_session.update(desired_min_tx=1000000,
                                 required_min_rx=1000000)
        bfd_session_up(self)
        reference_packet = wait_for_bfd_packet(self)
        time_mark = time.time()
        # now advertise a much smaller required min rx
        interval = 300000
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        extra_time = time.time() - time_mark
        p = wait_for_bfd_packet(self)
        # first packet is allowed to be late by time we spent doing the update
        # calculated in extra_time
        self.assert_in_range(p.time - reference_packet.time,
                             .95 * 0.75 * interval / USEC_IN_SEC,
                             1.05 * interval / USEC_IN_SEC + extra_time,
                             "time between BFD packets")
        reference_packet = p
        # the following packets must keep the new spacing, checked with the
        # same bounds minus the extra_time allowance
        for dummy in range(3):
            p = wait_for_bfd_packet(self)
            diff = p.time - reference_packet.time
            self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
                                 1.05 * interval / USEC_IN_SEC,
                                 "time between BFD packets")
            reference_packet = p
965
    def test_modify_req_min_rx_double(self):
        """ modify session - double required min rx """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # test side advertises 10000 for both intervals
        self.test_session.update(desired_min_tx=10000,
                                 required_min_rx=10000)
        self.test_session.send_packet()
        # double required min rx
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # only accept a frame whose pcap timestamp is not older than this
        # point
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        # expected time-out in seconds: detect_mult times the larger of our
        # desired min tx and VPP's required min rx
        # NOTE(review): assumes vpp_session.required_min_rx already reflects
        # the doubled value after modify_parameters - confirm
        timeout = self.test_session.detect_mult * \
            max(self.test_session.desired_min_tx,
                self.vpp_session.required_min_rx) / USEC_IN_SEC
        self.test_session.send_packet(final)
        time_mark = time.time()
        # nothing more is sent, so a Down event is expected; it has to arrive
        # within 10% of the computed time-out
        e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.down)
        time_to_event = time.time() - time_mark
        self.assert_in_range(time_to_event, .9 * timeout,
                             1.1 * timeout, "session timeout")
994
    def test_modify_req_min_rx_halve(self):
        """ modify session - halve required min rx """
        # start from a doubled required min rx so it can be halved later
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.test_session.update(desired_min_tx=10000,
                                 required_min_rx=10000)
        self.test_session.send_packet()
        # only accept a frame whose pcap timestamp is not older than this
        # point
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # halve required min rx
        old_required_min_rx = self.vpp_session.required_min_rx
        self.vpp_session.modify_parameters(
            required_min_rx=self.vpp_session.required_min_rx // 2)
        # now we wait 0.8*3*old-req-min-rx and the session should still be up
        self.sleep(0.8 * self.vpp_session.detect_mult *
                   old_required_min_rx / USEC_IN_SEC,
                   "wait before finishing poll sequence")
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        p = wait_for_bfd_packet(self)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # now the session should time out under new conditions
        # NOTE(review): assumes vpp_session.required_min_rx now holds the
        # halved value - confirm
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        before = time.time()
        e = self.vapi.wait_for_event(
            2 * detection_time, "bfd_udp_session_event")
        after = time.time()
        # the Down event has to arrive within 10% of the detection time
        self.assert_in_range(after - before,
                             0.9 * detection_time,
                             1.1 * detection_time,
                             "time before bfd session goes down")
        verify_event(self, e, expected_state=BFDState.down)
1036
1037     def test_modify_detect_mult(self):
1038         """ modify detect multiplier """
1039         bfd_session_up(self)
1040         p = wait_for_bfd_packet(self)
1041         self.vpp_session.modify_parameters(detect_mult=1)
1042         p = wait_for_bfd_packet(
1043             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1044         self.assert_equal(self.vpp_session.detect_mult,
1045                           p[BFD].detect_mult,
1046                           "detect mult")
1047         # poll bit must not be set
1048         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1049                          "Poll bit not set in BFD packet")
1050         self.vpp_session.modify_parameters(detect_mult=10)
1051         p = wait_for_bfd_packet(
1052             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1053         self.assert_equal(self.vpp_session.detect_mult,
1054                           p[BFD].detect_mult,
1055                           "detect mult")
1056         # poll bit must not be set
1057         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1058                          "Poll bit not set in BFD packet")
1059
    def test_queued_poll(self):
        """ test poll sequence queueing """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # first parameter change - starts the 1st poll sequence
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        # the final for the 1st sequence is held back for at least this many
        # seconds
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        # second parameter change while the 1st sequence is still open
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # look at up to twice as many frames as were seen during the 1st
        # sequence, answering each until one carries the poll bit
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        # NOTE(review): a final was already sent inside the loop above, so
        # this is a second one - confirm that is intended
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
1124
1125     # returning inconsistent results requiring retries in per-patch tests
1126     @unittest.skipUnless(config.extended, "part of extended tests")
1127     def test_poll_response(self):
1128         """ test correct response to control frame with poll bit set """
1129         bfd_session_up(self)
1130         poll = self.test_session.create_packet()
1131         poll[BFD].flags = "P"
1132         self.test_session.send_packet(poll)
1133         final = wait_for_bfd_packet(
1134             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1135         self.assertIn("F", final.sprintf("%BFD.flags%"))
1136
1137     def test_no_periodic_if_remote_demand(self):
1138         """ no periodic frames outside poll sequence if remote demand set """
1139         bfd_session_up(self)
1140         demand = self.test_session.create_packet()
1141         demand[BFD].flags = "D"
1142         self.test_session.send_packet(demand)
1143         transmit_time = 0.9 \
1144             * max(self.vpp_session.required_min_rx,
1145                   self.test_session.desired_min_tx) \
1146             / USEC_IN_SEC
1147         count = 0
1148         for dummy in range(self.test_session.detect_mult * 2):
1149             self.sleep(transmit_time)
1150             self.test_session.send_packet(demand)
1151             try:
1152                 p = wait_for_bfd_packet(self, timeout=0)
1153                 self.logger.error(ppp("Received unexpected packet:", p))
1154                 count += 1
1155             except CaptureTimeoutError:
1156                 pass
1157         events = self.vapi.collect_events()
1158         for e in events:
1159             self.logger.error("Received unexpected event: %s", e)
1160         self.assert_equal(count, 0, "number of packets received")
1161         self.assert_equal(len(events), 0, "number of events received")
1162
    def test_echo_looped_back(self):
        """ echo packets looped back """
        bfd_session_up(self)
        stats_before = bfd_grab_stats_snapshot(self)
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # echo frame: source and destination IP are both the remote address,
        # sent to the BFD echo port
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IP(src=self.pg0.remote_ip4,
                          dst=self.pg0.remote_ip4) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        # send the frames one by one, each with the next source port
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
            self.logger.debug(self.vapi.ppcli("show trace"))
        counter = 0
        bfd_control_packets_rx = 0
        # collect until all echo frames came back; frames addressed to the
        # BFD control port are counted separately and otherwise skipped
        while counter < echo_packet_count:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IP]
            self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
            udp = p[UDP]
            if udp.dport == BFD.udp_dport:
                bfd_control_packets_rx += 1
                continue
            self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            # source ports must come back in the order they were sent
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
            counter += 1
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")
        # counters: rx and both echo counters must not have moved, tx must
        # match the number of control frames seen above
        # NOTE(review): two messages below say "no BFD session exists"
        # although bfd_session_up() was called at the top - confirm wording
        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        self.assertEqual(
            0, diff.rx, "RX counter bumped but no BFD packets sent")
        self.assertEqual(
            bfd_control_packets_rx, diff.tx, "TX counter incorrect")
        self.assertEqual(0, diff.rx_echo,
                         "RX echo counter bumped but no BFD session exists")
        self.assertEqual(0, diff.tx_echo,
                         "TX echo counter bumped but no BFD session exists")
1224
    def test_echo(self):
        """ echo function """
        stats_before = bfd_grab_stats_snapshot(self)
        bfd_session_up(self)
        # advertise a non-zero required min echo rx interval
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # VPP must still advertise its configured required min rx
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        # configure the loopback as echo source
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # echo frame: check addressing, then send it back with
                    # the destination MAC rewritten to the local MAC
                    self.assert_equal(
                        p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.test_session.rx_packets_echo += 1
                    self.test_session.tx_packets_echo += 1
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    self.test_session.rx_packets += 1
                    # once echo has been seen, the advertised required min
                    # rx must be at least 1000000
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    # answer a poll with a final
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            # one control frame from our side per outer iteration
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # our rx is vpp tx and vice versa, also tolerate one packet off
        self.assert_in_range(self.test_session.tx_packets,
                             diff.rx - 1, diff.rx + 1, "RX counter")
        self.assert_in_range(self.test_session.rx_packets,
                             diff.tx - 1, diff.tx + 1, "TX counter")
        self.assert_in_range(self.test_session.tx_packets_echo,
                             diff.rx_echo - 1, diff.rx_echo + 1,
                             "RX echo counter")
        self.assert_in_range(self.test_session.rx_packets_echo,
                             diff.tx_echo - 1, diff.tx_echo + 1,
                             "TX echo counter")
1298
    def test_echo_fail(self):
        """ session goes down if echo function fails """
        bfd_session_up(self)
        # advertise a non-zero required min echo rx interval
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        # echo function should be used now, but we will drop the echo packets
        verified_diag = False
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # dropped
                    pass
                elif p.haslayer(BFD):
                    # answer a poll with a final, after checking the
                    # advertised required min rx is at least 1000000
                    if "P" in p.sprintf("%BFD.flags%"):
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                    # a Down frame must carry the echo-function-failed
                    # diagnostic code
                    if p[BFD].state == BFDState.down:
                        self.assert_equal(p[BFD].diag,
                                          BFDDiagCode.echo_function_failed,
                                          BFDDiagCode)
                        verified_diag = True
                else:
                    raise Exception(ppp("Received unknown packet:", p))
            # one control frame from our side per outer iteration
            self.test_session.send_packet()
        # exactly one event, reporting Down, is expected overall
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        self.assert_equal(events[0].state, BFDState.down, BFDState)
        self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1338
1339     def test_echo_stop(self):
1340         """ echo function stops if peer sets required min echo rx zero """
1341         bfd_session_up(self)
1342         self.test_session.update(required_min_echo_rx=150000)
1343         self.test_session.send_packet()
1344         self.vapi.bfd_udp_set_echo_source(
1345             sw_if_index=self.loopback0.sw_if_index)
1346         # wait for first echo packet
1347         while True:
1348             p = self.pg0.wait_for_packet(1)
1349             self.logger.debug(ppp("Got packet:", p))
1350             if p[UDP].dport == BFD.udp_dport_echo:
1351                 self.logger.debug(ppp("Looping back packet:", p))
1352                 p[Ether].dst = self.pg0.local_mac
1353                 self.pg0.add_stream(p)
1354                 self.pg_start()
1355                 break
1356             elif p.haslayer(BFD):
1357                 # ignore BFD
1358                 pass
1359             else:
1360                 raise Exception(ppp("Received unknown packet:", p))
1361         self.test_session.update(required_min_echo_rx=0)
1362         self.test_session.send_packet()
1363         # echo packets shouldn't arrive anymore
1364         for dummy in range(5):
1365             wait_for_bfd_packet(
1366                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1367             self.test_session.send_packet()
1368             events = self.vapi.collect_events()
1369             self.assert_equal(len(events), 0, "number of bfd events")
1370
1371     def test_echo_source_removed(self):
1372         """ echo function stops if echo source is removed """
1373         bfd_session_up(self)
1374         self.test_session.update(required_min_echo_rx=150000)
1375         self.test_session.send_packet()
1376         self.vapi.bfd_udp_set_echo_source(
1377             sw_if_index=self.loopback0.sw_if_index)
1378         # wait for first echo packet
1379         while True:
1380             p = self.pg0.wait_for_packet(1)
1381             self.logger.debug(ppp("Got packet:", p))
1382             if p[UDP].dport == BFD.udp_dport_echo:
1383                 self.logger.debug(ppp("Looping back packet:", p))
1384                 p[Ether].dst = self.pg0.local_mac
1385                 self.pg0.add_stream(p)
1386                 self.pg_start()
1387                 break
1388             elif p.haslayer(BFD):
1389                 # ignore BFD
1390                 pass
1391             else:
1392                 raise Exception(ppp("Received unknown packet:", p))
1393         self.vapi.bfd_udp_del_echo_source()
1394         self.test_session.send_packet()
1395         # echo packets shouldn't arrive anymore
1396         for dummy in range(5):
1397             wait_for_bfd_packet(
1398                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1399             self.test_session.send_packet()
1400             events = self.vapi.collect_events()
1401             self.assert_equal(len(events), 0, "number of bfd events")
1402
1403     def test_stale_echo(self):
1404         """ stale echo packets don't keep a session up """
1405         bfd_session_up(self)
1406         self.test_session.update(required_min_echo_rx=150000)
1407         self.vapi.bfd_udp_set_echo_source(
1408             sw_if_index=self.loopback0.sw_if_index)
1409         self.test_session.send_packet()
1410         # should be turned on - loopback echo packets
1411         echo_packet = None
1412         timeout_at = None
1413         timeout_ok = False
1414         for dummy in range(10 * self.vpp_session.detect_mult):
1415             p = self.pg0.wait_for_packet(1)
1416             if p[UDP].dport == BFD.udp_dport_echo:
1417                 if echo_packet is None:
1418                     self.logger.debug(ppp("Got first echo packet:", p))
1419                     echo_packet = p
1420                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1421                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1422                 else:
1423                     self.logger.debug(ppp("Got followup echo packet:", p))
1424                 self.logger.debug(ppp("Looping back first echo packet:", p))
1425                 echo_packet[Ether].dst = self.pg0.local_mac
1426                 self.pg0.add_stream(echo_packet)
1427                 self.pg_start()
1428             elif p.haslayer(BFD):
1429                 self.logger.debug(ppp("Got packet:", p))
1430                 if "P" in p.sprintf("%BFD.flags%"):
1431                     final = self.test_session.create_packet()
1432                     final[BFD].flags = "F"
1433                     self.test_session.send_packet(final)
1434                 if p[BFD].state == BFDState.down:
1435                     self.assertIsNotNone(
1436                         timeout_at,
1437                         "Session went down before first echo packet received")
1438                     now = time.time()
1439                     self.assertGreaterEqual(
1440                         now, timeout_at,
1441                         "Session timeout at %s, but is expected at %s" %
1442                         (now, timeout_at))
1443                     self.assert_equal(p[BFD].diag,
1444                                       BFDDiagCode.echo_function_failed,
1445                                       BFDDiagCode)
1446                     events = self.vapi.collect_events()
1447                     self.assert_equal(len(events), 1, "number of bfd events")
1448                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1449                     timeout_ok = True
1450                     break
1451             else:
1452                 raise Exception(ppp("Received unknown packet:", p))
1453             self.test_session.send_packet()
1454         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1455
1456     def test_invalid_echo_checksum(self):
1457         """ echo packets with invalid checksum don't keep a session up """
1458         bfd_session_up(self)
1459         self.test_session.update(required_min_echo_rx=150000)
1460         self.vapi.bfd_udp_set_echo_source(
1461             sw_if_index=self.loopback0.sw_if_index)
1462         self.test_session.send_packet()
1463         # should be turned on - loopback echo packets
1464         timeout_at = None
1465         timeout_ok = False
1466         for dummy in range(10 * self.vpp_session.detect_mult):
1467             p = self.pg0.wait_for_packet(1)
1468             if p[UDP].dport == BFD.udp_dport_echo:
1469                 self.logger.debug(ppp("Got echo packet:", p))
1470                 if timeout_at is None:
1471                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1472                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1473                 p[BFD_vpp_echo].checksum = getrandbits(64)
1474                 p[Ether].dst = self.pg0.local_mac
1475                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1476                 self.pg0.add_stream(p)
1477                 self.pg_start()
1478             elif p.haslayer(BFD):
1479                 self.logger.debug(ppp("Got packet:", p))
1480                 if "P" in p.sprintf("%BFD.flags%"):
1481                     final = self.test_session.create_packet()
1482                     final[BFD].flags = "F"
1483                     self.test_session.send_packet(final)
1484                 if p[BFD].state == BFDState.down:
1485                     self.assertIsNotNone(
1486                         timeout_at,
1487                         "Session went down before first echo packet received")
1488                     now = time.time()
1489                     self.assertGreaterEqual(
1490                         now, timeout_at,
1491                         "Session timeout at %s, but is expected at %s" %
1492                         (now, timeout_at))
1493                     self.assert_equal(p[BFD].diag,
1494                                       BFDDiagCode.echo_function_failed,
1495                                       BFDDiagCode)
1496                     events = self.vapi.collect_events()
1497                     self.assert_equal(len(events), 1, "number of bfd events")
1498                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1499                     timeout_ok = True
1500                     break
1501             else:
1502                 raise Exception(ppp("Received unknown packet:", p))
1503             self.test_session.send_packet()
1504         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1505
1506     def test_admin_up_down(self):
1507         """ put session admin-up and admin-down """
1508         bfd_session_up(self)
1509         self.vpp_session.admin_down()
1510         self.pg0.enable_capture()
1511         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1512         verify_event(self, e, expected_state=BFDState.admin_down)
1513         for dummy in range(2):
1514             p = wait_for_bfd_packet(self)
1515             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1516         # try to bring session up - shouldn't be possible
1517         self.test_session.update(state=BFDState.init)
1518         self.test_session.send_packet()
1519         for dummy in range(2):
1520             p = wait_for_bfd_packet(self)
1521             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1522         self.vpp_session.admin_up()
1523         self.test_session.update(state=BFDState.down)
1524         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1525         verify_event(self, e, expected_state=BFDState.down)
1526         p = wait_for_bfd_packet(
1527             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1528         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1529         self.test_session.send_packet()
1530         p = wait_for_bfd_packet(
1531             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1532         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1533         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1534         verify_event(self, e, expected_state=BFDState.init)
1535         self.test_session.update(state=BFDState.up)
1536         self.test_session.send_packet()
1537         p = wait_for_bfd_packet(
1538             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1539         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1540         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1541         verify_event(self, e, expected_state=BFDState.up)
1542
1543     def test_config_change_remote_demand(self):
1544         """ configuration change while peer in demand mode """
1545         bfd_session_up(self)
1546         demand = self.test_session.create_packet()
1547         demand[BFD].flags = "D"
1548         self.test_session.send_packet(demand)
1549         self.vpp_session.modify_parameters(
1550             required_min_rx=2 * self.vpp_session.required_min_rx)
1551         p = wait_for_bfd_packet(
1552             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1553         # poll bit must be set
1554         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1555         # terminate poll sequence
1556         final = self.test_session.create_packet()
1557         final[BFD].flags = "D+F"
1558         self.test_session.send_packet(final)
1559         # vpp should be quiet now again
1560         transmit_time = 0.9 \
1561             * max(self.vpp_session.required_min_rx,
1562                   self.test_session.desired_min_tx) \
1563             / USEC_IN_SEC
1564         count = 0
1565         for dummy in range(self.test_session.detect_mult * 2):
1566             self.sleep(transmit_time)
1567             self.test_session.send_packet(demand)
1568             try:
1569                 p = wait_for_bfd_packet(self, timeout=0)
1570                 self.logger.error(ppp("Received unexpected packet:", p))
1571                 count += 1
1572             except CaptureTimeoutError:
1573                 pass
1574         events = self.vapi.collect_events()
1575         for e in events:
1576             self.logger.error("Received unexpected event: %s", e)
1577         self.assert_equal(count, 0, "number of packets received")
1578         self.assert_equal(len(events), 0, "number of events received")
1579
1580     def test_intf_deleted(self):
1581         """ interface with bfd session deleted """
1582         intf = VppLoInterface(self)
1583         intf.config_ip4()
1584         intf.admin_up()
1585         sw_if_index = intf.sw_if_index
1586         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1587         vpp_session.add_vpp_config()
1588         vpp_session.admin_up()
1589         intf.remove_vpp_config()
1590         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1591         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1592         self.assertFalse(vpp_session.query_vpp_config())
1593
1594
@tag_run_solo
@tag_fixme_vpp_workers
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # Placeholders: pg0 is created in setUpClass(), the two session objects
    # are created in setUp().
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            # one packet-generator interface with IPv6 plus one loopback
            # (the loopback is used as echo source by test_echo)
            cls.create_pg_interfaces([0])
            pg0 = cls.pg0
            pg0.config_ip6()
            pg0.configure_ipv6_neighbors()
            pg0.admin_up()
            pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()
        except Exception:
            # undo the class-level setup before propagating the error
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            # session counters before this test creates its own session
            self.bfd_udp4_sessions = self.statistics['/bfd/udp4/sessions']
            self.bfd_udp6_sessions = self.statistics['/bfd/udp6/sessions']
            self.vpp_session = VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # do not leave event reporting enabled if setup fails
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)
        udp4_now = self.statistics['/bfd/udp4/sessions']
        udp6_now = self.statistics['/bfd/udp6/sessions']
        # compared with the values stored in setUp(): the IPv4 counter is
        # unchanged, the IPv6 counter grew by exactly one
        self.assert_equal(udp4_now, self.bfd_udp4_sessions)
        self.assert_equal(udp6_now - self.bfd_udp6_sessions, 1)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        reply = self.pg0.wait_for_packet(1)
        reply_bfd = reply[BFD]
        self.assert_equal(reply_bfd.your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(reply_bfd.state, BFDState.init, BFDState)
        # adopt the discriminator announced by VPP and move to up
        self.test_session.update(
            your_discriminator=reply_bfd.my_discriminator,
            state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        event = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, event, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        event = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, event, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every BFD packet for two detection multiples
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        pending_events = self.vapi.collect_events()
        self.assert_equal(len(pending_events), 0, "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        bfd_session_up(self)
        stats_before = bfd_grab_stats_snapshot(self)
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        # The source port doubles as a sequence number for the echo packets.
        next_tx_sport = randint(1, 50000)
        next_rx_sport = next_tx_sport
        echo_packet = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
            IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6) /
            UDP(dport=BFD.udp_dport_echo) /
            Raw("this should be looped back"))
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = next_tx_sport
            next_tx_sport += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        echoes_received = 0
        control_packets_received = 0
        while echoes_received < echo_packet_count:
            pkt = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", pkt))
            eth = pkt[Ether]
            self.assert_equal(self.pg0.remote_mac, eth.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, eth.src, "Source MAC")
            ip6 = pkt[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip6.dst, "Destination IP")
            udp = pkt[UDP]
            if udp.dport == BFD.udp_dport:
                # BFD control packet - only counted, for the TX check below
                control_packets_received += 1
                continue
            self.assert_equal(self.pg0.remote_ip6, ip6.src, "Source IP")
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, next_rx_sport, "UDP source port")
            next_rx_sport += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(udp.payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
            echoes_received += 1
        self.assert_equal(next_tx_sport, next_rx_sport, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")
        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        self.assertEqual(
            0, diff.rx, "RX counter bumped but no BFD packets sent")
        self.assertEqual(control_packets_received,
                         diff.tx, "TX counter incorrect")
        self.assertEqual(0, diff.rx_echo,
                         "RX echo counter bumped but no BFD session exists")
        self.assertEqual(0, diff.tx_echo,
                         "TX echo counter bumped but no BFD session exists")

    def test_echo(self):
        """ echo function """
        stats_before = bfd_grab_stats_snapshot(self)
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        # echo shouldn't work without echo source set
        for _ in range(10):
            delay = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(delay, "delay before sending bfd packet")
            self.test_session.send_packet()
        pkt = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(pkt[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            deadline = time.time() + 0.75 * detection_time
            while time.time() < deadline:
                pkt = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", pkt))
                is_echo = pkt[UDP].dport == BFD.udp_dport_echo
                if not is_echo and not pkt.haslayer(BFD):
                    raise Exception(ppp("Received unknown packet:", pkt))
                if is_echo:
                    self.assert_equal(
                        pkt[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(
                        pkt[IPv6].src, self.loopback0.local_ip6,
                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", pkt))
                    self.assert_equal(pkt[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # the echo packet is both received and sent back by us
                    self.test_session.rx_packets_echo += 1
                    self.test_session.tx_packets_echo += 1
                    pkt[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(pkt)
                    self.pg_start()
                    echo_seen = True
                else:
                    self.test_session.rx_packets += 1
                    if echo_seen:
                        self.assertGreaterEqual(
                            pkt[BFD].required_min_rx_interval, 1000000)
                    if "P" in pkt.sprintf("%BFD.flags%"):
                        # answer the poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)

                pending_events = self.vapi.collect_events()
                self.assert_equal(len(pending_events), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # our rx is vpp tx and vice versa, also tolerate one packet off
        session = self.test_session
        self.assert_in_range(session.tx_packets,
                             diff.rx - 1, diff.rx + 1, "RX counter")
        self.assert_in_range(session.rx_packets,
                             diff.tx - 1, diff.tx + 1, "TX counter")
        self.assert_in_range(session.tx_packets_echo,
                             diff.rx_echo - 1, diff.rx_echo + 1,
                             "RX echo counter")
        self.assert_in_range(session.rx_packets_echo,
                             diff.tx_echo - 1, diff.tx_echo + 1,
                             "TX echo counter")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        # dedicated loopback interface carrying its own IPv6 BFD session
        loopback = VppLoInterface(self)
        loopback.config_ip6()
        loopback.admin_up()
        # remember the index before the interface is removed
        expected_sw_if_index = loopback.sw_if_index
        session = VppBFDUDPSession(
            self, loopback, loopback.remote_ip6, af=AF_INET6)
        session.add_vpp_config()
        session.admin_up()
        loopback.remove_vpp_config()
        event = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        self.assert_equal(event.sw_if_index, expected_sw_if_index,
                          "sw_if_index")
        self.assertFalse(session.query_vpp_config())
1846
1847
@tag_run_solo
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # both session objects are created inside test_session_with_fib()
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.configure_ipv6_neighbors()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ True for BFD packets and for whatever is_ipv6_misc() matches """
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _expect_data_traffic_forwarded(self, packets):
        """ send packets on pg0, expect each one back in order (by dst) """
        self.pg0.add_stream(packets)
        self.pg_start()
        for sent in packets:
            received = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(received[IPv6].dst, sent[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # one data packet per route configured below
        packets = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                    IPv6(src="3001::1", dst=destination) /
                    UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))
                   for destination in ("2001::1", "2002::1")]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        via_pg0 = VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)
        route_2001 = VppIpRoute(self, "2001::", 64, [via_pg0])
        via_next_hop_only = VppRoutePath(self.pg0.remote_ip6, 0xffffffff)
        route_2002 = VppIpRoute(self, "2002::", 64, [via_next_hop_only])
        route_2001.add_vpp_config()
        route_2002.add_vpp_config()

        # with the routes in place, configure the BFD session towards the
        # next-hop and create the test-side peer
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._expect_data_traffic_forwarded(packets)

        # session is down - traffic is dropped
        bfd_session_down(self)
        self.pg0.add_stream(packets)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._expect_data_traffic_forwarded(packets)
1952
1953
@unittest.skipUnless(config.extended, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # both session objects are created inside test_bfd_o_gre()
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ True for BFD packets and for whatever is_ipv6_misc() matches """
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        # A GRE interface over which to run a BFD session
        gre_if = VppGreInterface(
            self, self.pg0.local_ip4, self.pg0.remote_ip4)
        gre_if.add_vpp_config()
        gre_if.admin_up()
        gre_if.config_ip4()

        # VPP side of the BFD session, bound to the GRE interface
        self.vpp_session = VppBFDUDPSession(
            self, gre_if, gre_if.remote_ip4, is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test-side peer; the IP/GRE header is handed over as tunnel_header
        # and pg0 as the physical interface
        outer_header = (IP(src=self.pg0.remote_ip4,
                           dst=self.pg0.local_ip4) /
                        GRE())
        self.test_session = BFDTestSession(
            self, gre_if, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # a single data packet addressed to the GRE interface's remote IP
        data_packets = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_packets, self.pg0)

        # bring session down
        bfd_session_down(self)
2033
2034
@tag_run_solo
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable BFD debug logging and bring up pg0 with IPv4 """
        super(BFDSHA1TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # undo the base class setup before propagating the failure
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDSHA1TestCase, cls).tearDownClass()

    def setUp(self):
        """ fresh key factory, BFD event subscription and capture on pg0 """
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    def test_hold_up(self):
        """ hold BFD session up """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        # answer every packet from VPP for two detection multiples
        exchanges = self.test_session.detect_mult * 2
        for _ in range(exchanges):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # start just below the 32-bit limit so the sequence number wraps
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0xFFFFFFFF - 4)
        bfd_session_up(self)
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad sequence numbers"""
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        deadline = time.time() + 2 * detection_time
        # keep sending without ever bumping the sequence number
        while time.time() < deadline:
            self.test_session.send_packet()
            pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(pause, "time between bfd packets")
        events = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ execute a rogue session interaction scenario

        1. create vpp session, add config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # make the rogue session mirror the legitimate one
        inherited = ("my_discriminator", "your_discriminator",
                     "desired_min_tx", "required_min_rx", "detect_mult",
                     "diag", "state", "auth_type")
        rogue_test_session.update(
            **{field: getattr(self.test_session, field)
               for field in inherited})
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        # send packet from rogue session claiming the session is down
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # the rogue peer carries no authentication at all
        rogue = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_session, legitimate, rogue)

    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        # pick a different random bfd key id
        rogue_key_id = randint(0, 255)
        while rogue_key_id == vpp_session.bfd_key_id:
            rogue_key_id = randint(0, 255)
        legitimate = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=rogue_key_id)
        self.execute_rogue_session_scenario(vpp_session, legitimate, rogue)

    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # same key and key id, but the rogue announces a different auth type
        overrides = {'auth_type': BFDAuthType.keyed_md5}
        self.execute_rogue_session_scenario(
            vpp_session, legitimate, rogue, overrides)

    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        self.sleep(2 * detection_time, "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        self.test_session.my_discriminator = 0
        bfd_session_up(self)
2265
2266
@tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable BFD debug logging and bring up pg0 with IPv4 """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the base class setup before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ fresh key factory, BFD event subscription and capture on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_up=True, inc_seq_num=False):
        """ answer each BFD packet from VPP for 2 * detect_mult rounds

        :param verify_up: assert that every packet received from VPP
            reports state up
        :param inc_seq_num: increment the test session's sequence number
            before each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert the VPP session is up and no BFD event was raised """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both ends to the key at once
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        # drop authentication on both ends at once
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both ends from key1 to key2 at once
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first round does not check the state reported by VPP
        self._exchange_packets(verify_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test peer keeps sending unauthenticated packets for a while
        self._exchange_packets()
        # now the test peer starts using the key
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test peer keeps sending authenticated packets for a while
        self._exchange_packets()
        # now the test peer stops authenticating
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test peer keeps using key1 for a while
        self._exchange_packets()
        # now the test peer switches to key2
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
2478
2479
2480 @tag_run_solo
2481 class BFDCLITestCase(VppTestCase):
2482     """Bidirectional Forwarding Detection (BFD) (CLI) """
2483     pg0 = None
2484
2485     @classmethod
2486     def setUpClass(cls):
2487         super(BFDCLITestCase, cls).setUpClass()
2488         cls.vapi.cli("set log class bfd level debug")
2489         try:
2490             cls.create_pg_interfaces((0,))
2491             cls.pg0.config_ip4()
2492             cls.pg0.config_ip6()
2493             cls.pg0.resolve_arp()
2494             cls.pg0.resolve_ndp()
2495
2496         except Exception:
2497             super(BFDCLITestCase, cls).tearDownClass()
2498             raise
2499
2500     @classmethod
2501     def tearDownClass(cls):
2502         super(BFDCLITestCase, cls).tearDownClass()
2503
2504     def setUp(self):
2505         super(BFDCLITestCase, self).setUp()
2506         self.factory = AuthKeyFactory()
2507         self.pg0.enable_capture()
2508
2509     def tearDown(self):
2510         try:
2511             self.vapi.want_bfd_events(enable_disable=False)
2512         except UnexpectedApiReturnValueError:
2513             # some tests aren't subscribed, so this is not an issue
2514             pass
2515         self.vapi.collect_events()  # clear the event queue
2516         super(BFDCLITestCase, self).tearDown()
2517
2518     def cli_verify_no_response(self, cli):
2519         """ execute a CLI, asserting that the response is empty """
2520         self.assert_equal(self.vapi.cli(cli),
2521                           "",
2522                           "CLI command response")
2523
2524     def cli_verify_response(self, cli, expected):
2525         """ execute a CLI, asserting that the response matches expectation """
2526         try:
2527             reply = self.vapi.cli(cli)
2528         except CliFailedCommandError as cli_error:
2529             reply = str(cli_error)
2530         self.assert_equal(reply.strip(),
2531                           expected,
2532                           "CLI command response")
2533
2534     def test_show(self):
2535         """ show commands """
2536         k1 = self.factory.create_random_key(self)
2537         k1.add_vpp_config()
2538         k2 = self.factory.create_random_key(
2539             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2540         k2.add_vpp_config()
2541         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2542         s1.add_vpp_config()
2543         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2544                               sha1_key=k2)
2545         s2.add_vpp_config()
2546         self.logger.info(self.vapi.ppcli("show bfd keys"))
2547         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2548         self.logger.info(self.vapi.ppcli("show bfd"))
2549
2550     def test_set_del_sha1_key(self):
2551         """ set/delete SHA1 auth key """
2552         k = self.factory.create_random_key(self)
2553         self.registry.register(k, self.logger)
2554         self.cli_verify_no_response(
2555             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2556             (k.conf_key_id,
2557                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2558         self.assertTrue(k.query_vpp_config())
2559         self.vpp_session = VppBFDUDPSession(
2560             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2561         self.vpp_session.add_vpp_config()
2562         self.test_session = \
2563             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2564                            bfd_key_id=self.vpp_session.bfd_key_id)
2565         self.vapi.want_bfd_events()
2566         bfd_session_up(self)
2567         bfd_session_down(self)
2568         # try to replace the secret for the key - should fail because the key
2569         # is in-use
2570         k2 = self.factory.create_random_key(self)
2571         self.cli_verify_response(
2572             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2573             (k.conf_key_id,
2574                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2575             "bfd key set: `bfd_auth_set_key' API call failed, "
2576             "rv=-103:BFD object in use")
2577         # manipulating the session using old secret should still work
2578         bfd_session_up(self)
2579         bfd_session_down(self)
2580         self.vpp_session.remove_vpp_config()
2581         self.cli_verify_no_response(
2582             "bfd key del conf-key-id %s" % k.conf_key_id)
2583         self.assertFalse(k.query_vpp_config())
2584
2585     def test_set_del_meticulous_sha1_key(self):
2586         """ set/delete meticulous SHA1 auth key """
2587         k = self.factory.create_random_key(
2588             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2589         self.registry.register(k, self.logger)
2590         self.cli_verify_no_response(
2591             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2592             (k.conf_key_id,
2593                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2594         self.assertTrue(k.query_vpp_config())
2595         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2596                                             self.pg0.remote_ip6, af=AF_INET6,
2597                                             sha1_key=k)
2598         self.vpp_session.add_vpp_config()
2599         self.vpp_session.admin_up()
2600         self.test_session = \
2601             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2602                            bfd_key_id=self.vpp_session.bfd_key_id)
2603         self.vapi.want_bfd_events()
2604         bfd_session_up(self)
2605         bfd_session_down(self)
2606         # try to replace the secret for the key - should fail because the key
2607         # is in-use
2608         k2 = self.factory.create_random_key(self)
2609         self.cli_verify_response(
2610             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2611             (k.conf_key_id,
2612                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2613             "bfd key set: `bfd_auth_set_key' API call failed, "
2614             "rv=-103:BFD object in use")
2615         # manipulating the session using old secret should still work
2616         bfd_session_up(self)
2617         bfd_session_down(self)
2618         self.vpp_session.remove_vpp_config()
2619         self.cli_verify_no_response(
2620             "bfd key del conf-key-id %s" % k.conf_key_id)
2621         self.assertFalse(k.query_vpp_config())
2622
2623     def test_add_mod_del_bfd_udp(self):
2624         """ create/modify/delete IPv4 BFD UDP session """
2625         vpp_session = VppBFDUDPSession(
2626             self, self.pg0, self.pg0.remote_ip4)
2627         self.registry.register(vpp_session, self.logger)
2628         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2629             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2630             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2631                                 self.pg0.remote_ip4,
2632                                 vpp_session.desired_min_tx,
2633                                 vpp_session.required_min_rx,
2634                                 vpp_session.detect_mult)
2635         self.cli_verify_no_response(cli_add_cmd)
2636         # 2nd add should fail
2637         self.cli_verify_response(
2638             cli_add_cmd,
2639             "bfd udp session add: `bfd_add_add_session' API call"
2640             " failed, rv=-101:Duplicate BFD object")
2641         verify_bfd_session_config(self, vpp_session)
2642         mod_session = VppBFDUDPSession(
2643             self, self.pg0, self.pg0.remote_ip4,
2644             required_min_rx=2 * vpp_session.required_min_rx,
2645             desired_min_tx=3 * vpp_session.desired_min_tx,
2646             detect_mult=4 * vpp_session.detect_mult)
2647         self.cli_verify_no_response(
2648             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2649             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2650             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2651              mod_session.desired_min_tx, mod_session.required_min_rx,
2652              mod_session.detect_mult))
2653         verify_bfd_session_config(self, mod_session)
2654         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2655             "peer-addr %s" % (self.pg0.name,
2656                               self.pg0.local_ip4, self.pg0.remote_ip4)
2657         self.cli_verify_no_response(cli_del_cmd)
2658         # 2nd del is expected to fail
2659         self.cli_verify_response(
2660             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2661             " failed, rv=-102:No such BFD object")
2662         self.assertFalse(vpp_session.query_vpp_config())
2663
2664     def test_add_mod_del_bfd_udp6(self):
2665         """ create/modify/delete IPv6 BFD UDP session """
2666         vpp_session = VppBFDUDPSession(
2667             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2668         self.registry.register(vpp_session, self.logger)
2669         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2670             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2671             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2672                                 self.pg0.remote_ip6,
2673                                 vpp_session.desired_min_tx,
2674                                 vpp_session.required_min_rx,
2675                                 vpp_session.detect_mult)
2676         self.cli_verify_no_response(cli_add_cmd)
2677         # 2nd add should fail
2678         self.cli_verify_response(
2679             cli_add_cmd,
2680             "bfd udp session add: `bfd_add_add_session' API call"
2681             " failed, rv=-101:Duplicate BFD object")
2682         verify_bfd_session_config(self, vpp_session)
2683         mod_session = VppBFDUDPSession(
2684             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2685             required_min_rx=2 * vpp_session.required_min_rx,
2686             desired_min_tx=3 * vpp_session.desired_min_tx,
2687             detect_mult=4 * vpp_session.detect_mult)
2688         self.cli_verify_no_response(
2689             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2690             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2691             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2692              mod_session.desired_min_tx,
2693              mod_session.required_min_rx, mod_session.detect_mult))
2694         verify_bfd_session_config(self, mod_session)
2695         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2696             "peer-addr %s" % (self.pg0.name,
2697                               self.pg0.local_ip6, self.pg0.remote_ip6)
2698         self.cli_verify_no_response(cli_del_cmd)
2699         # 2nd del is expected to fail
2700         self.cli_verify_response(
2701             cli_del_cmd,
2702             "bfd udp session del: `bfd_udp_del_session' API call"
2703             " failed, rv=-102:No such BFD object")
2704         self.assertFalse(vpp_session.query_vpp_config())
2705
2706     def test_add_mod_del_bfd_udp_auth(self):
2707         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2708         key = self.factory.create_random_key(self)
2709         key.add_vpp_config()
2710         vpp_session = VppBFDUDPSession(
2711             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2712         self.registry.register(vpp_session, self.logger)
2713         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2714             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2715             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2716             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2717                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2718                vpp_session.detect_mult, key.conf_key_id,
2719                vpp_session.bfd_key_id)
2720         self.cli_verify_no_response(cli_add_cmd)
2721         # 2nd add should fail
2722         self.cli_verify_response(
2723             cli_add_cmd,
2724             "bfd udp session add: `bfd_add_add_session' API call"
2725             " failed, rv=-101:Duplicate BFD object")
2726         verify_bfd_session_config(self, vpp_session)
2727         mod_session = VppBFDUDPSession(
2728             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2729             bfd_key_id=vpp_session.bfd_key_id,
2730             required_min_rx=2 * vpp_session.required_min_rx,
2731             desired_min_tx=3 * vpp_session.desired_min_tx,
2732             detect_mult=4 * vpp_session.detect_mult)
2733         self.cli_verify_no_response(
2734             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2735             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2736             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2737              mod_session.desired_min_tx,
2738              mod_session.required_min_rx, mod_session.detect_mult))
2739         verify_bfd_session_config(self, mod_session)
2740         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2741             "peer-addr %s" % (self.pg0.name,
2742                               self.pg0.local_ip4, self.pg0.remote_ip4)
2743         self.cli_verify_no_response(cli_del_cmd)
2744         # 2nd del is expected to fail
2745         self.cli_verify_response(
2746             cli_del_cmd,
2747             "bfd udp session del: `bfd_udp_del_session' API call"
2748             " failed, rv=-102:No such BFD object")
2749         self.assertFalse(vpp_session.query_vpp_config())
2750
2751     def test_add_mod_del_bfd_udp6_auth(self):
2752         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2753         key = self.factory.create_random_key(
2754             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2755         key.add_vpp_config()
2756         vpp_session = VppBFDUDPSession(
2757             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2758         self.registry.register(vpp_session, self.logger)
2759         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2760             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2761             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2762             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2763                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2764                vpp_session.detect_mult, key.conf_key_id,
2765                vpp_session.bfd_key_id)
2766         self.cli_verify_no_response(cli_add_cmd)
2767         # 2nd add should fail
2768         self.cli_verify_response(
2769             cli_add_cmd,
2770             "bfd udp session add: `bfd_add_add_session' API call"
2771             " failed, rv=-101:Duplicate BFD object")
2772         verify_bfd_session_config(self, vpp_session)
2773         mod_session = VppBFDUDPSession(
2774             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2775             bfd_key_id=vpp_session.bfd_key_id,
2776             required_min_rx=2 * vpp_session.required_min_rx,
2777             desired_min_tx=3 * vpp_session.desired_min_tx,
2778             detect_mult=4 * vpp_session.detect_mult)
2779         self.cli_verify_no_response(
2780             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2781             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2782             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2783              mod_session.desired_min_tx,
2784              mod_session.required_min_rx, mod_session.detect_mult))
2785         verify_bfd_session_config(self, mod_session)
2786         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2787             "peer-addr %s" % (self.pg0.name,
2788                               self.pg0.local_ip6, self.pg0.remote_ip6)
2789         self.cli_verify_no_response(cli_del_cmd)
2790         # 2nd del is expected to fail
2791         self.cli_verify_response(
2792             cli_del_cmd,
2793             "bfd udp session del: `bfd_udp_del_session' API call"
2794             " failed, rv=-102:No such BFD object")
2795         self.assertFalse(vpp_session.query_vpp_config())
2796
2797     def test_auth_on_off(self):
2798         """ turn authentication on and off """
2799         key = self.factory.create_random_key(
2800             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2801         key.add_vpp_config()
2802         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2803         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2804                                         sha1_key=key)
2805         session.add_vpp_config()
2806         cli_activate = \
2807             "bfd udp session auth activate interface %s local-addr %s "\
2808             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2809             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2810                key.conf_key_id, auth_session.bfd_key_id)
2811         self.cli_verify_no_response(cli_activate)
2812         verify_bfd_session_config(self, auth_session)
2813         self.cli_verify_no_response(cli_activate)
2814         verify_bfd_session_config(self, auth_session)
2815         cli_deactivate = \
2816             "bfd udp session auth deactivate interface %s local-addr %s "\
2817             "peer-addr %s "\
2818             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2819         self.cli_verify_no_response(cli_deactivate)
2820         verify_bfd_session_config(self, session)
2821         self.cli_verify_no_response(cli_deactivate)
2822         verify_bfd_session_config(self, session)
2823
2824     def test_auth_on_off_delayed(self):
2825         """ turn authentication on and off (delayed) """
2826         key = self.factory.create_random_key(
2827             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2828         key.add_vpp_config()
2829         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2830         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2831                                         sha1_key=key)
2832         session.add_vpp_config()
2833         cli_activate = \
2834             "bfd udp session auth activate interface %s local-addr %s "\
2835             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2836             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2837                key.conf_key_id, auth_session.bfd_key_id)
2838         self.cli_verify_no_response(cli_activate)
2839         verify_bfd_session_config(self, auth_session)
2840         self.cli_verify_no_response(cli_activate)
2841         verify_bfd_session_config(self, auth_session)
2842         cli_deactivate = \
2843             "bfd udp session auth deactivate interface %s local-addr %s "\
2844             "peer-addr %s delayed yes"\
2845             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2846         self.cli_verify_no_response(cli_deactivate)
2847         verify_bfd_session_config(self, session)
2848         self.cli_verify_no_response(cli_deactivate)
2849         verify_bfd_session_config(self, session)
2850
2851     def test_admin_up_down(self):
2852         """ put session admin-up and admin-down """
2853         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2854         session.add_vpp_config()
2855         cli_down = \
2856             "bfd udp session set-flags admin down interface %s local-addr %s "\
2857             "peer-addr %s "\
2858             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2859         cli_up = \
2860             "bfd udp session set-flags admin up interface %s local-addr %s "\
2861             "peer-addr %s "\
2862             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2863         self.cli_verify_no_response(cli_down)
2864         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2865         self.cli_verify_no_response(cli_up)
2866         verify_bfd_session_config(self, session, state=BFDState.down)
2867
2868     def test_set_del_udp_echo_source(self):
2869         """ set/del udp echo source """
2870         self.create_loopback_interfaces(1)
2871         self.loopback0 = self.lo_interfaces[0]
2872         self.loopback0.admin_up()
2873         self.cli_verify_response("show bfd echo-source",
2874                                  "UDP echo source is not set.")
2875         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2876         self.cli_verify_no_response(cli_set)
2877         self.cli_verify_response("show bfd echo-source",
2878                                  "UDP echo source is: %s\n"
2879                                  "IPv4 address usable as echo source: none\n"
2880                                  "IPv6 address usable as echo source: none" %
2881                                  self.loopback0.name)
2882         self.loopback0.config_ip4()
2883         echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
2884             self.loopback0.local_ip4)) ^ 1))
2885         self.cli_verify_response("show bfd echo-source",
2886                                  "UDP echo source is: %s\n"
2887                                  "IPv4 address usable as echo source: %s\n"
2888                                  "IPv6 address usable as echo source: none" %
2889                                  (self.loopback0.name, echo_ip4))
2890         echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
2891             self.loopback0.local_ip6)) ^ 1))
2892         self.loopback0.config_ip6()
2893         self.cli_verify_response("show bfd echo-source",
2894                                  "UDP echo source is: %s\n"
2895                                  "IPv4 address usable as echo source: %s\n"
2896                                  "IPv6 address usable as echo source: %s" %
2897                                  (self.loopback0.name, echo_ip4, echo_ip6))
2898         cli_del = "bfd udp echo-source del"
2899         self.cli_verify_no_response(cli_del)
2900         self.cli_verify_response("show bfd echo-source",
2901                                  "UDP echo source is not set.")
2902
2903
# When this module is executed directly (rather than imported), run its
# test cases through unittest using VppTestRunner as the runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)