IP Flow Hash Config fixes
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from util import ppp
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22 from vpp_ip_route import VppIpRoute, VppRoutePath
23
24 USEC_IN_SEC = 1000000
25
26
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID"""

    def __init__(self):
        # conf key IDs already handed out by this factory instance
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case handed over to VppBFDAuthKey
        :param auth_type: authentication type of the created key
        :returns: VppBFDAuthKey whose conf key ID was not issued by this
                  factory before and whose key is 1 to 20 random bytes
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        # bytes is str on python 2, so the result is unchanged there; on
        # python 3 str() would produce the bytearray repr, not the raw key
        key = bytes(bytearray(
            [randint(0, 255) for _ in range(randint(1, 20))]))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
42
43
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        super(BFDAPITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            # undo the parent class set-up before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDAPITestCase, self).setUp()
        # one factory per test, so conf key IDs are unique within a test
        self.factory = AuthKeyFactory()

    def _add_and_remove_twice(self, session):
        """ add and remove the session two times, logging its state """
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _verify_dumped_parameters(self, session):
        """ compare session parameters with its session dump entry """
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self._add_and_remove_twice(session)

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        with self.vapi.expect_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self._add_and_remove_twice(session)

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        self._verify_dumped_parameters(session)
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        self._verify_dumped_parameters(session)

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly - shuffle() needs a mutable sequence, which
        # range() only is on python 2, hence the explicit list
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal, exactly the removed keys must be gone
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        self._add_and_remove_twice(session)

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created on the test side only, never added to vpp
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the use count must drop by one with every removed session
        for removed, s in enumerate(sessions):
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, 0, "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        # both keys come from the same factory, which never hands out the
        # same conf key ID twice, so no extra uniqueness check is needed
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)
252
253
# NOTE: the unittest.skipUnless decorator was dropped from this class - it is
# a plain helper, not a TestCase, so the decorator had no effect on its use
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None):
        """ create the session in down state, without authentication type

        :param test: test case used for logging, assertions and sending
        :param interface: interface whose addresses are put into packets
        :param af: address family, AF_INET6 selects IPv6, otherwise IPv4
        :param detect_mult: detection multiplier put into packets
        :param sha1_key: auth key, packets carry a SHA1 auth section if set
        :param bfd_key_id: key ID put into the auth section
        :param our_seq_number: initial sequence number, random if None
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.udp_sport = randint(49152, 65535)
        if our_seq_number is None:
            self.our_seq_number = randint(0, 40000000)
        else:
            self.our_seq_number = our_seq_number
        # last sequence number received from vpp, None until first packet
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        if self.our_seq_number == 0xFFFFFFFF:
            self.our_seq_number = 0
        else:
            self.our_seq_number += 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only parameters which are not None are stored, the rest keep their
        current value.
        """
        if my_discriminator is not None:
            self.my_discriminator = my_discriminator
        if your_discriminator is not None:
            self.your_discriminator = your_discriminator
        if required_min_rx is not None:
            self.required_min_rx = required_min_rx
        if required_min_echo_rx is not None:
            self.required_min_echo_rx = required_min_echo_rx
        if desired_min_tx is not None:
            self.desired_min_tx = desired_min_tx
        if detect_mult is not None:
            self.detect_mult = detect_mult
        if diag is not None:
            self.diag = diag
        if state is not None:
            self.state = state
        if auth_type is not None:
            self.auth_type = auth_type

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Session values which are falsy (None or 0) are not written, the
        corresponding packet fields keep whatever value they already have.
        """
        bfd = packet[BFD]
        # (session attribute, BFD packet field, name used in the debug log)
        fields = (
            ("my_discriminator", "my_discriminator", "my_discriminator"),
            ("your_discriminator", "your_discriminator",
             "your_discriminator"),
            ("required_min_rx", "required_min_rx_interval",
             "required_min_rx_interval"),
            ("required_min_echo_rx", "required_min_echo_rx_interval",
             "required_min_echo_rx"),
            ("desired_min_tx", "desired_min_tx_interval",
             "desired_min_tx_interval"),
            ("detect_mult", "detect_mult", "detect_mult"),
            ("diag", "diag", "diag"),
            ("state", "state", "state"),
            # auth_type is used by a negative test-case
            ("auth_type", "auth_type", "auth_type"),
        )
        for attr, field, log_name in fields:
            value = getattr(self, attr)
            if value:
                self.test.logger.debug("BFD: setting packet.%s=%s",
                                       log_name, value)
                setattr(bfd, field, value)

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session

        :returns: Ether / IP or IPv6 / UDP / BFD packet addressed from the
                  remote side of the interface to its local side
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        if self.af == AF_INET6:
            packet = (Ether(src=self.interface.remote_mac,
                            dst=self.interface.local_mac) /
                      IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        else:
            packet = (Ether(src=self.interface.remote_mac,
                            dst=self.interface.local_mac) /
                      IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash the first 32 bytes of the BFD layer followed by the key
            # padded with zeros to 20 bytes (bytes is str on python 2)
            hash_material = bytes(packet[BFD])[:32] + self.sha1_key.key + \
                b"\0" * (20 - len(self.sha1_key.key))
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed

        :param packet: packet to send, created from session state if None
        :param interface: interface to send on, pg0 of the test if None
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.test.pg0
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            # meticulous mode must advance by one with every packet, the
            # plain keyed mode may also repeat the previous number
            meticulous = self.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1
            if self.vpp_seq_number < 0xffffffff:
                if meticulous:
                    self.test.assert_equal(recvd_seq_num,
                                           self.vpp_seq_number + 1,
                                           "BFD sequence number")
                else:
                    self.test.assert_in_range(recvd_seq_num,
                                              self.vpp_seq_number,
                                              self.vpp_seq_number + 1,
                                              "BFD sequence number")
            else:
                # previous number was the maximum, the next one wraps to 0
                if meticulous:
                    self.test.assert_equal(recvd_seq_num, 0,
                                           "BFD sequence number")
                else:
                    self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                       "BFD sequence number not one of "
                                       "(%s, 0)" % self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self.sha1_key.key + \
            b"\0" * (20 - len(self.sha1_key.key))
        # hexlify both sides, so that equal types are compared
        expected_hash = binascii.hexlify(hashlib.sha1(hash_material).digest())
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
455
456
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a control frame from vpp, answers it with Init, waits for the
    event reporting Up state and confirms it by sending Up.
    """

    def send_next(session):
        """ send the next packet of the session """
        # meticulous keyed SHA1 needs a new sequence number for each packet
        key = session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            session.inc_seq_num()
        session.send_packet()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2)
    # offset remembered by an earlier call, if there was any
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - hello.time
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    test.logger.info("BFD: Sending Init")
    test.test_session.update(my_discriminator=randint(0, 40000000),
                             your_discriminator=hello[BFD].my_discriminator,
                             state=BFDState.init)
    send_next(test.test_session)

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)

    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    send_next(test.test_session)
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
490
491
def bfd_session_down(test):
    """ Bring BFD session down

    Expects the session to be up, sends Down and waits for the event
    reporting Down state.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session = test.test_session
    session.update(state=BFDState.down)
    # meticulous keyed SHA1 needs a new sequence number for each packet
    key = session.sha1_key
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
505
506
def verify_bfd_session_config(test, session, state=None):
    """ Verify that session dump entry matches the session parameters

    :param test: test case providing the assert methods
    :param session: session providing get_bfd_udp_session_dump_entry() and
                    the expected parameter values
    :param state: expected session state, the state is not checked if None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None - a plain truth test would silently skip the
    # check for an expected state whose value is falsy (e.g. 0)
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
529
530
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    Checks hop limit/TTL and that the packet goes from the local to the
    remote address of pg0, using the address family of the vpp session.
    """
    pg = test.pg0
    if test.vpp_session.af == AF_INET6:
        layer = packet[IPv6]
        expected_src = pg.local_ip6
        expected_dst = pg.remote_ip6
        test.assert_equal(layer.hlim, 255, "IPv6 hop limit")
    else:
        layer = packet[IP]
        expected_src = pg.local_ip4
        expected_dst = pg.remote_ip4
        test.assert_equal(layer.ttl, 255, "IPv4 TTL")
    test.assert_equal(layer.src, expected_src, "IP source address")
    test.assert_equal(layer.dst, expected_dst, "IP destination address")
545
546
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    The destination port has to be the BFD port and the source port has to
    lie within the source port range defined by the BFD class.
    """
    layer = packet[UDP]
    test.assert_equal(layer.dport, BFD.udp_dport, "UDP destination port")
    lowest, highest = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(layer.sport, lowest, highest, "UDP source port")
553
554
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    Interface index, address family and addresses are compared with the
    vpp session of the test, the state with expected_state.
    """
    test.logger.debug("BFD: Event: %s" % repr(event))
    test.assert_equal(event.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")
    expected_is_ipv6 = 1 if test.vpp_session.af == AF_INET6 else 0
    test.assert_equal(event.is_ipv6, expected_is_ipv6, "is_ipv6")
    if test.vpp_session.af != AF_INET:
        test.assert_equal(event.local_addr, test.vpp_session.local_addr_n,
                          "Local IPv6 address")
        test.assert_equal(event.peer_addr, test.vpp_session.peer_addr_n,
                          "Peer IPv6 address")
    else:
        # only the first 4 bytes of the address fields are compared for IPv4
        test.assert_equal(event.local_addr[:4],
                          test.vpp_session.local_addr_n,
                          "Local IPv4 address")
        test.assert_equal(event.peer_addr[:4],
                          test.vpp_session.peer_addr_n,
                          "Peer IPv4 address")
    test.assert_equal(event.state, expected_state, BFDState)
577
578
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing pg0, logger and test_session
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this

    :returns: the received packet
    :raises CaptureTimeoutError: if the deadline passes before a packet
                                 is accepted
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    # getlayer() returns None for a missing layer, while p[BFD] would raise
    # before the check below gets a chance to report the packet
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
614
615
616 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
617 class BFD4TestCase(VppTestCase):
618     """Bidirectional Forwarding Detection (BFD)"""
619
620     pg0 = None
621     vpp_clock_offset = None
622     vpp_session = None
623     test_session = None
624
    @classmethod
    def setUpClass(cls):
        """ create pg0 and loopback0, configure IPv4 and bring them up """
        super(BFD4TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.create_loopback_interfaces([0])
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip4()
            cls.loopback0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the parent class set-up before propagating the failure
            super(BFD4TestCase, cls).tearDownClass()
            raise
642
    def setUp(self):
        """ enable BFD events and capture, create vpp and test sessions """
        super(BFD4TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            # session configured in vpp, towards the remote address of pg0
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip4)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            # the peer of the vpp session, driven by the test itself
            self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        except:
            # events were enabled above, disable them again and re-raise
            self.vapi.want_bfd_events(enable_disable=0)
            raise
657
    def tearDown(self):
        """ disable BFD events (if vpp is alive) and drop pending events """
        # the request to disable events is sent only if vpp is not dead
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD4TestCase, self).tearDown()
663
    def test_session_up(self):
        """ bring BFD session up """
        # all the steps and checks are done by the shared helper
        bfd_session_up(self)
667
    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        # your_discriminator of the test session is still unset here, so
        # the first packet is sent without it being filled in
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # the reply has to carry our discriminator and Init state
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        # from now on packets carry the discriminator learned from the reply
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
693
    def test_session_down(self):
        """ bring BFD session down """
        # the session has to be up before it can be brought down
        bfd_session_up(self)
        bfd_session_down(self)
698
699     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
700     def test_hold_up(self):
701         """ hold BFD session up """
702         bfd_session_up(self)
703         for dummy in range(self.test_session.detect_mult * 2):
704             wait_for_bfd_packet(self)
705             self.test_session.send_packet()
706         self.assert_equal(len(self.vapi.collect_events()), 0,
707                           "number of bfd events")
708
709     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
710     def test_slow_timer(self):
711         """ verify slow periodic control frames while session down """
712         packet_count = 3
713         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
714         prev_packet = wait_for_bfd_packet(self, 2)
715         for dummy in range(packet_count):
716             next_packet = wait_for_bfd_packet(self, 2)
717             time_diff = next_packet.time - prev_packet.time
718             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
719             # to work around timing issues
720             self.assert_in_range(
721                 time_diff, 0.70, 1.05, "time between slow packets")
722             prev_packet = next_packet
723
724     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
725     def test_zero_remote_min_rx(self):
726         """ no packets when zero remote required min rx interval """
727         bfd_session_up(self)
728         self.test_session.update(required_min_rx=0)
729         self.test_session.send_packet()
730         for dummy in range(self.test_session.detect_mult):
731             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
732                        "sleep before transmitting bfd packet")
733             self.test_session.send_packet()
734             try:
735                 p = wait_for_bfd_packet(self, timeout=0)
736                 self.logger.error(ppp("Received unexpected packet:", p))
737             except CaptureTimeoutError:
738                 pass
739         self.assert_equal(
740             len(self.vapi.collect_events()), 0, "number of bfd events")
741         self.test_session.update(required_min_rx=300000)
742         for dummy in range(3):
743             self.test_session.send_packet()
744             wait_for_bfd_packet(
745                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
746         self.assert_equal(
747             len(self.vapi.collect_events()), 0, "number of bfd events")
748
749     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
750     def test_conn_down(self):
751         """ verify session goes down after inactivity """
752         bfd_session_up(self)
753         detection_time = self.test_session.detect_mult *\
754             self.vpp_session.required_min_rx / USEC_IN_SEC
755         self.sleep(detection_time, "waiting for BFD session time-out")
756         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
757         verify_event(self, e, expected_state=BFDState.down)
758
759     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_large_required_min_rx(self):
        """ large remote required min rx interval

        The test peer advertises a required min rx of `interval`
        microseconds and then polls for packets and events for that long.
        Packets captured after the update count as failures; the first
        event collected is verified to be a transition to Down and ends the
        polling. If no event shows up within `interval`, only the packet
        count is asserted.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        interval = 3000000
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        # reference point: packets timestamped before this are tolerated
        time_mark = time.time()
        count = 0
        # busy wait here, trying to collect a packet or event, vpp is not
        # allowed to send packets and the session will timeout first - so the
        # Up->Down event must arrive before any packets do
        while time.time() < time_mark + interval / USEC_IN_SEC:
            try:
                p = wait_for_bfd_packet(self, timeout=0)
                # if vpp managed to send a packet before we did the
                # session update, then that's fine, ignore it
                if p.time < time_mark - self.vpp_clock_offset:
                    continue
                self.logger.error(ppp("Received unexpected packet:", p))
                count += 1
            except CaptureTimeoutError:
                # no packet pending - expected
                pass
            events = self.vapi.collect_events()
            if len(events) > 0:
                # only the first collected event is checked
                verify_event(self, events[0], BFDState.down)
                break
        self.assert_equal(count, 0, "number of packets received")
788
789     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
790     def test_immediate_remote_min_rx_reduction(self):
791         """ immediately honor remote required min rx reduction """
792         self.vpp_session.remove_vpp_config()
793         self.vpp_session = VppBFDUDPSession(
794             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
795         self.pg0.enable_capture()
796         self.vpp_session.add_vpp_config()
797         self.test_session.update(desired_min_tx=1000000,
798                                  required_min_rx=1000000)
799         bfd_session_up(self)
800         reference_packet = wait_for_bfd_packet(self)
801         time_mark = time.time()
802         interval = 300000
803         self.test_session.update(required_min_rx=interval)
804         self.test_session.send_packet()
805         extra_time = time.time() - time_mark
806         p = wait_for_bfd_packet(self)
807         # first packet is allowed to be late by time we spent doing the update
808         # calculated in extra_time
809         self.assert_in_range(p.time - reference_packet.time,
810                              .95 * 0.75 * interval / USEC_IN_SEC,
811                              1.05 * interval / USEC_IN_SEC + extra_time,
812                              "time between BFD packets")
813         reference_packet = p
814         for dummy in range(3):
815             p = wait_for_bfd_packet(self)
816             diff = p.time - reference_packet.time
817             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
818                                  1.05 * interval / USEC_IN_SEC,
819                                  "time between BFD packets")
820             reference_packet = p
821
822     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_modify_req_min_rx_double(self):
        """ modify session - double required min rx

        After the session is up, VPP's required min rx is doubled. The next
        VPP packet must carry the Poll flag; the test answers with a Final
        packet, goes silent and checks that the Down event arrives within
        +-10% of the detection time computed from the doubled value.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.test_session.update(desired_min_tx=10000,
                                 required_min_rx=10000)
        self.test_session.send_packet()
        # double required min rx
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # only consider packets captured after the modification
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        # expected detection time in seconds: detect mult times the larger
        # of our desired min tx and VPP's (already doubled) required min rx
        timeout = self.test_session.detect_mult * \
            max(self.test_session.desired_min_tx,
                self.vpp_session.required_min_rx) / USEC_IN_SEC
        self.test_session.send_packet(final)
        # the clock starts right after our last packet was sent
        time_mark = time.time()
        e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.down)
        time_to_event = time.time() - time_mark
        self.assert_in_range(time_to_event, .9 * timeout,
                             1.1 * timeout, "session timeout")
851
852     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
853     def test_modify_req_min_rx_halve(self):
854         """ modify session - halve required min rx """
855         self.vpp_session.modify_parameters(
856             required_min_rx=2 * self.vpp_session.required_min_rx)
857         bfd_session_up(self)
858         p = wait_for_bfd_packet(self)
859         self.test_session.update(desired_min_tx=10000,
860                                  required_min_rx=10000)
861         self.test_session.send_packet()
862         p = wait_for_bfd_packet(
863             self, pcap_time_min=time.time() - self.vpp_clock_offset)
864         # halve required min rx
865         old_required_min_rx = self.vpp_session.required_min_rx
866         self.vpp_session.modify_parameters(
867             required_min_rx=0.5 * self.vpp_session.required_min_rx)
868         # now we wait 0.8*3*old-req-min-rx and the session should still be up
869         self.sleep(0.8 * self.vpp_session.detect_mult *
870                    old_required_min_rx / USEC_IN_SEC,
871                    "wait before finishing poll sequence")
872         self.assert_equal(len(self.vapi.collect_events()), 0,
873                           "number of bfd events")
874         p = wait_for_bfd_packet(self)
875         # poll bit needs to be set
876         self.assertIn("P", p.sprintf("%BFD.flags%"),
877                       "Poll bit not set in BFD packet")
878         # finish poll sequence with final packet
879         final = self.test_session.create_packet()
880         final[BFD].flags = "F"
881         self.test_session.send_packet(final)
882         # now the session should time out under new conditions
883         detection_time = self.test_session.detect_mult *\
884             self.vpp_session.required_min_rx / USEC_IN_SEC
885         before = time.time()
886         e = self.vapi.wait_for_event(
887             2 * detection_time, "bfd_udp_session_details")
888         after = time.time()
889         self.assert_in_range(after - before,
890                              0.9 * detection_time,
891                              1.1 * detection_time,
892                              "time before bfd session goes down")
893         verify_event(self, e, expected_state=BFDState.down)
894
895     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
896     def test_modify_detect_mult(self):
897         """ modify detect multiplier """
898         bfd_session_up(self)
899         p = wait_for_bfd_packet(self)
900         self.vpp_session.modify_parameters(detect_mult=1)
901         p = wait_for_bfd_packet(
902             self, pcap_time_min=time.time() - self.vpp_clock_offset)
903         self.assert_equal(self.vpp_session.detect_mult,
904                           p[BFD].detect_mult,
905                           "detect mult")
906         # poll bit must not be set
907         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
908                          "Poll bit not set in BFD packet")
909         self.vpp_session.modify_parameters(detect_mult=10)
910         p = wait_for_bfd_packet(
911             self, pcap_time_min=time.time() - self.vpp_clock_offset)
912         self.assert_equal(self.vpp_session.detect_mult,
913                           p[BFD].detect_mult,
914                           "detect mult")
915         # poll bit must not be set
916         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
917                          "Poll bit not set in BFD packet")
918
919     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_queued_poll(self):
        """ test poll sequence queueing

        VPP's required min rx is doubled twice in a row. The first change
        starts a poll sequence; the second change is made while that
        sequence is still open. The test delays its Final reply for at
        least `poll_sequence_length_min` seconds, checking that VPP keeps
        polling in the meantime, then finishes the first sequence and
        expects a second poll sequence to start no sooner than the first
        one took. Throughout, no session events may be generated.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        # minimum time (seconds) to keep the 1st poll sequence unanswered
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            # keep the session alive without answering the poll
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        # finish 1st poll sequence with final
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # how long the 1st poll sequence was open, as seen by the test
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # give VPP up to twice as many packets as the 1st sequence took
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                # answer the 2nd poll right away
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                # no poll yet - just keep the session alive
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
984
985     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
986     def test_poll_response(self):
987         """ test correct response to control frame with poll bit set """
988         bfd_session_up(self)
989         poll = self.test_session.create_packet()
990         poll[BFD].flags = "P"
991         self.test_session.send_packet(poll)
992         final = wait_for_bfd_packet(
993             self, pcap_time_min=time.time() - self.vpp_clock_offset)
994         self.assertIn("F", final.sprintf("%BFD.flags%"))
995
996     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
997     def test_no_periodic_if_remote_demand(self):
998         """ no periodic frames outside poll sequence if remote demand set """
999         bfd_session_up(self)
1000         demand = self.test_session.create_packet()
1001         demand[BFD].flags = "D"
1002         self.test_session.send_packet(demand)
1003         transmit_time = 0.9 \
1004             * max(self.vpp_session.required_min_rx,
1005                   self.test_session.desired_min_tx) \
1006             / USEC_IN_SEC
1007         count = 0
1008         for dummy in range(self.test_session.detect_mult * 2):
1009             time.sleep(transmit_time)
1010             self.test_session.send_packet(demand)
1011             try:
1012                 p = wait_for_bfd_packet(self, timeout=0)
1013                 self.logger.error(ppp("Received unexpected packet:", p))
1014                 count += 1
1015             except CaptureTimeoutError:
1016                 pass
1017         events = self.vapi.collect_events()
1018         for e in events:
1019             self.logger.error("Received unexpected event: %s", e)
1020         self.assert_equal(count, 0, "number of packets received")
1021         self.assert_equal(len(events), 0, "number of events received")
1022
1023     def test_echo_looped_back(self):
1024         """ echo packets looped back """
1025         # don't need a session in this case..
1026         self.vpp_session.remove_vpp_config()
1027         self.pg0.enable_capture()
1028         echo_packet_count = 10
1029         # random source port low enough to increment a few times..
1030         udp_sport_tx = randint(1, 50000)
1031         udp_sport_rx = udp_sport_tx
1032         echo_packet = (Ether(src=self.pg0.remote_mac,
1033                              dst=self.pg0.local_mac) /
1034                        IP(src=self.pg0.remote_ip4,
1035                           dst=self.pg0.remote_ip4) /
1036                        UDP(dport=BFD.udp_dport_echo) /
1037                        Raw("this should be looped back"))
1038         for dummy in range(echo_packet_count):
1039             self.sleep(.01, "delay between echo packets")
1040             echo_packet[UDP].sport = udp_sport_tx
1041             udp_sport_tx += 1
1042             self.logger.debug(ppp("Sending packet:", echo_packet))
1043             self.pg0.add_stream(echo_packet)
1044             self.pg_start()
1045         for dummy in range(echo_packet_count):
1046             p = self.pg0.wait_for_packet(1)
1047             self.logger.debug(ppp("Got packet:", p))
1048             ether = p[Ether]
1049             self.assert_equal(self.pg0.remote_mac,
1050                               ether.dst, "Destination MAC")
1051             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1052             ip = p[IP]
1053             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1054             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1055             udp = p[UDP]
1056             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1057                               "UDP destination port")
1058             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1059             udp_sport_rx += 1
1060             # need to compare the hex payload here, otherwise BFD_vpp_echo
1061             # gets in way
1062             self.assertEqual(str(p[UDP].payload),
1063                              str(echo_packet[UDP].payload),
1064                              "Received packet is not the echo packet sent")
1065         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1066                           "ECHO packet identifier for test purposes)")
1067
    def test_echo(self):
        """ echo function

        Part 1: the peer advertises a non-zero required min echo rx, but no
        echo source is configured on VPP yet; after ten keep-alive rounds
        VPP must still advertise its configured required min rx.

        Part 2: an echo source is configured. For three rounds of 75% of
        the detection time each, every echo packet received is validated
        and looped back, every BFD packet seen after the first echo must
        advertise a required min rx of at least one second, polls are
        answered with Final, and no session events may occur. At least one
        echo packet must have been seen by the end.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        # look only at a packet captured after the keep-alive rounds
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            # stay below the detection time between our own packets
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # re-address the frame to VPP and send it back in
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # with echo running the advertised control-packet
                        # interval must be at least 1 second
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer any poll so the sequence can finish
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")
1122
1123     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1124     def test_echo_fail(self):
1125         """ session goes down if echo function fails """
1126         bfd_session_up(self)
1127         self.test_session.update(required_min_echo_rx=150000)
1128         self.test_session.send_packet()
1129         detection_time = self.test_session.detect_mult *\
1130             self.vpp_session.required_min_rx / USEC_IN_SEC
1131         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1132         # echo function should be used now, but we will drop the echo packets
1133         verified_diag = False
1134         for dummy in range(3):
1135             loop_until = time.time() + 0.75 * detection_time
1136             while time.time() < loop_until:
1137                 p = self.pg0.wait_for_packet(1)
1138                 self.logger.debug(ppp("Got packet:", p))
1139                 if p[UDP].dport == BFD.udp_dport_echo:
1140                     # dropped
1141                     pass
1142                 elif p.haslayer(BFD):
1143                     if "P" in p.sprintf("%BFD.flags%"):
1144                         self.assertGreaterEqual(
1145                             p[BFD].required_min_rx_interval,
1146                             1000000)
1147                         final = self.test_session.create_packet()
1148                         final[BFD].flags = "F"
1149                         self.test_session.send_packet(final)
1150                     if p[BFD].state == BFDState.down:
1151                         self.assert_equal(p[BFD].diag,
1152                                           BFDDiagCode.echo_function_failed,
1153                                           BFDDiagCode)
1154                         verified_diag = True
1155                 else:
1156                     raise Exception(ppp("Received unknown packet:", p))
1157             self.test_session.send_packet()
1158         events = self.vapi.collect_events()
1159         self.assert_equal(len(events), 1, "number of bfd events")
1160         self.assert_equal(events[0].state, BFDState.down, BFDState)
1161         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1162
1163     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1164     def test_echo_stop(self):
1165         """ echo function stops if peer sets required min echo rx zero """
1166         bfd_session_up(self)
1167         self.test_session.update(required_min_echo_rx=150000)
1168         self.test_session.send_packet()
1169         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1170         # wait for first echo packet
1171         while True:
1172             p = self.pg0.wait_for_packet(1)
1173             self.logger.debug(ppp("Got packet:", p))
1174             if p[UDP].dport == BFD.udp_dport_echo:
1175                 self.logger.debug(ppp("Looping back packet:", p))
1176                 p[Ether].dst = self.pg0.local_mac
1177                 self.pg0.add_stream(p)
1178                 self.pg_start()
1179                 break
1180             elif p.haslayer(BFD):
1181                 # ignore BFD
1182                 pass
1183             else:
1184                 raise Exception(ppp("Received unknown packet:", p))
1185         self.test_session.update(required_min_echo_rx=0)
1186         self.test_session.send_packet()
1187         # echo packets shouldn't arrive anymore
1188         for dummy in range(5):
1189             wait_for_bfd_packet(
1190                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1191             self.test_session.send_packet()
1192             events = self.vapi.collect_events()
1193             self.assert_equal(len(events), 0, "number of bfd events")
1194
1195     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1196     def test_echo_source_removed(self):
1197         """ echo function stops if echo source is removed """
1198         bfd_session_up(self)
1199         self.test_session.update(required_min_echo_rx=150000)
1200         self.test_session.send_packet()
1201         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1202         # wait for first echo packet
1203         while True:
1204             p = self.pg0.wait_for_packet(1)
1205             self.logger.debug(ppp("Got packet:", p))
1206             if p[UDP].dport == BFD.udp_dport_echo:
1207                 self.logger.debug(ppp("Looping back packet:", p))
1208                 p[Ether].dst = self.pg0.local_mac
1209                 self.pg0.add_stream(p)
1210                 self.pg_start()
1211                 break
1212             elif p.haslayer(BFD):
1213                 # ignore BFD
1214                 pass
1215             else:
1216                 raise Exception(ppp("Received unknown packet:", p))
1217         self.vapi.bfd_udp_del_echo_source()
1218         self.test_session.send_packet()
1219         # echo packets shouldn't arrive anymore
1220         for dummy in range(5):
1221             wait_for_bfd_packet(
1222                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1223             self.test_session.send_packet()
1224             events = self.vapi.collect_events()
1225             self.assert_equal(len(events), 0, "number of bfd events")
1226
1227     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1228     def test_stale_echo(self):
1229         """ stale echo packets don't keep a session up """
1230         bfd_session_up(self)
1231         self.test_session.update(required_min_echo_rx=150000)
1232         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1233         self.test_session.send_packet()
1234         # should be turned on - loopback echo packets
1235         echo_packet = None
1236         timeout_at = None
1237         timeout_ok = False
1238         for dummy in range(10 * self.vpp_session.detect_mult):
1239             p = self.pg0.wait_for_packet(1)
1240             if p[UDP].dport == BFD.udp_dport_echo:
1241                 if echo_packet is None:
1242                     self.logger.debug(ppp("Got first echo packet:", p))
1243                     echo_packet = p
1244                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1245                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1246                 else:
1247                     self.logger.debug(ppp("Got followup echo packet:", p))
1248                 self.logger.debug(ppp("Looping back first echo packet:", p))
1249                 echo_packet[Ether].dst = self.pg0.local_mac
1250                 self.pg0.add_stream(echo_packet)
1251                 self.pg_start()
1252             elif p.haslayer(BFD):
1253                 self.logger.debug(ppp("Got packet:", p))
1254                 if "P" in p.sprintf("%BFD.flags%"):
1255                     final = self.test_session.create_packet()
1256                     final[BFD].flags = "F"
1257                     self.test_session.send_packet(final)
1258                 if p[BFD].state == BFDState.down:
1259                     self.assertIsNotNone(
1260                         timeout_at,
1261                         "Session went down before first echo packet received")
1262                     now = time.time()
1263                     self.assertGreaterEqual(
1264                         now, timeout_at,
1265                         "Session timeout at %s, but is expected at %s" %
1266                         (now, timeout_at))
1267                     self.assert_equal(p[BFD].diag,
1268                                       BFDDiagCode.echo_function_failed,
1269                                       BFDDiagCode)
1270                     events = self.vapi.collect_events()
1271                     self.assert_equal(len(events), 1, "number of bfd events")
1272                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1273                     timeout_ok = True
1274                     break
1275             else:
1276                 raise Exception(ppp("Received unknown packet:", p))
1277             self.test_session.send_packet()
1278         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1279
1280     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1281     def test_invalid_echo_checksum(self):
1282         """ echo packets with invalid checksum don't keep a session up """
1283         bfd_session_up(self)
1284         self.test_session.update(required_min_echo_rx=150000)
1285         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1286         self.test_session.send_packet()
1287         # should be turned on - loopback echo packets
1288         timeout_at = None
1289         timeout_ok = False
1290         for dummy in range(10 * self.vpp_session.detect_mult):
1291             p = self.pg0.wait_for_packet(1)
1292             if p[UDP].dport == BFD.udp_dport_echo:
1293                 self.logger.debug(ppp("Got echo packet:", p))
1294                 if timeout_at is None:
1295                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1296                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1297                 p[BFD_vpp_echo].checksum = getrandbits(64)
1298                 p[Ether].dst = self.pg0.local_mac
1299                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1300                 self.pg0.add_stream(p)
1301                 self.pg_start()
1302             elif p.haslayer(BFD):
1303                 self.logger.debug(ppp("Got packet:", p))
1304                 if "P" in p.sprintf("%BFD.flags%"):
1305                     final = self.test_session.create_packet()
1306                     final[BFD].flags = "F"
1307                     self.test_session.send_packet(final)
1308                 if p[BFD].state == BFDState.down:
1309                     self.assertIsNotNone(
1310                         timeout_at,
1311                         "Session went down before first echo packet received")
1312                     now = time.time()
1313                     self.assertGreaterEqual(
1314                         now, timeout_at,
1315                         "Session timeout at %s, but is expected at %s" %
1316                         (now, timeout_at))
1317                     self.assert_equal(p[BFD].diag,
1318                                       BFDDiagCode.echo_function_failed,
1319                                       BFDDiagCode)
1320                     events = self.vapi.collect_events()
1321                     self.assert_equal(len(events), 1, "number of bfd events")
1322                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1323                     timeout_ok = True
1324                     break
1325             else:
1326                 raise Exception(ppp("Received unknown packet:", p))
1327             self.test_session.send_packet()
1328         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1329
1330     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1331     def test_admin_up_down(self):
1332         """ put session admin-up and admin-down """
1333         bfd_session_up(self)
1334         self.vpp_session.admin_down()
1335         self.pg0.enable_capture()
1336         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1337         verify_event(self, e, expected_state=BFDState.admin_down)
1338         for dummy in range(2):
1339             p = wait_for_bfd_packet(self)
1340             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1341         # try to bring session up - shouldn't be possible
1342         self.test_session.update(state=BFDState.init)
1343         self.test_session.send_packet()
1344         for dummy in range(2):
1345             p = wait_for_bfd_packet(self)
1346             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1347         self.vpp_session.admin_up()
1348         self.test_session.update(state=BFDState.down)
1349         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1350         verify_event(self, e, expected_state=BFDState.down)
1351         p = wait_for_bfd_packet(
1352             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1353         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1354         self.test_session.send_packet()
1355         p = wait_for_bfd_packet(
1356             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1357         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1358         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1359         verify_event(self, e, expected_state=BFDState.init)
1360         self.test_session.update(state=BFDState.up)
1361         self.test_session.send_packet()
1362         p = wait_for_bfd_packet(
1363             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1364         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1365         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1366         verify_event(self, e, expected_state=BFDState.up)
1367
1368     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1369     def test_config_change_remote_demand(self):
1370         """ configuration change while peer in demand mode """
1371         bfd_session_up(self)
1372         demand = self.test_session.create_packet()
1373         demand[BFD].flags = "D"
1374         self.test_session.send_packet(demand)
1375         self.vpp_session.modify_parameters(
1376             required_min_rx=2 * self.vpp_session.required_min_rx)
1377         p = wait_for_bfd_packet(
1378             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1379         # poll bit must be set
1380         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1381         # terminate poll sequence
1382         final = self.test_session.create_packet()
1383         final[BFD].flags = "D+F"
1384         self.test_session.send_packet(final)
1385         # vpp should be quiet now again
1386         transmit_time = 0.9 \
1387             * max(self.vpp_session.required_min_rx,
1388                   self.test_session.desired_min_tx) \
1389             / USEC_IN_SEC
1390         count = 0
1391         for dummy in range(self.test_session.detect_mult * 2):
1392             time.sleep(transmit_time)
1393             self.test_session.send_packet(demand)
1394             try:
1395                 p = wait_for_bfd_packet(self, timeout=0)
1396                 self.logger.error(ppp("Received unexpected packet:", p))
1397                 count += 1
1398             except CaptureTimeoutError:
1399                 pass
1400         events = self.vapi.collect_events()
1401         for e in events:
1402             self.logger.error("Received unexpected event: %s", e)
1403         self.assert_equal(count, 0, "number of packets received")
1404         self.assert_equal(len(events), 0, "number of events received")
1405
1406
1407 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # Class-level placeholders. vpp_session and test_session are replaced
    # for every test in setUp().
    # NOTE(review): vpp_clock_offset is not assigned anywhere in this class;
    # presumably a session helper sets it - confirm.
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Create and configure the interfaces shared by all tests.

        pg0 gets IPv6 configured, its neighbors set up and NDP resolved;
        loopback0 is created with IPv6 configured (test_echo passes it to
        bfd_udp_set_echo_source). If any step fails, the class is torn down
        and the exception is re-raised.
        """
        super(BFD6TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces([0])
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """Subscribe to BFD events and create the VPP and test sessions.

        The VPP session targets pg0's remote IPv6 address and is set
        admin-up. On any failure the event subscription is cancelled again
        before the error is re-raised.
        """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except:
            # catch-all is acceptable here: the exception is always
            # re-raised once the event subscription has been undone
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """Cancel the BFD event subscription and drain pending events."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # VPP must echo back our discriminator and move to init
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        # answer every packet from VPP for two detection multiples
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address, so the
        # packet is expected back on pg0
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            # fixed label: this assertion checks the source address
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(str(p[UDP].payload),
                             str(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # every packet sent was received (the check was previously
        # duplicated verbatim)
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for _ in range(10):
            interval = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(interval, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # send the echo packet straight back to VPP
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # once echo is running, the advertised required
                        # min rx interval must be at least one second
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")
1599
1600
1601 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # replaced by test_session_with_fib()
    vpp_session = None
    test_session = None

    def setUp(self):
        """Create pg0, subscribe to BFD events and configure IPv6."""
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """Cancel the BFD event subscription if VPP is still alive."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _send_and_verify_forwarded(self, packets):
        """Inject packets on pg0 and check each one is forwarded back.

        One data packet is captured per packet sent (BFD and IPv6 ND/RA
        traffic is filtered out) and its IPv6 destination is compared with
        the corresponding sent packet, in order.
        """
        self.pg0.add_stream(packets)
        self.pg_start()
        for sent in packets:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             sent[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # packets to match against both of the routes
        packets = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                    IPv6(src="3001::1", dst="2001::1") /
                    UDP(sport=1234, dport=1234) /
                    Raw('\xa5' * 100)),
                   (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                    IPv6(src="3001::1", dst="2002::1") /
                    UDP(sport=1234, dport=1234) /
                    Raw('\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index,
                                                is_ip6=1)],
                                  is_ip6=1)
        # NOTE(review): 0xffffffff presumably means "no interface", which
        # would make this the recursive path - confirm
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff,
                                                is_ip6=1)],
                                  is_ip6=1)
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(packets)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(packets)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(packets)
1701
1702
1703 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    # class-level placeholders; the sessions are created by each test
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Bring up pg0 with IPv4 configured and ARP resolved."""
        parent = super(BFDSHA1TestCase, cls)
        parent.setUpClass()
        try:
            cls.create_pg_interfaces([0])
            pg = cls.pg0
            pg.config_ip4()
            pg.admin_up()
            pg.resolve_arp()
        except Exception:
            # tear the class down again before reporting the failure
            parent.tearDownClass()
            raise

    def setUp(self):
        """Create a key factory, subscribe to BFD events, start capture."""
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Cancel the BFD event subscription and drain pending events."""
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)
        # clear the event queue
        self.vapi.collect_events()
        super(BFDSHA1TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key, bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key, bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        # answer every packet from VPP for two detection multiples
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth """
        auth_key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # specify sequence number so that it wraps
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0xFFFFFFFF - 4)
        bfd_session_up(self)
        for _ in range(30):
            wait_for_bfd_packet(self)
            # the sequence number is bumped before every packet sent
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad sequence numbers"""
        auth_key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key, bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        # keep sending for twice the detection time without ever
        # incrementing the sequence number
        send_until = time.time() + 2 * detection_time
        while time.time() < send_until:
            self.test_session.send_packet()
            self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                       "time between bfd packets")
        events = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ run an attack by a rogue session against an established one

        Steps:
        1. add the given vpp session to the vpp configuration
        2. bring the session up using the legitimate test session
        3. copy the bfd values of the legitimate session to the rogue one
        4. override them with rogue_bfd_values (if any were given)
        5. force the rogue session state to down
        6. send one packet from the rogue session
        7. check that the vpp session is still up
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # make the rogue session mimic the legitimate one
        legit = self.test_session
        rogue_test_session.update(
            my_discriminator=legit.my_discriminator,
            your_discriminator=legit.your_discriminator,
            desired_min_tx=legit.desired_min_tx,
            required_min_rx=legit.required_min_rx,
            detect_mult=legit.detect_mult,
            diag=legit.diag,
            state=legit.state,
            auth_type=legit.auth_type)
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        # send packet from rogue session
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        legit_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key, bfd_key_id=vpp_session.bfd_key_id)
        # the rogue session carries no authentication at all
        rogue_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(
            vpp_session, legit_session, rogue_session)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        # pick a different random bfd key id
        while True:
            rogue_key_id = randint(0, 255)
            if rogue_key_id != vpp_session.bfd_key_id:
                break
        legit_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key, bfd_key_id=vpp_session.bfd_key_id)
        rogue_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key, bfd_key_id=rogue_key_id)
        self.execute_rogue_session_scenario(
            vpp_session, legit_session, rogue_session)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        legit_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key, bfd_key_id=vpp_session.bfd_key_id)
        rogue_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key, bfd_key_id=vpp_session.bfd_key_id)
        # same key and key id, but the auth type is overridden
        wrong_auth = {'auth_type': BFDAuthType.keyed_md5}
        self.execute_rogue_session_scenario(
            vpp_session, legit_session, rogue_session, wrong_auth)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        auth_key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET,
            sha1_key=auth_key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        detection_time = (self.test_session.detect_mult *
                          self.vpp_session.required_min_rx / USEC_IN_SEC)
        self.sleep(2 * detection_time, "simulating peer restart")
        bfd_events = self.vapi.collect_events()
        self.assert_equal(len(bfd_events), 1, "number of bfd events")
        verify_event(self, bfd_events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        bfd_session_up(self)
1933         bfd_session_up(self)
1934
1935
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0, configure IPv4 on it, bring it up, resolve ARP """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ new key factory, subscribe to BFD events, capture on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drop any queued events """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_up=True, inc_seq_num=False):
        """ run detect_mult * 2 rounds of "receive a BFD packet, reply"

        :param verify_up: assert that every received packet carries
            state Up
        :param inc_seq_num: call inc_seq_num() on the test session before
            each reply is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert the VPP session is Up and no BFD event was generated """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        # switch the test side to the same key right away
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        # stop authenticating on the test side right away
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        # switch the test side to the new key right away
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first exchange does not check the state of received packets
        self._exchange_packets(verify_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # test side keeps sending unauthenticated packets for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # test side keeps sending authenticated packets for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # test side keeps using the old key for a while
        self._exchange_packets()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
2142
2143
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """
    pg0 = None

    @classmethod
    def setUpClass(cls):
        """ create pg0, configure IPv4 and IPv6 on it, resolve ARP and NDP """
        super(BFDCLITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()

        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ new key factory and packet capture on pg0 for every test """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drop any queued events """
        # skip the API call when VPP is already dead, same as
        # BFDAuthOnOffTestCase.tearDown does
        if not self.vpp_dead:
            try:
                self.vapi.want_bfd_events(enable_disable=0)
            except UnexpectedApiReturnValueError:
                # some tests aren't subscribed, so this is not an issue
                pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    @staticmethod
    def _hex_secret(key):
        """ return key.key as two lowercase hex digits per character """
        return "".join("{:02x}".format(ord(c)) for c in key.key)

    def _verify_add_mod_del(self, local_addr, peer_addr, key=None,
                            **session_kwargs):
        """ create, modify and delete a BFD UDP session using the CLI

        Each of add and del is issued twice; the repeated command is
        expected to be rejected.

        :param local_addr: local address used in the CLI commands
        :param peer_addr: peer address of the session
        :param key: auth key to create the session with, None for no auth
        :param session_kwargs: extra keyword arguments passed to
            VppBFDUDPSession (e.g. af=AF_INET6)
        """
        if key is not None:
            session_kwargs["sha1_key"] = key
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_kwargs)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, local_addr, peer_addr,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        # reference object describing the session after modification; it
        # must carry the same auth settings as the original session
        mod_kwargs = dict(session_kwargs)
        if key is not None:
            mod_kwargs["bfd_key_id"] = vpp_session.bfd_key_id
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **mod_kwargs)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, local_addr, peer_addr,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name, local_addr, peer_addr)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def _verify_auth_on_off(self, delayed):
        """ activate, then deactivate session authentication via the CLI

        Both commands are issued twice and the session config is verified
        after each invocation.

        :param delayed: append "delayed yes" to both CLI commands
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # never added to VPP - only describes the expected config with auth
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id)
        if delayed:
            cli_activate += " delayed yes"
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        # the trailing space is part of the command in the non-delayed form
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        if delayed:
            cli_deactivate += "delayed yes"
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # output is only logged, nothing is asserted about its content
        self.logger.info(self.vapi.ppcli("show bfd keys"))
        self.logger.info(self.vapi.ppcli("show bfd sessions"))
        self.logger.info(self.vapi.ppcli("show bfd"))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._verify_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._verify_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                 af=AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._verify_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4,
                                 key=key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._verify_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                 key=key, af=AF_INET6)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._verify_auth_on_off(delayed=False)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._verify_auth_on_off(delayed=True)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        # both commands end with the same session selector (note the
        # trailing space, which is part of the original command text)
        selector = "interface %s local-addr %s peer-addr %s " \
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_down = "bfd udp session set-flags admin down " + selector
        cli_up = "bfd udp session set-flags admin up " + selector
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces([0])
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        show_cmd = "show bfd echo-source"
        self.cli_verify_response(show_cmd, "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address on the loopback yet - nothing usable as echo source
        self.cli_verify_response(show_cmd,
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: none\n"
                                 "IPv6 address usable as echo source: none" %
                                 self.loopback0.name)
        self.loopback0.config_ip4()
        # expected echo address: local address with the lowest bit flipped
        ip4_word, = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", ip4_word ^ 1))
        self.cli_verify_response(show_cmd,
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: none" %
                                 (self.loopback0.name, echo_ip4))
        # same for IPv6: flip the lowest bit of the last 32-bit word
        ip6_words = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", ip6_words[0],
                                            ip6_words[1], ip6_words[2],
                                            ip6_words[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response(show_cmd,
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, echo_ip4, echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response(show_cmd, "UDP echo source is not set.")
2559
if __name__ == "__main__":
    # executed as a script: run this module's tests with the VPP runner
    unittest.main(testRunner=VppTestRunner)