4 from framework import VppTestCase
5 from asfframework import VppTestRunner
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from syslog_rfc5424_parser import SyslogMessage, ParseError
10 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
11 from vpp_papi import VppEnum
14 class TestSyslog(VppTestCase):
15 """Syslog Protocol Test Cases"""
18 def SYSLOG_SEVERITY(self):
19 return VppEnum.vl_api_syslog_severity_t
23 super(TestSyslog, cls).setUpClass()
26 (cls.pg0,) = cls.create_pg_interfaces(range(1))
32 super(TestSyslog, cls).tearDownClass()
36 def tearDownClass(cls):
37 super(TestSyslog, cls).tearDownClass()
39 def syslog_generate(self, facility, severity, appname, msgid, sd=None, msg=None):
41 Generate syslog message
43 :param facility: facility value
44 :param severity: severity level
45 :param appname: application name that originate message
46 :param msgid: message identifier
47 :param sd: structured data (optional)
48 :param msg: free-form message (optional)
55 "security-authorization",
88 cli_str = "test syslog %s %s %s %s" % (
89 facility_str[facility],
90 severity_str[severity],
95 for sd_id, sd_params in sd.items():
96 cli_str += " sd-id %s" % (sd_id)
97 for name, value in sd_params.items():
98 cli_str += " sd-param %s %s" % (name, value)
100 cli_str += " %s" % (msg)
101 self.vapi.cli(cli_str)
104 self, data, facility, severity, appname, msgid, sd=None, msg=None
107 Verify syslog message
109 :param data: syslog message
110 :param facility: facility value
111 :param severity: severity level
112 :param appname: application name that originate message
113 :param msgid: message identifier
114 :param sd: structured data (optional)
115 :param msg: free-form message (optional)
117 message = data.decode("utf-8")
121 message = SyslogMessage.parse(message)
122 except ParseError as e:
126 self.assertEqual(message.facility, facility)
127 self.assertEqual(message.severity, severity)
128 self.assertEqual(message.appname, appname)
129 self.assertEqual(message.msgid, msgid)
130 self.assertEqual(message.msg, msg)
131 self.assertEqual(message.sd, sd)
132 self.assertEqual(message.version, 1)
133 self.assertEqual(message.hostname, self.pg0.local_ip4)
135 def test_syslog(self):
136 """Syslog Protocol test"""
137 self.vapi.syslog_set_sender(
138 src_address=self.pg0.local_ip4, collector_address=self.pg0.remote_ip4
140 config = self.vapi.syslog_get_sender()
141 self.assertEqual(str(config.collector_address), self.pg0.remote_ip4)
142 self.assertEqual(config.collector_port, 514)
143 self.assertEqual(str(config.src_address), self.pg0.local_ip4)
144 self.assertEqual(config.vrf_id, 0)
145 self.assertEqual(config.max_msg_size, 480)
149 msg = "this is message"
151 "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"}
154 "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"},
155 "examplePriority@32473": {"class": "high"},
158 self.pg_enable_capture(self.pg_interfaces)
159 self.syslog_generate(
160 SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
162 capture = self.pg0.get_capture(1)
164 self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
165 self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
166 self.assertEqual(capture[0][UDP].dport, 514)
167 self.assert_packet_checksums_valid(capture[0], False)
169 self.logger.error(ppp("invalid packet:", capture[0]))
172 capture[0][Raw].load,
173 SyslogFacility.local7,
181 self.pg_enable_capture(self.pg_interfaces)
182 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
183 filter = self.vapi.syslog_get_filter()
184 self.assertEqual(filter.severity, self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
185 self.syslog_generate(
186 SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
188 self.pg0.assert_nothing_captured()
190 self.pg_enable_capture(self.pg_interfaces)
191 self.syslog_generate(
192 SyslogFacility.local6, SyslogSeverity.warning, appname, msgid, sd1, msg
194 capture = self.pg0.get_capture(1)
196 capture[0][Raw].load,
197 SyslogFacility.local6,
198 SyslogSeverity.warning,
205 self.vapi.syslog_set_sender(
206 self.pg0.local_ip4, self.pg0.remote_ip4, collector_port=12345
208 config = self.vapi.syslog_get_sender()
209 self.assertEqual(config.collector_port, 12345)
211 self.pg_enable_capture(self.pg_interfaces)
212 self.syslog_generate(
213 SyslogFacility.local5, SyslogSeverity.err, appname, msgid, sd2, None
215 capture = self.pg0.get_capture(1)
217 self.assertEqual(capture[0][UDP].dport, 12345)
219 self.logger.error(ppp("invalid packet:", capture[0]))
222 capture[0][Raw].load,
223 SyslogFacility.local5,
232 if __name__ == "__main__":
233 unittest.main(testRunner=VppTestRunner)