# Source code for smc_monitoring.monitors.sslvpn

#  Licensed under the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License. You may obtain
#  a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#  License for the specific language governing permissions and limitations
#  under the License.
"""
SSLVPN currently connected users.

Create a query to obtain all connections for a given engine::

    query = SSLVPNQuery('sg_vm')

Add a timezone to the query::

    query.format.timezone('CST')

Execute query and return raw results::

    for records in query.fetch_batch():
        ...

Execute query and return as an :class:`.SSLVPNUser` element::

    for records in query.fetch_as_element():
        ...

.. seealso:: :class:`smc_monitoring.models.filters` for more information on creating filters

"""
from smc_monitoring.models.query import Query
from smc_monitoring.models.constants import LogField


class SSLVPNQuery(Query):
    """
    Show all current SSL VPN connections on the specified target.

    :ivar list field_ids: field IDs are the default fields for this entry
        type and are constants found in
        :class:`smc_monitoring.models.constants.LogField`

    :param str target: name of target engine/cluster
    """

    location = "/monitoring/session/socket"
    field_ids = [
        LogField.SSLVPNSESSIONMONRECEIVED,
        LogField.SSLVPNSESSIONMONTIMEOUT,
        LogField.NODEID,
        LogField.SRC,
        LogField.USERNAME,
    ]

    def __init__(self, target, **kw):
        # "SSLVPNV2" is the fixed first argument passed to the base Query
        # constructor; any extra keyword arguments are forwarded untouched.
        super(SSLVPNQuery, self).__init__("SSLVPNV2", target, **kw)

    def fetch_as_element(self, **kw):
        """
        Fetch the results and return as an SSLVPNUser element. The original
        query is not modified.

        :param int query_timeout: length of time to wait on receiving web
            socket results (total query time).
        :param int inactivity_timeout: length of time before exiting if no new
            entry.
        :param int max_recv: for queries that are not 'live', set this to
            supply a max number of receive iterations.
        :return: generator of elements
        :rtype: :class:`.SSLVPNUser`
        """
        # Work on a copy so the format changes below never touch this query.
        clone = self.copy()
        # NOTE(review): presumably the "id" field format keys each result by
        # field ID, which is what the SSLVPNUser accessors look up -- confirm.
        clone.format.field_format("id")
        # Remove any custom field selection from the copied format data.
        for custom_field in ("field_ids", "field_names"):
            clone.format.data.pop(custom_field, None)

        for batch in clone.fetch_raw(**kw):
            for entry in batch:
                # A missing or falsy "first_fetch" is normalised to False;
                # a truthy value is kept as-is.
                entry.update({"first_fetch": entry.get("first_fetch") or False})
                yield SSLVPNUser(**entry)
class SSLVPNUser(object):
    """
    A single SSL VPN user entry. This is the result of making a
    :class:`.SSLVPNQuery` and using :meth:`~SSLVPNQuery.fetch_as_element`.

    The keyword arguments given to the constructor are stored unchanged as
    the ``user`` dict; every property below is a lookup into that dict and
    returns ``None`` when the key is absent.
    """

    def __init__(self, **data):
        self.user = data

    def _field(self, log_field):
        # Entries are looked up by the string form of the LogField constant.
        return self.user.get(str(log_field))

    @property
    def first_fetch(self):
        """
        first fetch True means entry is part of initial data at first fetch

        :rtype: bool
        """
        return self.user.get("first_fetch")

    @property
    def engine(self):
        """
        The engine/cluster for this SSL VPN user entry (the ``NODEID`` field)

        :return: engine or cluster for this entry
        :rtype: str
        """
        return self._field(LogField.NODEID)

    @property
    def source_addr(self):
        """
        Source IP address for the SSL VPN user

        :rtype: str
        """
        return self._field(LogField.SRC)

    @property
    def username(self):
        """
        Username for this SSL VPN user

        :rtype: str
        """
        return self._field(LogField.USERNAME)

    @property
    def session_start(self):
        """
        Time the session started. It is recommended that you add a timezone
        to the query to present this in human readable format::

            query.format.timezone('CST')

        :rtype: str
        """
        return self._field(LogField.SSLVPNSESSIONMONRECEIVED)

    @property
    def session_expiration(self):
        """
        Time the session expires. It is recommended that you add a timezone
        to the query to present this in human readable format::

            query.format.timezone('CST')

        :rtype: str
        """
        return self._field(LogField.SSLVPNSESSIONMONTIMEOUT)

    def __str__(self):
        return "{}(user={},ipaddress={},session_start={})".format(
            self.__class__.__name__,
            self.username,
            self.source_addr,
            self.session_start,
        )

    def __repr__(self):
        return str(self)