vcl: allow more rx events on peek
[vpp.git] / test / test_syslog.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from framework import VppTestCase
5 from asfframework import VppTestRunner
6 from util import ppp
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from syslog_rfc5424_parser import SyslogMessage, ParseError
10 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
11 from vpp_papi import VppEnum
12
13
class TestSyslog(VppTestCase):
    """Syslog Protocol Test Cases"""

    @property
    def SYSLOG_SEVERITY(self):
        """Return the ``vl_api_syslog_severity_t`` enum exposed by VppEnum."""
        return VppEnum.vl_api_syslog_severity_t

    @classmethod
    def setUpClass(cls):
        """Create and bring up the single packet-generator interface (pg0).

        If any interface setup step fails, the base-class teardown is run
        before the exception is re-raised.
        """
        super().setUpClass()

        try:
            (cls.pg0,) = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

        except Exception:
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class."""
        super().tearDownClass()

    def syslog_generate(self, facility, severity, appname, msgid, sd=None, msg=None):
        """
        Generate syslog message

        Builds a ``test syslog ...`` CLI command from the arguments and
        executes it through ``self.vapi.cli``.

        :param facility: facility value (used as an index into the facility
                         keyword table below)
        :param severity: severity level (used as an index into the severity
                         keyword table below)
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional), a mapping of SD-ID to a
                   mapping of parameter name to value
        :param msg: free-form message (optional)
        """
        # CLI keyword for each facility, by position. Positions 10 and 15
        # are intentionally empty strings.
        facility_str = (
            "kernel",
            "user-level",
            "mail-system",
            "system-daemons",
            "security-authorization",
            "syslogd",
            "line-printer",
            "network-news",
            "uucp",
            "clock-daemon",
            "",
            "ftp-daemon",
            "ntp-subsystem",
            "log-audit",
            "log-alert",
            "",
            "local0",
            "local1",
            "local2",
            "local3",
            "local4",
            "local5",
            "local6",
            "local7",
        )

        # CLI keyword for each severity, by position.
        severity_str = (
            "emergency",
            "alert",
            "critical",
            "error",
            "warning",
            "notice",
            "informational",
            "debug",
        )

        # Collect the space-separated command fragments and join them once.
        cli_parts = [
            "test syslog %s %s %s %s"
            % (
                facility_str[facility],
                severity_str[severity],
                appname,
                msgid,
            )
        ]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                # One-element tuples keep %-formatting correct for any value.
                cli_parts.append("sd-id %s" % (sd_id,))
                cli_parts.extend(
                    "sd-param %s %s" % (name, value)
                    for name, value in sd_params.items()
                )
        if msg is not None:
            cli_parts.append("%s" % (msg,))
        self.vapi.cli(" ".join(cli_parts))

    def syslog_verify(
        self, data, facility, severity, appname, msgid, sd=None, msg=None
    ):
        """
        Verify syslog message

        Decodes ``data`` as UTF-8, parses it with ``SyslogMessage.parse`` and
        asserts every parsed field against the expected values. The version
        is expected to be 1 and the hostname to equal pg0's local IPv4
        address.

        :param data: syslog message (bytes)
        :param facility: facility value
        :param severity: severity level
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional); ``None`` is compared as an
                   empty dict
        :param msg: free-form message (optional)
        :raises ParseError: if the payload cannot be parsed (logged first)
        """
        expected_sd = {} if sd is None else sd
        text = data.decode("utf-8")
        try:
            message = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(message.facility, facility)
        self.assertEqual(message.severity, severity)
        self.assertEqual(message.appname, appname)
        self.assertEqual(message.msgid, msgid)
        self.assertEqual(message.msg, msg)
        self.assertEqual(message.sd, expected_sd)
        self.assertEqual(message.version, 1)
        self.assertEqual(message.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """Syslog Protocol test"""
        # Configure the sender with only the addresses and check the values
        # reported back for the fields that were not supplied.
        self.vapi.syslog_set_sender(
            src_address=self.pg0.local_ip4, collector_address=self.pg0.remote_ip4
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address), self.pg0.remote_ip4)
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = "test"
        msgid = "testMsg"
        msg = "this is message"
        sd1 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"}
        }
        sd2 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"},
            "examplePriority@32473": {"class": "high"},
        }

        # 1) Message without structured data: check IP/UDP headers, then
        #    the syslog payload.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
            self.assertEqual(capture[0][UDP].dport, 514)
            self.assert_packet_checksums_valid(capture[0], False)
        except Exception:
            # Dump the offending packet before propagating the failure.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local7,
            SyslogSeverity.info,
            appname,
            msgid,
            None,
            msg,
        )

        # 2) With the filter set to WARN, an info-severity message is
        #    expected to produce no packet.
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
        syslog_filter = self.vapi.syslog_get_filter()
        self.assertEqual(
            syslog_filter.severity, self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN
        )
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        self.pg0.assert_nothing_captured()

        # 3) A warning-severity message with one SD element is sent.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local6, SyslogSeverity.warning, appname, msgid, sd1, msg
        )
        capture = self.pg0.get_capture(1)
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local6,
            SyslogSeverity.warning,
            appname,
            msgid,
            sd1,
            msg,
        )

        # 4) Change the collector port and check that it is reported back
        #    and used as the UDP destination port of the next message.
        self.vapi.syslog_set_sender(
            self.pg0.local_ip4, self.pg0.remote_ip4, collector_port=12345
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local5, SyslogSeverity.err, appname, msgid, sd2, None
        )
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][UDP].dport, 12345)
        except Exception:
            # Dump the offending packet before propagating the failure.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local5,
            SyslogSeverity.err,
            appname,
            msgid,
            sd2,
            None,
        )
231
# When executed directly as a script, run this module's tests through
# unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)