Source code for smc.api.session

#  Licensed under the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License. You may obtain
#  a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#  License for the specific language governing permissions and limitations
#  under the License.
"""
Session module for tracking existing connection state to SMC
"""
import copy
import json
import logging
import ssl

import requests
import collections

from _socket import SO_KEEPALIVE, SOL_SOCKET

import smc
# import smc.api.web
from smc.api.web import send_request, counters
from smc.api.entry_point import Resource
from smc.api.configloader import load_from_file, load_from_environ
from smc.api.common import SMCRequest
from smc.base.decorators import cached_property
from smc.api.exceptions import (
    ConfigLoadError,
    SMCConnectionError,
    UnsupportedEntryPoint,
    SessionManagerNotFound,
    SessionNotFound,
    SMCOperationFailure,
)
from smc.base.model import ElementFactory
from requests.adapters import HTTPAdapter, PoolManager
from urllib3 import Retry

# requests.packages.urllib3.disable_warnings()

MAX_RETRY = 5
ERROR_CODES_SUPPORTING_AUTO_RETRY = [
    503,  # to support db concurrent access from SMC
    409,  # to support ETag conflict
    429,  # to support too many requests
    413,  # to support entity too large
]

logger = logging.getLogger(__name__)


class SessionManager(object):
    """
    The SessionManager keeps track of sessions created within smc-python.
    In most cases, this is transparent functionality as most scripts will
    use only a single session for it's lifetime.

    By default a single Session Manager is created that will be used
    when processing all requests to the SMC.

    Session Manager also has a hook that can be called that will change the
    way the session manager retrieves the session from the manager.
    An example might be that you are using smc-python in a web application
    and authenticating the user to the SMC. Each web session stores the
    name of the authenticated user. You want to use that to map to the
    SMC session by retrieving the user from the web session ID and then
    from the SessionManager.

    A function hook can be registered that will retrieve use some external
    criteria to determine which session to retrieve from the SessionManager.
    .. seealso:: :meth:`~register_hook`.

    Creating your own session manager might be useful if you needed custom
    functionality or needed to extend the default. Create a new manager
    and call mount to set it on the global request object::

        manager = SessionManager()
        manager.mount()

    ..note:: By default, a single session is maintained and considered the
        `default` session.

    :param list(Session) sessions: list of sessions
    """

    _session_hook = None

    def __init__(self, sessions=None):
        self._sessions = collections.OrderedDict()
        sessions = sessions or []
        for session in sessions:
            self._register(session)
        # self._connections = local() #TODO: Make self._sessions an attribute
        # of threading local

    @classmethod
    def create(cls, sessions=None):
        """
        A session manager will be mounted to the SMCRequest class through
        this classmethod. If there is already an existing SessionManager,
        that is returned instead.

        :param list sessions: a list of Session objects
        :rtype: SessionManager
        """
        manager = getattr(SMCRequest, "_session_manager")
        if manager is not None:
            return manager
        manager = SessionManager(sessions)
        manager.mount()
        return manager

    def mount(self):
        """
        Mount this session manager on the request class, making this
        the global manager for processing requests

        :return: None
        """
        setattr(SMCRequest, "_session_manager", self)

    def register_hook(self, hook):
        """
        Add a hook that specifies how to retrieve the session from the
        session manager. A hook must be a callable that takes one argument
        (the SessionManager) and extracts the session based on some criteria.
        An example of using a hook::

            from smc import manager
            def retrieve_session(session_manager):
                ...
                admin = session_manager.get_session('admin')
                return admin if admin.is_active else session_manager.get_default_session()

            manager.register_hook(retrieve_session)

        Hooks can be used when your application requires multiple sessions
        within the same python interpreter. For example, in a web app, you
        might use the SMC administrator account to log in and store the session
        within the web application which allows you to also store and retrieve
        that SMC based session for further operations.
        """
        if callable(hook):
            self._session_hook = hook

    def __contains__(self, session):
        """
        A session is considered to exist if the current user attached to
        the session matches. In SMC, an administrative account must be
        unique even if it only exists in a specific domain.

        :rtype: bool
        """
        return session in self.sessions

    @property
    def sessions(self):
        """
        All available sessions in this session manager

        :rtype: list(Session)
        """
        return list(self._sessions.values())

    def get_default_session(self):
        """
        The default session is nothing more than the first session added
        into the session handler pool. This will likely change in the future
        but for now each session identifies the domain and also manages
        domain switching within a single session.

        :rtype: Session
        """
        if self._sessions:
            return self.get_session(next(iter(self._sessions)))
        return self.get_session()

    def get_session(self, user=None):
        """
        Retrieve the session based on user, or return and empty session.
        Note that an empty session is not inserted into the Session Manager
        until `login` has been successfully called on the session.

        :param str user: optional user to find in the session manager
        :raises SessionNotFound: session was not found in manager
        :rtype: Session
        """
        session = self._sessions.get(user)
        return session if session else Session()

    #         raise SessionNotFound('Session specified by name: %s does not currently '
    #             'exist.' % user)

    def close_all(self):
        for admin_session in list(self._sessions.keys()):
            self._sessions[admin_session].logout()
        self._sessions.clear()

    def _get_session_key(self, session):
        for name, _session in self._sessions.items():
            if _session == session:
                return name

    def _register(self, session):
        """
        Register a session
        """
        user_name = session.name
        smc.session_name.name = "{}-{}".format(user_name, session.api_version)
        if user_name:
            self._sessions[smc.session_name.name] = session
        return smc.session_name.name

    def _deregister(self, session):
        """
        Deregister a session.
        """
        if session in self:
            self._sessions.pop(self._get_session_key(session), None)


class SSLAdapter(HTTPAdapter):

    def init_poolmanager(self, connections, maxsize, block=False):
        """
        :param maxsize:
        Number of connections to save that can be reused. More than 1 is useful
        in multithreaded situations. If ``block`` is set to False, more
        connections will be created but they will not be saved once they've
        been used.
        :param block:
        If set to True, no more than ``maxsize`` connections will be used at
        a time. When no free connections are available, the call will block
        until a connection has been released. This is a useful side effect for
        particular multithreaded situations where one does not want to use more
        than maxsize connections per host to prevent flooding.
        """
        self.poolmanager = PoolManager(
            maxsize=maxsize,
            num_pools=connections,
            block=block,
            timeout=720,
            socket_options=[(SOL_SOCKET, SO_KEEPALIVE, 1)],
            ssl_version=ssl.PROTOCOL_TLSv1_2)


[docs] class Session(object): """ Session represents the clients session to the SMC. A session is obtained by calling login(). If sessions need to be long lived as might be the case when running under a web platform, a session is automatically refreshed when it expires. Best practice is to call logout() after to clear the session from the SMC. A session will be automatically closed once the python interpreter closes. Each session will also have a single connection pool associated with it. This results in a single persistent connection to the SMC that will be re-used as needed. """ # SMC-46756 makes max_pool configurable DEFAULT_POOL_MAXSIZE = requests.adapters.DEFAULT_POOLSIZE def __init__(self, manager=None): self._params = {} # Retrieved from login self._session = None # requests.Session self._resource = None # smc.api.entry_point.Resource self._manager = manager # Session Manager that tracks this session self._connpool = None # default is 10 connections # Transactions are supported in version 0.6.2 and beyond. When # run with atomic, this session parameter indicates whether the # operation in process is within an atomic block or not self.in_atomic_block = False # Transactions that are within the given atomic block self.transactions = [] smc.session_name.name = None @property def manager(self): """ Return the session manager for this session :rtype: SessionManager """ manager = SMCRequest._session_manager if not self._manager else self._manager if not manager: raise SessionManagerNotFound( "A session manager was not found. " "This is an initialization error binding the SessionManager. " ) return manager @property def is_active(self): """ Is this session active. Active means there is a stored session ID for the SMC using the current account. This does not specify whether the session ID has been timed out on the server but does indicate the account has not called logout. :rtype: bool """ return self._session is not None and "JSESSIONID" in self._session.cookies @property def _extra_args(self): """ Extra args are collected from login and used if provided. These are generally not needed but may be used to enable visibility of beta features or set special settings """ return self._params.get("kwargs", {}) @property def entry_points(self): """ Entry points that are bound to this session. Entry points are exposed by the SMC API and provide links to top level resources :rtype: Resource """ if not self._resource: raise SMCConnectionError( "No entry points found, it is likely " "there is no valid login session." ) return self._resource @property def api_version(self): """ Current API Version :rtype: str """ return self._params.get("api_version") @property def session(self): return self._session @property def sock(self): """ get a secure socket from the pool if one is available else get a new connection :rtype: SSLSocket """ logger.debug("Get secure socket, from pool available sock:{}". format(self._connpool.pool.queue)) return self._connpool._get_conn().sock @property def session_id(self): """ The session ID in header type format. Can be inserted into a connection if necessary using:: {'Cookie': session.session_id} :rtype: str """ return ( None if not self.session or "JSESSIONID" not in self.session.cookies else "JSESSIONID={}".format(self.session.cookies["JSESSIONID"]) ) @property def credential(self): # Login credentials return Credential(**{k: self._params.get(k) for k in ("api_key", "login", "pwd")}) @property def url(self): """ The fully qualified SMC URL in use, includes the port number :rtype: str """ return self._params.get("url", "") @property def web_socket_url(self): socket_proto = "wss" if self.is_ssl else "ws" return "{}://{}/{}".format(socket_proto, self.url.split("://")[-1], self.api_version) @property def is_ssl(self): """ Is this an SSL connection :rtype: bool """ return self.url.startswith("https") if self.session else False @property def timeout(self): """ Session timeout in seconds :rtype: int """ return self._params.get("timeout", 30) @property def domain(self): """ Logged in SMC domain :rtype: str """ return "Shared Domain" if not self._params.get("domain") else self._params.get("domain") @property def name(self): """ Return the administrator name for this session. Can be None if the session has not yet been established. .. note:: The administrator name was introduced in SMC version 6.4. Previous versions will show the unique session identifier for this session. :rtype: str """ if self.session: # protect cached property from being set before session try: return self.current_user.name except AttributeError: # TODO: Catch ConnectionError? No session pass except SMCOperationFailure: # Related to recent SMC update. It can failed in case user # does not have Manage Administrator defined in his role. logging.error( "Failed to get username, please make sure you " "can have 'Manage Administrator' in your role" ) return hash(self) @cached_property def current_user(self): """ .. versionadded:: 0.6.0 Requires SMC version >= 6.4 Return the currently logged on API Client user element. :raises UnsupportedEntryPoint: Current user is only supported with SMC version >= 6.4 :rtype: Element """ if self.session: try: response = self.session.get(self.entry_points.get("current_user")) if response.status_code in (200, 201): admin_href = response.json().get("value") request = SMCRequest(href=admin_href) smcresult = send_request(self, "get", request) return ElementFactory(admin_href, smcresult) except UnsupportedEntryPoint: pass
[docs] def login( self, url=None, api_key=None, login=None, pwd=None, api_version=None, timeout=None, verify=True, alt_filepath=None, domain=None, pool_maxsize=None, max_retry=None, **kwargs ): """ Login to SMC API and retrieve a valid session. Sessions use a pool connection manager to provide dynamic scalability during times of increased load. Each session is managed by a global session manager making it possible to have more than one session per interpreter. An example login and logout session:: from smc import session session.login(url='http://1.1.1.1:8082', api_key='SomeSMCG3ener@t3dPwd') .....do stuff..... session.logout() :param str url: ip of SMC management server :param str api_key: API key created for api client in SMC :param str login: Administrator user in SMC that has privilege to SMC API. :param str pwd: Password for user login. :param api_version: specify api version (optional) :param int timeout: (optional): specify a timeout for initial connect; (default 10) :param str|boolean verify: verify SSL connections using cert (default: verify=True) You can pass verify the path to a CA_BUNDLE file or directory with certificates of trusted CAs :param str alt_filepath: If using .smcrc, alternate path+filename :param str domain: domain to log in to. If domains are not configured, this field will be ignored and api client logged in to 'Shared Domain'. :param bool retry_on_busy: pass as kwarg with boolean if you want to add retries if the SMC returns HTTP 503 error during operation. You can also optionally customize this behavior and call :meth:`.set_retry_on_busy` :param int pool_maxsize: The maximum number of connections to save in the pool. :param int max_retry: The maximum number of retry. :return: user session name in SessionManager :rtype: str :raises ConfigLoadError: loading cfg from ~.smcrc fails For SSL connections, you can disable validation of the SMC SSL certificate by setting verify=False, however this is not a recommended practice. If you want to use the SSL certificate generated and used by the SMC API server for validation, set verify='path_to_my_dot_pem'. It is also recommended that your certificate has subjectAltName defined per RFC 2818 If SSL warnings are thrown in debug output, see: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings Logout should be called to remove the session immediately from the SMC server. .. note:: As of SMC 6.4 it is possible to give a standard Administrative user access to the SMC API. It is still possible to use an API Client by providing the api_key in the login call. """ params = {} ''' The global variable MAX_RETRY is hardcoded to 5. The request framework makes use of this value. It can be overridden by passing the max_retry parameter in the login method. ''' global MAX_RETRY if not url or (not api_key and not (login and pwd)): try: # First try load from file params = ( load_from_file(alt_filepath) if alt_filepath is not None else load_from_file() ) logger.debug("Read config data from file: %s", params) except ConfigLoadError: # Last ditch effort, try to load from environment params = load_from_environ() logger.debug("Read config data from environ: %s", params) params = params or dict( url=url, api_key=api_key, login=login, pwd=pwd, api_version=api_version, verify=verify, timeout=timeout, domain=domain, kwargs=kwargs or {}, ) # Check to see this session is already logged in. If so, return. # The session object represents a single connection. Log out to # re-use the same session object or get_session() from the # SessionManager to track multiple sessions. if self.manager and (self.session and self in self.manager): logger.info( "An attempt to log in occurred when a session already " "exists, bypassing login for session: %s" % self ) return self._params = {k: v for k, v in params.items() if v is not None} verify_ssl = self._params.get("verify", True) # Determine and set the API version we will use. self._params.update( api_version=get_api_version(self.url, self.api_version, self.timeout, verify_ssl) ) extra_args = self._params.get("kwargs", {}) # Retries configured retry_on_busy = extra_args.pop("retry_on_busy", False) request = self._build_auth_request(verify_ssl, **extra_args) if pool_maxsize: Session.DEFAULT_POOL_MAXSIZE = pool_maxsize ''' SMC-47064 If the max_retry parameter is passed in the login method, it overrides the hardcoded value of MAX_RETRY. ''' if max_retry: MAX_RETRY = max_retry # This will raise if session login fails... self._session = self._get_session(request) self.session.verify = verify_ssl if retry_on_busy: self.set_retry_on_busy() # Load entry points if not self._resource: load_entry_points(self) logger.debug( "Login succeeded for admin: %s in domain: %s, session: %s", self.name, self.domain, self.session_id, ) return self.manager._register(self)
def __repr__(self): return "Session(name=%s,domain=%s)" % (self.name, self.domain) def _build_auth_request(self, verify=False, **kwargs): """ Build the authentication request to SMC """ json = {"domain": self.domain} credential = self.credential params = {} if credential.provider_name.startswith("lms"): if self.domain: params = dict(login=credential._login, pwd=credential._pwd, domain=self.domain) else: params = dict(login=credential._login, pwd=credential._pwd) else: json.update(authenticationkey=credential._api_key) if kwargs: json.update(**kwargs) # Store in case we need to rebuild later self._extra_args.update(**kwargs) request = dict( url=self.credential.get_provider_entry_point(self.url, self.api_version), json=json, params=params, headers={"content-type": "application/json"}, verify=verify, ) return request def _get_session(self, request): """ Authenticate the request dict :param dict request: request dict built from user input :raises SMCConnectionError: failure to connect :return: python requests session :rtype: requests.Session """ # build the user session if self.session is None: _session = requests.sessions.Session() # empty session retry = Retry( total=MAX_RETRY, read=MAX_RETRY, connect=MAX_RETRY, backoff_factor=0.3, status_forcelist=ERROR_CODES_SUPPORTING_AUTO_RETRY, ) adapter = HTTPAdapter(max_retries=retry, pool_maxsize=Session.DEFAULT_POOL_MAXSIZE) ssladapter = SSLAdapter(max_retries=retry, pool_maxsize=Session.DEFAULT_POOL_MAXSIZE) _session.mount("http://", adapter) _session.mount("https://", ssladapter) else: _session = self.session response = _session.post(**request) logger.info("Using SMC API version: %s", self.api_version) if not self._connpool: self._connpool = ssladapter.get_connection(request.get("url")) if response.status_code != 200: raise SMCConnectionError( "Login failed, HTTP status code: %s and reason: %s" % (response.status_code, response.reason) ) return _session
[docs] def logout(self): """ Logout session from SMC :return: None """ if not self.session: self.manager._deregister(self) return if len(self._connpool.pool.queue) < self._connpool.pool.maxsize: logger.info("Can't logout connectionpool is currently used") self.manager._deregister(self) return try: r = self.session.put(self.entry_points.get("logout")) if r.status_code == 204: logger.info( "Logged out admin: %s of domain: %s successfully", self.name, self.domain ) else: logger.error( "Logout status was unexpected. Received response " "with status code: %s", (r.status_code), ) except requests.exceptions.SSLError as e: logger.error("SSL exception thrown during logout: %s", e) except requests.exceptions.ConnectionError as e: logger.error("Connection error on logout: %s", e) finally: self.entry_points.clear() self.manager._deregister(self) # Reset session object after logout requests.sessions.session().close() self._session = None try: delattr(self, "current_user") except AttributeError: pass logger.debug("Call counters: %s" % counters)
[docs] def refresh(self): """ Refresh session on 401. This is called automatically if your existing session times out and resends the operation/s which returned the error. :raises SMCConnectionError: Problem re-authenticating using existing api credentials """ if self.session and self.name: # Did session timeout? logger.info( "Session timed out, will try obtaining a new session using " "previously saved credential information." ) try: self.logout() # Force log out session just in case except SMCConnectionError: logger.debug("Tried to logout but session is already closed, can continue..") self.login(**self.copy()) if self.session: return raise SMCConnectionError("Session expired and attempted refresh failed.")
[docs] def switch_domain(self, domain): """ Switch from one domain to another. You can call session.login() with a domain key value to log directly into the domain of choice or alternatively switch from domain to domain. The user must have permissions to the domain or unauthorized will be returned. In addition, when switching domains, you will be logged out of the current domain to close the connection pool associated with the previous session. This prevents potentially excessive open connections to SMC :: session.login() # Log in to 'Shared Domain' ... session.switch_domain('MyDomain') :raises SMCConnectionError: Error logging in to specified domain. This typically means the domain either doesn't exist or the user does not have privileges to that domain. """ if self.domain != domain: if self in self.manager: # Exit current domain self.logout() logger.info("Switching to domain: %r and creating new session", domain) params = self.copy() params.update(domain=domain) self.login(**params)
[docs] def set_retry_on_busy(self, total=5, backoff_factor=0.1, status_forcelist=None, **kwargs): """ Mount a custom retry object on the current session that allows service level retries when the SMC might reply with a Service Unavailable (503) message. This can be possible in larger environments with higher database activity. You can all this on the existing session, or provide as a dict to the login constructor. :param int total: total retries :param float backoff_factor: when to retry :param list status_forcelist: list of HTTP error codes to retry on :param list method_whitelist: list of methods to apply retries for, GET, POST and PUT by default :return: None """ if self.session: from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry allowed_methods = kwargs.pop("method_whitelist", []) or ["GET", "POST", "PUT"] status_forcelist = frozenset(status_forcelist) if status_forcelist else frozenset([503]) retry = Retry( total=total, backoff_factor=backoff_factor, status_forcelist=status_forcelist, allowed_methods=allowed_methods, ) for proto_str in ("http://", "https://"): self.session.mount(proto_str, HTTPAdapter(max_retries=retry)) logger.debug("Mounting retry object to HTTP session: %s" % retry)
def copy(self): # Copy the relevant parameters to make another session login # using the existing information params = copy.copy(self._params) kwargs = params.pop("kwargs", {}) params.update(**kwargs) return params def _get_log_schema(self): """ Get the log schema for this SMC version. :return: dict """ if self.session and self.session_id: schema = "{}/{}/monitoring/log/schemas".format(self.url, self.api_version) response = self.session.get( url=schema, headers={"cookie": self.session_id, "content-type": "application/json"} ) if response.status_code in (200, 201): return response.json()
class Credential(object): """ Provider for authenticating the user. LMS Login is a user created within the SMC as a normal administrative account. Login is the standard way of using an API client and key as password. The key of the CredentialMap also indicates the entry point for which to POST the authentication. """ CredentialMap = {"lms_login": ("login", "pwd"), "login": ("api_key",)} def __init__(self, api_key=None, login=None, pwd=None): self._api_key = api_key self._login = login self._pwd = pwd @property def provider_name(self): return "login" if self._api_key else "lms_login" def get_provider_entry_point(self, url, api_version): return "{url}/{api_version}/{provider_name}".format( url=url, api_version=api_version, provider_name=self.provider_name ) @property def has_credentials(self): """ Does this session have valid credentials :rtype: bool """ return all( [ getattr(self, "_%s" % field, None) is not None for field in self.CredentialMap.get(self.provider_name) ] ) def load_entry_points(self): try: r = self.session.get( "{url}/{api_version}/api".format(url=self.url, api_version=self.api_version) ) if r.status_code == 200: result_list = json.loads(r.text) self._resource = Resource(result_list["entry_point"]) logger.debug("Loaded entry points with obtained session.") else: raise SMCConnectionError( "Invalid status received while getting entry points from SMC. " "Status code received %s. Reason: %s" % (r.status_code, r.reason) ) except requests.exceptions.RequestException as e: raise SMCConnectionError(e) def available_api_versions(base_url, timeout=10, verify=True): """ Get all available API versions for this SMC :return version numbers :rtype: list """ try: r = requests.get("%s/api" % base_url, timeout=timeout, verify=verify) # no session required if r.status_code == 200: j = json.loads(r.text) versions = [] for version in j["version"]: versions.append(version["rel"]) return versions raise SMCConnectionError( "Invalid status received while getting entry points from SMC. " "Status code received %s. Reason: %s" % (r.status_code, r.reason) ) except requests.exceptions.RequestException as e: raise SMCConnectionError(e) def get_api_version(base_url, api_version=None, timeout=10, verify=True): """ Get the API version specified or resolve the latest version :return api version :rtype: float """ versions = available_api_versions(base_url, timeout, verify) # search min and max versions min_version = "99.99" max_version = "0.0" for i in versions: major, minor = str(i).split(".") major_max, minor_max = str(max_version).split(".") major_min, minor_min = str(min_version).split(".") if (int(major) == int(major_max) and int(minor) > int(minor_max))\ or (int(major) > int(major_max)): max_version = major + "." + minor if int(major) <= int(major_min) and int(minor) < int(minor_min): min_version = major + "." + minor newest_version = max_version major_newest, minor_newest = str(newest_version).split(".") if int(major_newest) <= 6 and int(minor_newest) <= 5: best_version = newest_version else: best_version = min_version if api_version is None: api_version = best_version else: if api_version not in versions: api_version = best_version return api_version