# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
ActiveAlert Query provides the ability to view current alert entries from the
alert log viewer.
When creating the query, you must specify a target which speifies the SMC domain
for which to retrieve the alerts.
A basic alert query using a local timezone example::
query = ActiveAlertQuery('Shared Domain')
query.format.timezone('CST')
You can also use standard filters to specify a more exact match, for example, showing
alerts with a severity of CRITICAL::
query.add_in_filter(
FieldValue(LogField.ALERTSEVERITY), [ConstantValue(Alerts.CRITICAL)])
"""
from smc_monitoring.models.query import Query
from smc_monitoring.models.constants import LogField
[docs]
class ActiveAlertQuery(Query):
"""
Active Alert Query is an interface to the alert log viewer in Log Server.
This query type provides the ability to fetch and filter on active
alerts.
You can create a new query specifying a valid timezone abbreviation::
query = ActiveAlertQuery('Shared Domain', timezone='CST')
Or alternatively no timezone::
query = ActiveAlertQuery('DomainFoo')
:param str target: domain for which to filter alerts. Default: 'Shared Domain'
:param str timezone: timezone for timestamps, i.e. 'CST', etc
"""
location = "/monitoring/session/socket"
field_ids = [
LogField.TIMESTAMP,
LogField.ALERTSEVERITY,
LogField.ACTION,
LogField.NODEID,
LogField.SRC,
LogField.DST,
LogField.SERVICE,
LogField.PROTOCOL,
LogField.SPORT,
LogField.DPORT,
LogField.SITUATION,
LogField.VULNERABILITYREFERENCES,
]
def __init__(self, target="Shared Domain", timezone=None):
super(ActiveAlertQuery, self).__init__("ACTIVE_ALERTS", target)
if timezone is not None:
self.format.set_resolving(timezone=timezone)
[docs]
def fetch_as_element(self, **kw):
"""
Fetch the results and return as a User element. The original
query is not modified.
:return: generator returning element instances
:rtype: Alert
"""
clone = self.copy()
clone.format.field_format("id")
for custom_field in ["field_ids", "field_names"]:
clone.format.data.pop(custom_field, None)
for list_of_results in clone.fetch_raw(**kw):
for entry in list_of_results:
yield Alert(**entry)
[docs]
class Alert(object):
"""
Alert definition returned from specified domain.
This is the result of making a :class:`.ActiveAlertQuery` and using
:meth:`~ActiveAlertQuery.fetch_as_element`.
"""
def __init__(self, **data):
self.alert = data
@property
def timestamp(self):
"""
Timestamp of this connection. It is recommended to set the timezone
on the query to view this timestamp in the systems local time.
For example::
query.format.timezone('CST')
:return timestamp in string format
:rtype: str
"""
return self.alert.get(str(LogField.TIMESTAMP))
@property
def engine(self):
"""
The engine/cluster for this state table entry
:return: engine or cluster for this entry
:rtype: str
"""
return self.alert.get(str(LogField.NODEID))
@property
def severity(self):
"""
Severity for this alert
:rtype: str
"""
return self.alert.get(str(LogField.ALERTSEVERITY))
@property
def action(self):
"""
Action performed for the alert
:rtype: str
"""
return self.alert.get(str(LogField.ACTION))
@property
def source(self):
"""
Source IP for the alert
:rtype: str
"""
return self.alert.get(str(LogField.SRC))
@property
def destination(self):
"""
Destination IP for the alert
:rtype: str
"""
return self.alert.get(str(LogField.DST))
@property
def service(self):
"""
Service associated with alert
:rtype: str
"""
return self.alert.get(str(LogField.SERVICE))
@property
def protocol(self):
"""
Protocol for alert
:rtype: str
"""
return self.alert.get(str(LogField.PROTOCOL))
@property
def source_port(self):
"""
Source port for alert
:rtype: int
"""
return int(self.alert.get(str(LogField.SPORT)))
@property
def destination_port(self):
"""
Destination port for alert
:rtype: int
"""
return int(self.alert.get(str(LogField.DPORT)))
@property
def situation(self):
"""
Situation defined for this alert
:rtype: str
"""
return self.alert.get(str(LogField.SITUATION))
@property
def vulnerability_refs(self):
"""
Comma seperated string listing any vulnerability references for
the alert, if any.
:rtype: str
"""
return self.alert.get(str(LogField.VULNERABILITYREFERENCES))
def __str__(self):
return "{}(severity={},src={},dst={},action={},situation={})".format(
self.__class__.__name__,
self.severity,
self.source,
self.destination,
self.action,
self.situation,
)
def __repr__(self):
return str(self)