BFD: command line interface
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner
19 from vpp_pg_interface import CaptureTimeoutError
20 from util import ppp
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22
23 USEC_IN_SEC = 1000000
24
25
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID"""

    def __init__(self):
        # conf key IDs already handed out by this factory instance
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case instance, passed through to VppBFDAuthKey
        :param auth_type: authentication type of the created key

        :returns: VppBFDAuthKey holding 1 to 20 random key bytes and a conf
                  key ID which this factory has not returned before
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        # bytes() is the same as str() on Python 2, but on Python 3 it still
        # yields the raw key bytes instead of the "bytearray(b'...')" repr
        key = bytes(bytearray(
            randint(0, 255) for _ in range(randint(1, 20))))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
41
42
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        """ create two pg interfaces, configure IPv4/IPv6 and resolve ARP """
        super(BFDAPITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            # undo the base class set-up before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ give every test its own auth key factory """
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def _add_remove_twice(self, session):
        """ add and remove the session twice, logging its state each time """
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _verify_params_match_dump(self, session):
        """ compare session parameters with what the session dump reports """
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self._add_remove_twice(session)

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        with self.vapi.expect_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self._add_remove_twice(session)

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        self._verify_params_match_dump(session)
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        self._verify_params_match_dump(session)

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() needs a mutable sequence - range() is not one on Python 3
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal, exactly the removed keys must be gone
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        self._add_remove_twice(session)

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is deliberately never added to vpp
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # use count reported for the key must drop by one per removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        # defensive only - the factory already hands out unique conf key IDs
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)
250
251
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None):
        """ store session parameters and pick random source port/seq number

        :param test: test case owning this session (used for logging,
                     assertions and packet generator access)
        :param interface: interface whose addresses are put into packets
        :param af: address family of the session (AF_INET6 selects IPv6)
        :param detect_mult: detect multiplier advertised in packets
        :param sha1_key: auth key, None for an unauthenticated session
        :param bfd_key_id: key ID put in the authentication section
        :param our_seq_number: initial sequence number, random if None
        """
        self.test = test
        self.interface = interface
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.udp_sport = randint(49152, 65535)
        self.our_seq_number = (our_seq_number if our_seq_number is not None
                               else randint(0, 40000000))
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.your_discriminator = None
        self.desired_min_tx = 100000
        self.required_min_rx = 100000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        current = self.our_seq_number
        self.our_seq_number = 0 if current == 0xFFFFFFFF else current + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments which are not None overwrite the stored value.
        """
        candidates = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attr_name, new_value in candidates:
            if new_value is not None:
                setattr(self, attr_name, new_value)

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Session values which evaluate as false are not written, so the
        packet keeps whatever it already holds for those fields.
        """
        bfd = packet[BFD]
        # (session attribute, BFD layer field, debug message)
        # auth_type comes last - it is used by a negative test-case
        field_map = (
            ("my_discriminator", "my_discriminator",
             "BFD: setting packet.my_discriminator=%s"),
            ("your_discriminator", "your_discriminator",
             "BFD: setting packet.your_discriminator=%s"),
            ("required_min_rx", "required_min_rx_interval",
             "BFD: setting packet.required_min_rx_interval=%s"),
            ("required_min_echo_rx", "required_min_echo_rx_interval",
             "BFD: setting packet.required_min_echo_rx=%s"),
            ("desired_min_tx", "desired_min_tx_interval",
             "BFD: setting packet.desired_min_tx_interval=%s"),
            ("detect_mult", "detect_mult",
             "BFD: setting packet.detect_mult=%s"),
            ("diag", "diag",
             "BFD: setting packet.diag=%s"),
            ("state", "state",
             "BFD: setting packet.state=%s"),
            ("auth_type", "auth_type",
             "BFD: setting packet.auth_type=%s"),
        )
        for attr_name, field_name, message in field_map:
            value = getattr(self, attr_name)
            if value:
                self.test.logger.debug(message, value)
                setattr(bfd, field_name, value)

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        # packets travel from the remote side of the interface towards vpp
        if self.af == AF_INET6:
            ip_layer = IPv6(src=self.interface.remote_ip6,
                            dst=self.interface.local_ip6,
                            hlim=255)
        else:
            ip_layer = IP(src=self.interface.remote_ip4,
                          dst=self.interface.local_ip4,
                          ttl=255)
        packet = (Ether(src=self.interface.remote_mac,
                        dst=self.interface.local_mac) /
                  ip_layer /
                  UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                  bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer followed by
            # the key padded with zeros to 20 bytes
            padding = "\0" * (20 - len(self.sha1_key.key))
            hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
                padding
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed """
        packet = self.create_packet() if packet is None else packet
        interface = self.test.pg0 if interface is None else interface
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer. """
        bfd = packet[BFD]
        check = self.test.assert_equal
        check(bfd.auth_len, 28, "Auth section length")
        check(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
        check(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        check(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet - just remember where vpp started
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            previous = self.vpp_seq_number
            meticulous = self.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1
            can_increment = previous < 0xffffffff
            # meticulous auth requires a new number on every packet, the
            # plain variant may also repeat the previous one
            if can_increment and meticulous:
                check(recvd_seq_num, previous + 1, "BFD sequence number")
            elif can_increment:
                self.test.assert_in_range(recvd_seq_num,
                                          previous,
                                          previous + 1,
                                          "BFD sequence number")
            elif meticulous:
                check(recvd_seq_num, 0, "BFD sequence number")
            else:
                self.test.assertIn(recvd_seq_num, (previous, 0),
                                   "BFD sequence number not one of "
                                   "(%s, 0)" % previous)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        padding = "\0" * (20 - len(self.sha1_key.key))
        hash_material = bfd.original[:-20] + self.sha1_key.key + padding
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        check(binascii.hexlify(bfd.auth_key_hash),
              expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
452
453
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a slow hello from vpp, answers with Init and - once vpp
    reports the Up event - with Up. As a side effect the offset between
    local time and the capture timestamp of the hello packet is stored in
    test.vpp_clock_offset; when an offset was stored before, the new one
    must be within 0.5 of it.

    :param test: test case providing logger, vapi, vpp_session and
                 test_session
    """
    session = test.test_session
    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2)
    # compare against None explicitly, so that a previously stored offset
    # of exactly 0.0 still takes part in the stability check
    old_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - p.time
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if old_offset is not None:
        test.assertAlmostEqual(
            old_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, old_offset))
    test.logger.info("BFD: Sending Init")
    session.update(my_discriminator=randint(0, 40000000),
                   your_discriminator=p[BFD].my_discriminator,
                   state=BFDState.init)
    # with meticulous authentication the sequence number is incremented
    # before every packet sent
    meticulous = session.sha1_key and session.sha1_key.auth_type == \
        BFDAuthType.meticulous_keyed_sha1
    if meticulous:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, e, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")
    session.update(state=BFDState.up)
    if meticulous:
        session.inc_seq_num()
    session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
487
488
def bfd_session_down(test):
    """ Bring BFD session down

    Expects the vpp session to be Up, sends a packet with state Down and
    verifies that vpp reports the Down event and session state.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session = test.test_session
    session.update(state=BFDState.down)
    key = session.sha1_key
    # meticulous authentication needs a fresh sequence number per packet
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
502
503
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the session dump matches the session object

    :param test: test case providing the assert helpers
    :param session: session object whose dump entry is verified
    :param state: expected session state, not checked when None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # explicit None test, so that a state which evaluates as false (e.g. a
    # zero-valued state) is verified as well instead of silently skipped
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
526
527
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    The packet must carry hop limit/TTL 255 and be addressed from the local
    to the remote address of test.pg0, in the address family of the vpp
    session.
    """
    iface = test.pg0
    if test.vpp_session.af == AF_INET6:
        ip = packet[IPv6]
        expected_src, expected_dst = iface.local_ip6, iface.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        expected_src, expected_dst = iface.local_ip4, iface.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
542
543
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    Destination port must be the BFD port, source port must lie within the
    range defined by the BFD class.
    """
    udp_layer = packet[UDP]
    test.assert_equal(udp_layer.dport, BFD.udp_dport,
                      "UDP destination port")
    test.assert_in_range(udp_layer.sport,
                         BFD.udp_sport_min,
                         BFD.udp_sport_max,
                         "UDP source port")
550
551
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    Interface index, address family and addresses of the event are compared
    with test.vpp_session, the state with expected_state.
    """
    vpp_session = test.vpp_session
    test.logger.debug("BFD: Event: %s" % repr(event))
    test.assert_equal(event.sw_if_index,
                      vpp_session.interface.sw_if_index,
                      "BFD interface index")
    expected_is_ipv6 = 1 if vpp_session.af == AF_INET6 else 0
    test.assert_equal(event.is_ipv6, expected_is_ipv6, "is_ipv6")
    if vpp_session.af != AF_INET:
        test.assert_equal(event.local_addr, vpp_session.local_addr_n,
                          "Local IPv6 address")
        test.assert_equal(event.peer_addr, vpp_session.peer_addr_n,
                          "Peer IPv6 address")
    else:
        # only the first 4 bytes of the address fields are compared for IPv4
        test.assert_equal(event.local_addr[:4], vpp_session.local_addr_n,
                          "Local IPv4 address")
        test.assert_equal(event.peer_addr[:4], vpp_session.peer_addr_n,
                          "Peer IPv4 address")
    test.assert_equal(event.state, expected_state, BFDState)
574
575
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing logger, pg0 and test_session
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this

    :returns: the received packet
    :raises CaptureTimeoutError: if no acceptable packet arrives in time
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is None or p.time >= pcap_time_min:
            break
        test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                              "pcap time min %s):" %
                              (p.time, pcap_time_min), p))
    # getlayer() returns None for a missing layer, whereas p[BFD] raises
    # IndexError - which made the check below unreachable
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
611
612
613 class BFD4TestCase(VppTestCase):
614     """Bidirectional Forwarding Detection (BFD)"""
615
616     pg0 = None
617     vpp_clock_offset = None
618     vpp_session = None
619     test_session = None
620
621     @classmethod
622     def setUpClass(cls):
623         super(BFD4TestCase, cls).setUpClass()
624         try:
625             cls.create_pg_interfaces([0])
626             cls.create_loopback_interfaces([0])
627             cls.loopback0 = cls.lo_interfaces[0]
628             cls.loopback0.config_ip4()
629             cls.loopback0.admin_up()
630             cls.pg0.config_ip4()
631             cls.pg0.configure_ipv4_neighbors()
632             cls.pg0.admin_up()
633             cls.pg0.resolve_arp()
634
635         except Exception:
636             super(BFD4TestCase, cls).tearDownClass()
637             raise
638
639     def setUp(self):
640         super(BFD4TestCase, self).setUp()
641         self.factory = AuthKeyFactory()
642         self.vapi.want_bfd_events()
643         self.pg0.enable_capture()
644         try:
645             self.vpp_session = VppBFDUDPSession(self, self.pg0,
646                                                 self.pg0.remote_ip4)
647             self.vpp_session.add_vpp_config()
648             self.vpp_session.admin_up()
649             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
650         except:
651             self.vapi.want_bfd_events(enable_disable=0)
652             raise
653
654     def tearDown(self):
655         if not self.vpp_dead:
656             self.vapi.want_bfd_events(enable_disable=0)
657         self.vapi.collect_events()  # clear the event queue
658         super(BFD4TestCase, self).tearDown()
659
660     def test_session_up(self):
661         """ bring BFD session up """
662         bfd_session_up(self)
663
664     def test_session_up_by_ip(self):
665         """ bring BFD session up - first frame looked up by address pair """
666         self.logger.info("BFD: Sending Slow control frame")
667         self.test_session.update(my_discriminator=randint(0, 40000000))
668         self.test_session.send_packet()
669         self.pg0.enable_capture()
670         p = self.pg0.wait_for_packet(1)
671         self.assert_equal(p[BFD].your_discriminator,
672                           self.test_session.my_discriminator,
673                           "BFD - your discriminator")
674         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
675         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
676                                  state=BFDState.up)
677         self.logger.info("BFD: Waiting for event")
678         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
679         verify_event(self, e, expected_state=BFDState.init)
680         self.logger.info("BFD: Sending Up")
681         self.test_session.send_packet()
682         self.logger.info("BFD: Waiting for event")
683         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
684         verify_event(self, e, expected_state=BFDState.up)
685         self.logger.info("BFD: Session is Up")
686         self.test_session.update(state=BFDState.up)
687         self.test_session.send_packet()
688         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
689
690     def test_session_down(self):
691         """ bring BFD session down """
692         bfd_session_up(self)
693         bfd_session_down(self)
694
695     def test_hold_up(self):
696         """ hold BFD session up """
697         bfd_session_up(self)
698         for dummy in range(self.test_session.detect_mult * 2):
699             wait_for_bfd_packet(self)
700             self.test_session.send_packet()
701         self.assert_equal(len(self.vapi.collect_events()), 0,
702                           "number of bfd events")
703
704     def test_slow_timer(self):
705         """ verify slow periodic control frames while session down """
706         packet_count = 3
707         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
708         prev_packet = wait_for_bfd_packet(self, 2)
709         for dummy in range(packet_count):
710             next_packet = wait_for_bfd_packet(self, 2)
711             time_diff = next_packet.time - prev_packet.time
712             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
713             # to work around timing issues
714             self.assert_in_range(
715                 time_diff, 0.70, 1.05, "time between slow packets")
716             prev_packet = next_packet
717
718     def test_zero_remote_min_rx(self):
719         """ no packets when zero remote required min rx interval """
720         bfd_session_up(self)
721         self.test_session.update(required_min_rx=0)
722         self.test_session.send_packet()
723         for dummy in range(self.test_session.detect_mult):
724             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
725                        "sleep before transmitting bfd packet")
726             self.test_session.send_packet()
727             try:
728                 p = wait_for_bfd_packet(self, timeout=0)
729                 self.logger.error(ppp("Received unexpected packet:", p))
730             except CaptureTimeoutError:
731                 pass
732         self.assert_equal(
733             len(self.vapi.collect_events()), 0, "number of bfd events")
734         self.test_session.update(required_min_rx=100000)
735         for dummy in range(3):
736             self.test_session.send_packet()
737             wait_for_bfd_packet(
738                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
739         self.assert_equal(
740             len(self.vapi.collect_events()), 0, "number of bfd events")
741
742     def test_conn_down(self):
743         """ verify session goes down after inactivity """
744         bfd_session_up(self)
745         detection_time = self.test_session.detect_mult *\
746             self.vpp_session.required_min_rx / USEC_IN_SEC
747         self.sleep(detection_time, "waiting for BFD session time-out")
748         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
749         verify_event(self, e, expected_state=BFDState.down)
750
751     def test_large_required_min_rx(self):
752         """ large remote required min rx interval """
753         bfd_session_up(self)
754         p = wait_for_bfd_packet(self)
755         interval = 3000000
756         self.test_session.update(required_min_rx=interval)
757         self.test_session.send_packet()
758         time_mark = time.time()
759         count = 0
760         # busy wait here, trying to collect a packet or event, vpp is not
761         # allowed to send packets and the session will timeout first - so the
762         # Up->Down event must arrive before any packets do
763         while time.time() < time_mark + interval / USEC_IN_SEC:
764             try:
765                 p = wait_for_bfd_packet(self, timeout=0)
766                 # if vpp managed to send a packet before we did the session
767                 # session update, then that's fine, ignore it
768                 if p.time < time_mark - self.vpp_clock_offset:
769                     continue
770                 self.logger.error(ppp("Received unexpected packet:", p))
771                 count += 1
772             except CaptureTimeoutError:
773                 pass
774             events = self.vapi.collect_events()
775             if len(events) > 0:
776                 verify_event(self, events[0], BFDState.down)
777                 break
778         self.assert_equal(count, 0, "number of packets received")
779
780     def test_immediate_remote_min_rx_reduction(self):
781         """ immediately honor remote required min rx reduction """
782         self.vpp_session.remove_vpp_config()
783         self.vpp_session = VppBFDUDPSession(
784             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
785         self.pg0.enable_capture()
786         self.vpp_session.add_vpp_config()
787         self.test_session.update(desired_min_tx=1000000,
788                                  required_min_rx=1000000)
789         bfd_session_up(self)
790         reference_packet = wait_for_bfd_packet(self)
791         time_mark = time.time()
792         interval = 300000
793         self.test_session.update(required_min_rx=interval)
794         self.test_session.send_packet()
795         extra_time = time.time() - time_mark
796         p = wait_for_bfd_packet(self)
797         # first packet is allowed to be late by time we spent doing the update
798         # calculated in extra_time
799         self.assert_in_range(p.time - reference_packet.time,
800                              .95 * 0.75 * interval / USEC_IN_SEC,
801                              1.05 * interval / USEC_IN_SEC + extra_time,
802                              "time between BFD packets")
803         reference_packet = p
804         for dummy in range(3):
805             p = wait_for_bfd_packet(self)
806             diff = p.time - reference_packet.time
807             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
808                                  1.05 * interval / USEC_IN_SEC,
809                                  "time between BFD packets")
810             reference_packet = p
811
812     def test_modify_req_min_rx_double(self):
813         """ modify session - double required min rx """
814         bfd_session_up(self)
815         p = wait_for_bfd_packet(self)
816         self.test_session.update(desired_min_tx=10000,
817                                  required_min_rx=10000)
818         self.test_session.send_packet()
819         # double required min rx
820         self.vpp_session.modify_parameters(
821             required_min_rx=2 * self.vpp_session.required_min_rx)
822         p = wait_for_bfd_packet(
823             self, pcap_time_min=time.time() - self.vpp_clock_offset)
824         # poll bit needs to be set
825         self.assertIn("P", p.sprintf("%BFD.flags%"),
826                       "Poll bit not set in BFD packet")
827         # finish poll sequence with final packet
828         final = self.test_session.create_packet()
829         final[BFD].flags = "F"
830         timeout = self.test_session.detect_mult * \
831             max(self.test_session.desired_min_tx,
832                 self.vpp_session.required_min_rx) / USEC_IN_SEC
833         self.test_session.send_packet(final)
834         time_mark = time.time()
835         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
836         verify_event(self, e, expected_state=BFDState.down)
837         time_to_event = time.time() - time_mark
838         self.assert_in_range(time_to_event, .9 * timeout,
839                              1.1 * timeout, "session timeout")
840
841     def test_modify_req_min_rx_halve(self):
842         """ modify session - halve required min rx """
843         self.vpp_session.modify_parameters(
844             required_min_rx=2 * self.vpp_session.required_min_rx)
845         bfd_session_up(self)
846         p = wait_for_bfd_packet(self)
847         self.test_session.update(desired_min_tx=10000,
848                                  required_min_rx=10000)
849         self.test_session.send_packet()
850         p = wait_for_bfd_packet(
851             self, pcap_time_min=time.time() - self.vpp_clock_offset)
852         # halve required min rx
853         old_required_min_rx = self.vpp_session.required_min_rx
854         self.vpp_session.modify_parameters(
855             required_min_rx=0.5 * self.vpp_session.required_min_rx)
856         # now we wait 0.8*3*old-req-min-rx and the session should still be up
857         self.sleep(0.8 * self.vpp_session.detect_mult *
858                    old_required_min_rx / USEC_IN_SEC)
859         self.assert_equal(len(self.vapi.collect_events()), 0,
860                           "number of bfd events")
861         p = wait_for_bfd_packet(self)
862         # poll bit needs to be set
863         self.assertIn("P", p.sprintf("%BFD.flags%"),
864                       "Poll bit not set in BFD packet")
865         # finish poll sequence with final packet
866         final = self.test_session.create_packet()
867         final[BFD].flags = "F"
868         self.test_session.send_packet(final)
869         # now the session should time out under new conditions
870         before = time.time()
871         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
872         after = time.time()
873         detection_time = self.test_session.detect_mult *\
874             self.vpp_session.required_min_rx / USEC_IN_SEC
875         self.assert_in_range(after - before,
876                              0.9 * detection_time,
877                              1.1 * detection_time,
878                              "time before bfd session goes down")
879         verify_event(self, e, expected_state=BFDState.down)
880
881     def test_modify_detect_mult(self):
882         """ modify detect multiplier """
883         bfd_session_up(self)
884         p = wait_for_bfd_packet(self)
885         self.vpp_session.modify_parameters(detect_mult=1)
886         p = wait_for_bfd_packet(
887             self, pcap_time_min=time.time() - self.vpp_clock_offset)
888         self.assert_equal(self.vpp_session.detect_mult,
889                           p[BFD].detect_mult,
890                           "detect mult")
891         # poll bit must not be set
892         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
893                          "Poll bit not set in BFD packet")
894         self.vpp_session.modify_parameters(detect_mult=10)
895         p = wait_for_bfd_packet(
896             self, pcap_time_min=time.time() - self.vpp_clock_offset)
897         self.assert_equal(self.vpp_session.detect_mult,
898                           p[BFD].detect_mult,
899                           "detect mult")
900         # poll bit must not be set
901         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
902                          "Poll bit not set in BFD packet")
903
904     def test_queued_poll(self):
905         """ test poll sequence queueing """
906         bfd_session_up(self)
907         p = wait_for_bfd_packet(self)
908         self.vpp_session.modify_parameters(
909             required_min_rx=2 * self.vpp_session.required_min_rx)
910         p = wait_for_bfd_packet(self)
911         poll_sequence_start = time.time()
912         poll_sequence_length_min = 0.5
913         send_final_after = time.time() + poll_sequence_length_min
914         # poll bit needs to be set
915         self.assertIn("P", p.sprintf("%BFD.flags%"),
916                       "Poll bit not set in BFD packet")
917         self.assert_equal(p[BFD].required_min_rx_interval,
918                           self.vpp_session.required_min_rx,
919                           "BFD required min rx interval")
920         self.vpp_session.modify_parameters(
921             required_min_rx=2 * self.vpp_session.required_min_rx)
922         # 2nd poll sequence should be queued now
923         # don't send the reply back yet, wait for some time to emulate
924         # longer round-trip time
925         packet_count = 0
926         while time.time() < send_final_after:
927             self.test_session.send_packet()
928             p = wait_for_bfd_packet(self)
929             self.assert_equal(len(self.vapi.collect_events()), 0,
930                               "number of bfd events")
931             self.assert_equal(p[BFD].required_min_rx_interval,
932                               self.vpp_session.required_min_rx,
933                               "BFD required min rx interval")
934             packet_count += 1
935             # poll bit must be set
936             self.assertIn("P", p.sprintf("%BFD.flags%"),
937                           "Poll bit not set in BFD packet")
938         final = self.test_session.create_packet()
939         final[BFD].flags = "F"
940         self.test_session.send_packet(final)
941         # finish 1st with final
942         poll_sequence_length = time.time() - poll_sequence_start
943         # vpp must wait for some time before starting new poll sequence
944         poll_no_2_started = False
945         for dummy in range(2 * packet_count):
946             p = wait_for_bfd_packet(self)
947             self.assert_equal(len(self.vapi.collect_events()), 0,
948                               "number of bfd events")
949             if "P" in p.sprintf("%BFD.flags%"):
950                 poll_no_2_started = True
951                 if time.time() < poll_sequence_start + poll_sequence_length:
952                     raise Exception("VPP started 2nd poll sequence too soon")
953                 final = self.test_session.create_packet()
954                 final[BFD].flags = "F"
955                 self.test_session.send_packet(final)
956                 break
957             else:
958                 self.test_session.send_packet()
959         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
960         # finish 2nd with final
961         final = self.test_session.create_packet()
962         final[BFD].flags = "F"
963         self.test_session.send_packet(final)
964         p = wait_for_bfd_packet(self)
965         # poll bit must not be set
966         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
967                          "Poll bit set in BFD packet")
968
969     def test_poll_response(self):
970         """ test correct response to control frame with poll bit set """
971         bfd_session_up(self)
972         poll = self.test_session.create_packet()
973         poll[BFD].flags = "P"
974         self.test_session.send_packet(poll)
975         final = wait_for_bfd_packet(
976             self, pcap_time_min=time.time() - self.vpp_clock_offset)
977         self.assertIn("F", final.sprintf("%BFD.flags%"))
978
979     def test_no_periodic_if_remote_demand(self):
980         """ no periodic frames outside poll sequence if remote demand set """
981         bfd_session_up(self)
982         demand = self.test_session.create_packet()
983         demand[BFD].flags = "D"
984         self.test_session.send_packet(demand)
985         transmit_time = 0.9 \
986             * max(self.vpp_session.required_min_rx,
987                   self.test_session.desired_min_tx) \
988             / USEC_IN_SEC
989         count = 0
990         for dummy in range(self.test_session.detect_mult * 2):
991             time.sleep(transmit_time)
992             self.test_session.send_packet(demand)
993             try:
994                 p = wait_for_bfd_packet(self, timeout=0)
995                 self.logger.error(ppp("Received unexpected packet:", p))
996                 count += 1
997             except CaptureTimeoutError:
998                 pass
999         events = self.vapi.collect_events()
1000         for e in events:
1001             self.logger.error("Received unexpected event: %s", e)
1002         self.assert_equal(count, 0, "number of packets received")
1003         self.assert_equal(len(events), 0, "number of events received")
1004
1005     def test_echo_looped_back(self):
1006         """ echo packets looped back """
1007         # don't need a session in this case..
1008         self.vpp_session.remove_vpp_config()
1009         self.pg0.enable_capture()
1010         echo_packet_count = 10
1011         # random source port low enough to increment a few times..
1012         udp_sport_tx = randint(1, 50000)
1013         udp_sport_rx = udp_sport_tx
1014         echo_packet = (Ether(src=self.pg0.remote_mac,
1015                              dst=self.pg0.local_mac) /
1016                        IP(src=self.pg0.remote_ip4,
1017                           dst=self.pg0.remote_ip4) /
1018                        UDP(dport=BFD.udp_dport_echo) /
1019                        Raw("this should be looped back"))
1020         for dummy in range(echo_packet_count):
1021             self.sleep(.01, "delay between echo packets")
1022             echo_packet[UDP].sport = udp_sport_tx
1023             udp_sport_tx += 1
1024             self.logger.debug(ppp("Sending packet:", echo_packet))
1025             self.pg0.add_stream(echo_packet)
1026             self.pg_start()
1027         for dummy in range(echo_packet_count):
1028             p = self.pg0.wait_for_packet(1)
1029             self.logger.debug(ppp("Got packet:", p))
1030             ether = p[Ether]
1031             self.assert_equal(self.pg0.remote_mac,
1032                               ether.dst, "Destination MAC")
1033             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1034             ip = p[IP]
1035             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1036             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1037             udp = p[UDP]
1038             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1039                               "UDP destination port")
1040             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1041             udp_sport_rx += 1
1042             # need to compare the hex payload here, otherwise BFD_vpp_echo
1043             # gets in way
1044             self.assertEqual(str(p[UDP].payload),
1045                              str(echo_packet[UDP].payload),
1046                              "Received packet is not the echo packet sent")
1047         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1048                           "ECHO packet identifier for test purposes)")
1049
1050     def test_echo(self):
1051         """ echo function """
1052         bfd_session_up(self)
1053         self.test_session.update(required_min_echo_rx=50000)
1054         self.test_session.send_packet()
1055         detection_time = self.test_session.detect_mult *\
1056             self.vpp_session.required_min_rx / USEC_IN_SEC
1057         # echo shouldn't work without echo source set
1058         for dummy in range(3):
1059             sleep = 0.75 * detection_time
1060             self.sleep(sleep, "delay before sending bfd packet")
1061             self.test_session.send_packet()
1062         p = wait_for_bfd_packet(
1063             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1064         self.assert_equal(p[BFD].required_min_rx_interval,
1065                           self.vpp_session.required_min_rx,
1066                           "BFD required min rx interval")
1067         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1068         # should be turned on - loopback echo packets
1069         for dummy in range(3):
1070             loop_until = time.time() + 0.75 * detection_time
1071             while time.time() < loop_until:
1072                 p = self.pg0.wait_for_packet(1)
1073                 self.logger.debug(ppp("Got packet:", p))
1074                 if p[UDP].dport == BFD.udp_dport_echo:
1075                     self.assert_equal(
1076                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1077                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1078                                         "BFD ECHO src IP equal to loopback IP")
1079                     self.logger.debug(ppp("Looping back packet:", p))
1080                     self.pg0.add_stream(p)
1081                     self.pg_start()
1082                 elif p.haslayer(BFD):
1083                     self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1084                                             1000000)
1085                     if "P" in p.sprintf("%BFD.flags%"):
1086                         final = self.test_session.create_packet()
1087                         final[BFD].flags = "F"
1088                         self.test_session.send_packet(final)
1089                 else:
1090                     raise Exception(ppp("Received unknown packet:", p))
1091
1092                 self.assert_equal(len(self.vapi.collect_events()), 0,
1093                                   "number of bfd events")
1094             self.test_session.send_packet()
1095
1096     def test_echo_fail(self):
1097         """ session goes down if echo function fails """
1098         bfd_session_up(self)
1099         self.test_session.update(required_min_echo_rx=50000)
1100         self.test_session.send_packet()
1101         detection_time = self.test_session.detect_mult *\
1102             self.vpp_session.required_min_rx / USEC_IN_SEC
1103         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1104         # echo function should be used now, but we will drop the echo packets
1105         verified_diag = False
1106         for dummy in range(3):
1107             loop_until = time.time() + 0.75 * detection_time
1108             while time.time() < loop_until:
1109                 p = self.pg0.wait_for_packet(1)
1110                 self.logger.debug(ppp("Got packet:", p))
1111                 if p[UDP].dport == BFD.udp_dport_echo:
1112                     # dropped
1113                     pass
1114                 elif p.haslayer(BFD):
1115                     if "P" in p.sprintf("%BFD.flags%"):
1116                         self.assertGreaterEqual(
1117                             p[BFD].required_min_rx_interval,
1118                             1000000)
1119                         final = self.test_session.create_packet()
1120                         final[BFD].flags = "F"
1121                         self.test_session.send_packet(final)
1122                     if p[BFD].state == BFDState.down:
1123                         self.assert_equal(p[BFD].diag,
1124                                           BFDDiagCode.echo_function_failed,
1125                                           BFDDiagCode)
1126                         verified_diag = True
1127                 else:
1128                     raise Exception(ppp("Received unknown packet:", p))
1129             self.test_session.send_packet()
1130         events = self.vapi.collect_events()
1131         self.assert_equal(len(events), 1, "number of bfd events")
1132         self.assert_equal(events[0].state, BFDState.down, BFDState)
1133         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1134
1135     def test_echo_stop(self):
1136         """ echo function stops if peer sets required min echo rx zero """
1137         bfd_session_up(self)
1138         self.test_session.update(required_min_echo_rx=50000)
1139         self.test_session.send_packet()
1140         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1141         # wait for first echo packet
1142         while True:
1143             p = self.pg0.wait_for_packet(1)
1144             self.logger.debug(ppp("Got packet:", p))
1145             if p[UDP].dport == BFD.udp_dport_echo:
1146                 self.logger.debug(ppp("Looping back packet:", p))
1147                 self.pg0.add_stream(p)
1148                 self.pg_start()
1149                 break
1150             elif p.haslayer(BFD):
1151                 # ignore BFD
1152                 pass
1153             else:
1154                 raise Exception(ppp("Received unknown packet:", p))
1155         self.test_session.update(required_min_echo_rx=0)
1156         self.test_session.send_packet()
1157         # echo packets shouldn't arrive anymore
1158         for dummy in range(5):
1159             wait_for_bfd_packet(
1160                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1161             self.test_session.send_packet()
1162             events = self.vapi.collect_events()
1163             self.assert_equal(len(events), 0, "number of bfd events")
1164
1165     def test_echo_source_removed(self):
1166         """ echo function stops if echo source is removed """
1167         bfd_session_up(self)
1168         self.test_session.update(required_min_echo_rx=50000)
1169         self.test_session.send_packet()
1170         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1171         # wait for first echo packet
1172         while True:
1173             p = self.pg0.wait_for_packet(1)
1174             self.logger.debug(ppp("Got packet:", p))
1175             if p[UDP].dport == BFD.udp_dport_echo:
1176                 self.logger.debug(ppp("Looping back packet:", p))
1177                 self.pg0.add_stream(p)
1178                 self.pg_start()
1179                 break
1180             elif p.haslayer(BFD):
1181                 # ignore BFD
1182                 pass
1183             else:
1184                 raise Exception(ppp("Received unknown packet:", p))
1185         self.vapi.bfd_udp_del_echo_source()
1186         self.test_session.send_packet()
1187         # echo packets shouldn't arrive anymore
1188         for dummy in range(5):
1189             wait_for_bfd_packet(
1190                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1191             self.test_session.send_packet()
1192             events = self.vapi.collect_events()
1193             self.assert_equal(len(events), 0, "number of bfd events")
1194
1195     def test_stale_echo(self):
1196         """ stale echo packets don't keep a session up """
1197         bfd_session_up(self)
1198         self.test_session.update(required_min_echo_rx=50000)
1199         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1200         self.test_session.send_packet()
1201         # should be turned on - loopback echo packets
1202         echo_packet = None
1203         timeout_at = None
1204         timeout_ok = False
1205         for dummy in range(10 * self.vpp_session.detect_mult):
1206             p = self.pg0.wait_for_packet(1)
1207             if p[UDP].dport == BFD.udp_dport_echo:
1208                 if echo_packet is None:
1209                     self.logger.debug(ppp("Got first echo packet:", p))
1210                     echo_packet = p
1211                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1212                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1213                 else:
1214                     self.logger.debug(ppp("Got followup echo packet:", p))
1215                 self.logger.debug(ppp("Looping back first echo packet:", p))
1216                 self.pg0.add_stream(echo_packet)
1217                 self.pg_start()
1218             elif p.haslayer(BFD):
1219                 self.logger.debug(ppp("Got packet:", p))
1220                 if "P" in p.sprintf("%BFD.flags%"):
1221                     final = self.test_session.create_packet()
1222                     final[BFD].flags = "F"
1223                     self.test_session.send_packet(final)
1224                 if p[BFD].state == BFDState.down:
1225                     self.assertIsNotNone(
1226                         timeout_at,
1227                         "Session went down before first echo packet received")
1228                     now = time.time()
1229                     self.assertGreaterEqual(
1230                         now, timeout_at,
1231                         "Session timeout at %s, but is expected at %s" %
1232                         (now, timeout_at))
1233                     self.assert_equal(p[BFD].diag,
1234                                       BFDDiagCode.echo_function_failed,
1235                                       BFDDiagCode)
1236                     events = self.vapi.collect_events()
1237                     self.assert_equal(len(events), 1, "number of bfd events")
1238                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1239                     timeout_ok = True
1240                     break
1241             else:
1242                 raise Exception(ppp("Received unknown packet:", p))
1243             self.test_session.send_packet()
1244         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1245
1246     def test_invalid_echo_checksum(self):
1247         """ echo packets with invalid checksum don't keep a session up """
1248         bfd_session_up(self)
1249         self.test_session.update(required_min_echo_rx=50000)
1250         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1251         self.test_session.send_packet()
1252         # should be turned on - loopback echo packets
1253         timeout_at = None
1254         timeout_ok = False
1255         for dummy in range(10 * self.vpp_session.detect_mult):
1256             p = self.pg0.wait_for_packet(1)
1257             if p[UDP].dport == BFD.udp_dport_echo:
1258                 self.logger.debug(ppp("Got echo packet:", p))
1259                 if timeout_at is None:
1260                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1261                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1262                 p[BFD_vpp_echo].checksum = getrandbits(64)
1263                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1264                 self.pg0.add_stream(p)
1265                 self.pg_start()
1266             elif p.haslayer(BFD):
1267                 self.logger.debug(ppp("Got packet:", p))
1268                 if "P" in p.sprintf("%BFD.flags%"):
1269                     final = self.test_session.create_packet()
1270                     final[BFD].flags = "F"
1271                     self.test_session.send_packet(final)
1272                 if p[BFD].state == BFDState.down:
1273                     self.assertIsNotNone(
1274                         timeout_at,
1275                         "Session went down before first echo packet received")
1276                     now = time.time()
1277                     self.assertGreaterEqual(
1278                         now, timeout_at,
1279                         "Session timeout at %s, but is expected at %s" %
1280                         (now, timeout_at))
1281                     self.assert_equal(p[BFD].diag,
1282                                       BFDDiagCode.echo_function_failed,
1283                                       BFDDiagCode)
1284                     events = self.vapi.collect_events()
1285                     self.assert_equal(len(events), 1, "number of bfd events")
1286                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1287                     timeout_ok = True
1288                     break
1289             else:
1290                 raise Exception(ppp("Received unknown packet:", p))
1291             self.test_session.send_packet()
1292         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1293
1294     def test_admin_up_down(self):
1295         """ put session admin-up and admin-down """
1296         bfd_session_up(self)
1297         self.vpp_session.admin_down()
1298         self.pg0.enable_capture()
1299         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1300         verify_event(self, e, expected_state=BFDState.admin_down)
1301         for dummy in range(2):
1302             p = wait_for_bfd_packet(self)
1303             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1304         # try to bring session up - shouldn't be possible
1305         self.test_session.update(state=BFDState.init)
1306         self.test_session.send_packet()
1307         for dummy in range(2):
1308             p = wait_for_bfd_packet(self)
1309             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1310         self.vpp_session.admin_up()
1311         self.test_session.update(state=BFDState.down)
1312         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1313         verify_event(self, e, expected_state=BFDState.down)
1314         p = wait_for_bfd_packet(
1315             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1316         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1317         self.test_session.send_packet()
1318         p = wait_for_bfd_packet(
1319             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1320         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1321         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1322         verify_event(self, e, expected_state=BFDState.init)
1323         self.test_session.update(state=BFDState.up)
1324         self.test_session.send_packet()
1325         p = wait_for_bfd_packet(
1326             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1327         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1328         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1329         verify_event(self, e, expected_state=BFDState.up)
1330
1331     def test_config_change_remote_demand(self):
1332         """ configuration change while peer in demand mode """
1333         bfd_session_up(self)
1334         demand = self.test_session.create_packet()
1335         demand[BFD].flags = "D"
1336         self.test_session.send_packet(demand)
1337         self.vpp_session.modify_parameters(
1338             required_min_rx=2 * self.vpp_session.required_min_rx)
1339         p = wait_for_bfd_packet(
1340             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1341         # poll bit must be set
1342         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1343         # terminate poll sequence
1344         final = self.test_session.create_packet()
1345         final[BFD].flags = "D+F"
1346         self.test_session.send_packet(final)
1347         # vpp should be quiet now again
1348         transmit_time = 0.9 \
1349             * max(self.vpp_session.required_min_rx,
1350                   self.test_session.desired_min_tx) \
1351             / USEC_IN_SEC
1352         count = 0
1353         for dummy in range(self.test_session.detect_mult * 2):
1354             time.sleep(transmit_time)
1355             self.test_session.send_packet(demand)
1356             try:
1357                 p = wait_for_bfd_packet(self, timeout=0)
1358                 self.logger.error(ppp("Received unexpected packet:", p))
1359                 count += 1
1360             except CaptureTimeoutError:
1361                 pass
1362         events = self.vapi.collect_events()
1363         for e in events:
1364             self.logger.error("Received unexpected event: %s", e)
1365         self.assert_equal(count, 0, "number of packets received")
1366         self.assert_equal(len(events), 0, "number of events received")
1367
1368
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level placeholders; pg0 is used as cls.pg0 after
    # create_pg_interfaces() in setUpClass, the sessions are created in setUp
    pg0 = None
    # read by tests as `time.time() - self.vpp_clock_offset`; never assigned
    # in this class (presumably set by bfd_session_up() - TODO confirm)
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 and loopback0 with IPv6 configured

        If any step fails, the parent tearDownClass() is invoked before the
        exception is re-raised.
        """
        super(BFD6TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces([0])
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ subscribe to BFD events and create the vpp/test session pair """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # deliberately catch everything (same reach as a bare except):
            # the event subscription made above must be undone before the
            # failure propagates, and the exception is always re-raised
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ unsubscribe from BFD events (if vpp is alive) and drain events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        # our first packet is sent before test_session.your_discriminator is
        # filled in (it is learnt from vpp's reply below)
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every packet from vpp for 2 * detect_mult rounds
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        # the UDP source port is bumped per packet so that each echo packet
        # can be told apart on reception
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(str(p[UDP].payload),
                             str(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must match again
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function used """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=50000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(3):
            sleep = 0.75 * detection_time
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # with no echo source vpp must still advertise its configured rx
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # echo packet: verify addressing, then send it back
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.pg0.add_stream(p)
                    self.pg_start()
                elif p.haslayer(BFD):
                    # control packet: rx interval must be at least 1000000
                    self.assertGreaterEqual(p[BFD].required_min_rx_interval,
                                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final packet
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
1550
1551
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 with IPv4 configured; clean up the class on failure """
        super(BFDSHA1TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            for step in ("config_ip4", "admin_up", "resolve_arp"):
                getattr(cls.pg0, step)()
        except Exception:
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ fresh key factory, BFD event subscription and capture on pg0 """
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (if vpp is alive) and drain events """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)
        # clear the event queue
        self.vapi.collect_events()
        super(BFDSHA1TestCase, self).tearDown()

    # -- private helpers (not collected as tests) ---------------------------

    def _sha1_configured_key(self, *auth_type):
        """ create a random key via the factory and add it to vpp

        `auth_type` is forwarded positionally; when omitted the factory's
        own default auth type applies.
        """
        key = self.factory.create_random_key(self, *auth_type)
        key.add_vpp_config()
        return key

    def _sha1_vpp_session(self, key):
        """ construct (but do not configure) a vpp session using `key` """
        return VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                sha1_key=key)

    def _sha1_test_session(self, key, bfd_key_id, **extra):
        """ construct a test-side session authenticating with `key` """
        return BFDTestSession(self, self.pg0, AF_INET, sha1_key=key,
                              bfd_key_id=bfd_key_id, **extra)

    def _sha1_prepare_sessions(self, key, admin_up=False, **extra):
        """ store a configured vpp session and its test peer on self

        The vpp session is added to vpp and, when `admin_up` is true, put
        admin-up before the test session is created. `extra` keyword
        arguments are passed on to the test session constructor.
        """
        self.vpp_session = self._sha1_vpp_session(key)
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        self.test_session = self._sha1_test_session(
            key, self.vpp_session.bfd_key_id, **extra)

    def _sha1_detection_time(self):
        """ detect_mult * vpp required_min_rx, converted with USEC_IN_SEC """
        return (self.test_session.detect_mult *
                self.vpp_session.required_min_rx / USEC_IN_SEC)

    # -- tests ----------------------------------------------------------------

    def test_session_up(self):
        """ bring BFD session up """
        key = self._sha1_configured_key()
        self._sha1_prepare_sessions(key, admin_up=True)
        bfd_session_up(self)

    def test_hold_up(self):
        """ hold BFD session up """
        key = self._sha1_configured_key()
        self._sha1_prepare_sessions(key, admin_up=True)
        bfd_session_up(self)
        rounds = self.test_session.detect_mult * 2
        for _ in range(rounds):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth """
        key = self._sha1_configured_key(BFDAuthType.meticulous_keyed_sha1)
        # specify sequence number so that it wraps
        self._sha1_prepare_sessions(key, admin_up=True,
                                    our_seq_number=0xFFFFFFFF - 4)
        bfd_session_up(self)
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad sequence numbers"""
        key = self._sha1_configured_key(BFDAuthType.meticulous_keyed_sha1)
        self._sha1_prepare_sessions(key)
        bfd_session_up(self)
        deadline = time.time() + 2 * self._sha1_detection_time()
        while time.time() < deadline:
            # the sequence number is intentionally never incremented here
            self.test_session.send_packet()
            self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                       "time between bfd packets")
        events = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ execute a rogue session interaction scenario

        1. adopt the given vpp session and add its config to vpp
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values (if any) to rogue session
        5. set rogue session state to down
        6. send a packet from the rogue session
        7. assert that the vpp session is still up
        """
        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)

        # make the rogue session mimic the legitimate one
        mirrored_fields = ("my_discriminator", "your_discriminator",
                           "desired_min_tx", "required_min_rx",
                           "detect_mult", "diag", "state", "auth_type")
        rogue_test_session.update(
            **{name: getattr(self.test_session, name)
               for name in mirrored_fields})
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)

        # send packet from rogue session
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        key = self._sha1_configured_key()
        vpp_session = self._sha1_vpp_session(key)
        legit = self._sha1_test_session(key, vpp_session.bfd_key_id)
        # the rogue peer uses no authentication at all
        rogue = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_session, legit, rogue)

    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        key = self._sha1_configured_key()
        vpp_session = self._sha1_vpp_session(key)
        # pick a different random bfd key id
        while True:
            bogus_key_id = randint(0, 255)
            if bogus_key_id != vpp_session.bfd_key_id:
                break
        legit = self._sha1_test_session(key, vpp_session.bfd_key_id)
        rogue = self._sha1_test_session(key, bogus_key_id)
        self.execute_rogue_session_scenario(vpp_session, legit, rogue)

    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        key = self._sha1_configured_key()
        vpp_session = self._sha1_vpp_session(key)
        legit = self._sha1_test_session(key, vpp_session.bfd_key_id)
        rogue = self._sha1_test_session(key, vpp_session.bfd_key_id)
        self.execute_rogue_session_scenario(
            vpp_session, legit, rogue,
            {'auth_type': BFDAuthType.keyed_md5})

    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        key = self._sha1_configured_key(BFDAuthType.meticulous_keyed_sha1)
        self._sha1_prepare_sessions(key, our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        self.sleep(2 * self._sha1_detection_time(),
                   "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        bfd_session_up(self)
1775
1776
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 with IPv4 configured; clean up the class on failure """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            for step in ("config_ip4", "admin_up", "resolve_arp"):
                getattr(cls.pg0, step)()
        except Exception:
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ fresh key factory, BFD event subscription and capture on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (if vpp is alive) and drain events """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)
        # clear the event queue
        self.vapi.collect_events()
        super(BFDAuthOnOffTestCase, self).tearDown()

    # -- private helpers (not collected as tests) ---------------------------

    def _onoff_configured_key(self):
        """ create a random key via the factory and add it to vpp """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        return key

    def _onoff_bring_up(self, key=None, admin_up=False):
        """ create the vpp/test session pair and bring the session up

        With `key` both sessions are created authenticated with it; without
        it both are created with no auth arguments at all. When `admin_up`
        is true the vpp session is explicitly put admin-up after it has
        been configured.
        """
        authenticated = key is not None
        vpp_kwargs = {"sha1_key": key} if authenticated else {}
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4,
                                            **vpp_kwargs)
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        if authenticated:
            self.test_session = BFDTestSession(
                self, self.pg0, AF_INET, sha1_key=key,
                bfd_key_id=self.vpp_session.bfd_key_id)
        else:
            self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)

    def _onoff_keep_alive(self, check_state=True, bump_seq=False):
        """ answer vpp's packets for 2 * detect_mult rounds

        Each round waits for a BFD packet, optionally asserts that it
        advertises state up, optionally increments the test session's
        sequence number, and then sends a packet back.
        """
        for _ in range(self.test_session.detect_mult * 2):
            packet = wait_for_bfd_packet(self)
            if check_state:
                self.assert_equal(packet[BFD].state, BFDState.up, BFDState)
            if bump_seq:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _onoff_set_test_auth(self, key):
        """ switch the test session to `key`; None turns its auth off """
        if key is None:
            self.test_session.bfd_key_id = None
        else:
            self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key

    def _onoff_assert_undisturbed(self):
        """ the vpp session must be up and no event may have been queued """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    # -- tests ----------------------------------------------------------------

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self._onoff_configured_key()
        self._onoff_bring_up()
        self._onoff_keep_alive()
        self.vpp_session.activate_auth(key)
        self._onoff_set_test_auth(key)
        self._onoff_keep_alive()
        self._onoff_assert_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self._onoff_configured_key()
        self._onoff_bring_up(key)
        self._onoff_keep_alive(bump_seq=True)
        self.vpp_session.deactivate_auth()
        self._onoff_set_test_auth(None)
        self._onoff_keep_alive(bump_seq=True)
        self._onoff_assert_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self._onoff_configured_key()
        key2 = self._onoff_configured_key()
        self._onoff_bring_up(key1)
        self._onoff_keep_alive()
        self.vpp_session.activate_auth(key2)
        self._onoff_set_test_auth(key2)
        self._onoff_keep_alive()
        self._onoff_assert_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self._onoff_configured_key()
        self._onoff_bring_up()
        # the first exchange does not check the advertised state
        self._onoff_keep_alive(check_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # test side keeps sending unauthenticated packets for a while
        self._onoff_keep_alive()
        self._onoff_set_test_auth(key)
        self.test_session.send_packet()
        self._onoff_keep_alive()
        self._onoff_assert_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self._onoff_configured_key()
        self._onoff_bring_up(key)
        self._onoff_keep_alive()
        self.vpp_session.deactivate_auth(delayed=True)
        # test side keeps sending authenticated packets for a while
        self._onoff_keep_alive()
        self._onoff_set_test_auth(None)
        self.test_session.send_packet()
        self._onoff_keep_alive()
        self._onoff_assert_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self._onoff_configured_key()
        key2 = self._onoff_configured_key()
        self._onoff_bring_up(key1, admin_up=True)
        self._onoff_keep_alive()
        self.vpp_session.activate_auth(key2, delayed=True)
        # test side keeps using the old key for a while
        self._onoff_keep_alive()
        self._onoff_set_test_auth(key2)
        self.test_session.send_packet()
        self._onoff_keep_alive()
        self._onoff_assert_undisturbed()
1982
1983
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """
    # NOTE: class and test-method docstrings are kept verbatim; test runners
    # typically print their first line as the suite/test description.
    #
    # The tests below drive VPP's BFD feature through debug CLI commands and
    # compare both the CLI output and the resulting configuration against
    # expectations.  CLI command strings and expected responses are
    # reproduced exactly (including trailing spaces and the spelling of API
    # names inside error messages) because they are matched literally.

    # packet-generator interface used by every test; setUpClass() creates it
    # (presumably create_pg_interfaces() is what assigns cls.pg0)
    pg0 = None

    @classmethod
    def setUpClass(cls):
        """Create and configure pg0 (IPv4 + IPv6) once for the whole class.

        If anything goes wrong, the parent tearDownClass() is invoked before
        the exception is re-raised.
        """
        super(BFDCLITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """Give each test a fresh key factory and enable capture on pg0."""
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events and drain the event queue."""
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    # ------------------------------------------------------------------
    # helpers (names do not start with "test" so they are not collected)
    # ------------------------------------------------------------------

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation

        Surrounding whitespace of the actual response is stripped before
        comparing it with `expected`.
        """
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    def _endpoints_cli(self, local_addr, peer_addr):
        """ return the 'interface/local-addr/peer-addr' CLI fragment which
        identifies a BFD UDP session on pg0 """
        return "interface %s local-addr %s peer-addr %s" % (
            self.pg0.name, local_addr, peer_addr)

    @staticmethod
    def _key_set_cli(conf_key_id, auth_type_name, secret):
        """ return a 'bfd key set' command; the secret is hex-encoded

        :param conf_key_id: configuration key ID to set
        :param auth_type_name: CLI name of the authentication type
        :param secret: raw secret, iterated character by character
        """
        return "bfd key set conf-key-id %s type %s secret %s" % (
            conf_key_id, auth_type_name,
            "".join("{:02x}".format(ord(c)) for c in secret))

    def _check_key_in_use_then_delete(self, key, af):
        """ shared tail of the set/delete key tests

        Expects self.vpp_session to already exist in VPP and to use `key`.
        Verifies that the session works with the key, that the key's secret
        cannot be replaced while the key is in use, and that the key can be
        deleted through the CLI once the session is removed.

        :param key: auth key already configured in VPP via the CLI
        :param af: address family passed to the BFDTestSession
        """
        self.test_session = \
            BFDTestSession(self, self.pg0, af, sha1_key=key,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        other_key = self.factory.create_random_key(self)
        self.cli_verify_response(
            self._key_set_cli(key.conf_key_id, "keyed-sha1", other_key.key),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % key.conf_key_id)
        self.assertFalse(key.query_vpp_config())

    def _check_add_mod_del(self, local_addr, peer_addr, **session_kwargs):
        """ shared body of the create/modify/delete session tests

        Adds a session via the CLI (a second add must fail), modifies its
        timing parameters, deletes it (a second delete must fail) and
        verifies the VPP configuration after each successful step.

        :param local_addr: pg0 local address used in the CLI commands
        :param peer_addr: pg0 remote address used in the CLI commands and
                          for the reference VppBFDUDPSession objects
        :param session_kwargs: extra keyword arguments forwarded unchanged
                               to VppBFDUDPSession (e.g. af=, sha1_key=)
        """
        key = session_kwargs.get("sha1_key")
        endpoints = self._endpoints_cli(local_addr, peer_addr)
        timing_fmt = "desired-min-tx %s required-min-rx %s detect-mult %s"

        # reference object only - the session itself is created via the CLI
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_kwargs)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add %s %s" % (
            endpoints,
            timing_fmt % (vpp_session.desired_min_tx,
                          vpp_session.required_min_rx,
                          vpp_session.detect_mult))
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)

        # reference object describing the session after modification
        mod_kwargs = dict(session_kwargs)
        if key is not None:
            # 'mod' does not touch authentication, so the expected
            # bfd key id is the one used when adding the session
            mod_kwargs["bfd_key_id"] = vpp_session.bfd_key_id
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **mod_kwargs)
        self.cli_verify_no_response(
            "bfd udp session mod %s %s" % (
                endpoints,
                timing_fmt % (mod_session.desired_min_tx,
                              mod_session.required_min_rx,
                              mod_session.detect_mult)))
        verify_bfd_session_config(self, mod_session)

        cli_del_cmd = "bfd udp session del %s" % endpoints
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def _check_auth_on_off(self, delayed):
        """ shared body of the authentication on/off tests

        Activates and then deactivates authentication on an IPv4 session
        via the CLI.  Each command is issued twice, and the configuration
        is verified after every invocation.

        :param delayed: if true, append 'delayed yes' to both commands
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        # 'session' is what gets configured; 'auth_session' is only a
        # reference describing the expected state while auth is active
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        endpoints = self._endpoints_cli(self.pg0.local_ip4,
                                        self.pg0.remote_ip4)
        cli_activate = \
            "bfd udp session auth activate %s conf-key-id %s bfd-key-id %s" \
            % (endpoints, key.conf_key_id, auth_session.bfd_key_id)
        cli_deactivate = "bfd udp session auth deactivate %s" % endpoints
        if delayed:
            cli_activate += " delayed yes"
            cli_deactivate += " delayed yes"
        else:
            # the non-delayed deactivate command has always been sent with
            # a trailing space; keep the command text unchanged
            cli_deactivate += " "
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test_show(self):
        """ show commands """
        # smoke test: configure two keys and two sessions, then log the
        # output of the show commands (the output itself is not checked)
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        for show_cmd in ("show bfd keys", "show bfd sessions", "show bfd"):
            self.logger.info(self.vapi.ppcli(show_cmd))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            self._key_set_cli(k.conf_key_id, "keyed-sha1", k.key))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self._check_key_in_use_then_delete(k, AF_INET)

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            self._key_set_cli(k.conf_key_id, "meticulous-keyed-sha1", k.key))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self._check_key_in_use_then_delete(k, AF_INET6)

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._check_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._check_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                af=AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._check_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4,
                                sha1_key=key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._check_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                af=AF_INET6, sha1_key=key)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._check_auth_on_off(delayed=False)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._check_auth_on_off(delayed=True)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        # both commands end with a trailing space, as they always have
        set_flags_fmt = "bfd udp session set-flags admin %s %s "
        endpoints = self._endpoints_cli(self.pg0.local_ip4,
                                        self.pg0.remote_ip4)
        cli_down = set_flags_fmt % ("down", endpoints)
        cli_up = set_flags_fmt % ("up", endpoints)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        show_cmd = "show bfd echo-source"
        not_set = "UDP echo source is not set."
        report_fmt = "UDP echo source is: %s\n" \
                     "IPv4 address usable as echo source: %s\n" \
                     "IPv6 address usable as echo source: %s"

        self.create_loopback_interfaces([0])
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response(show_cmd, not_set)

        # with no addresses on the loopback, neither family is usable
        self.cli_verify_no_response(
            "bfd udp echo-source set interface %s" % self.loopback0.name)
        self.cli_verify_response(
            show_cmd, report_fmt % (self.loopback0.name, "none", "none"))

        # the expected echo source address is the interface address with
        # its lowest bit flipped
        self.loopback0.config_ip4()
        (ip4_word,) = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", ip4_word ^ 1))
        self.cli_verify_response(
            show_cmd, report_fmt % (self.loopback0.name, echo_ip4, "none"))

        ip6_words = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", ip6_words[0],
                                            ip6_words[1], ip6_words[2],
                                            ip6_words[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response(
            show_cmd, report_fmt % (self.loopback0.name, echo_ip4, echo_ip6))

        self.cli_verify_no_response("bfd udp echo-source del")
        self.cli_verify_response(show_cmd, not_set)
2398
# when executed directly (rather than imported), run every test case in this
# module using the VPP test runner
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)