1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Implementation of keywords for managing Honeycomb notifications."""
16 from robot.api import logger
18 from resources.libraries.python.honeycomb.HoneycombUtil import HoneycombError
19 from resources.libraries.python.honeycomb.Netconf import Netconf
22 class Notifications(Netconf):
23 """Implements keywords for receiving Honeycomb notifications.
25 The keywords implemented in this class make it possible to:
26 - receive notifications from Honeycomb
27 - read received notifications
30 def add_notification_listener(self, subscription, time_out=10):
31 """Open a new channel on the SSH session, connect to Netconf subsystem
32 and subscribe to receive Honeycomb notifications.
34 :param subscription: RPC for subscription to notifications.
35 :param time_out: Timeout value for each read operation in seconds.
36 :type subscription: str
38 :raises HoneycombError: If subscription to notifications fails.
41 logger.debug(subscription)
42 self.send(subscription)
44 reply = self.get_response(
46 err="Timeout on notifications subscription."
49 if "<ok/>" not in reply:
50 raise HoneycombError("Notifications subscription failed with"
51 " message: {0}".format(reply))
53 logger.debug("Notifications subscription successful.")
55 def get_notification(self, time_out=10):
56 """Read and return the next notification message.
58 :param time_out: Timeout value for the read operation in seconds.
60 :returns: Data received from buffer.
64 logger.debug("Getting notification. Timeout set to {0} seconds."
67 reply = self.get_response(
69 err="Timeout on getting notification."