Source code for smc.administration.tasks

#  Licensed under the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License. You may obtain
#  a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#  License for the specific language governing permissions and limitations
#  under the License.
"""
Tasks will be fired when executing specific actions such as a policy
upload, refresh, or making backups.

This module provides that ability to access task specific attributes
and optionally poll for status of an operation.

An example of using a task poller when uploading an engine policy
(use `wait_for_finish=True`)::

    engine = Engine('myfirewall')
    poller = engine.upload(policy=fwpolicy, wait_for_finish=True)
    while not poller.done():
        poller.wait(5)
        print("Task Progress {}%".format(poller.task.progress))
    print(poller.last_message())
    assert poller.is_success(), logging.error("Timeout error during policy upload")

"""
import re
import time
import threading
import zipfile
import datetime
import os
from smc.base.model import ElementCache, Element, SubElement
from smc.api.exceptions import TaskRunFailed, ActionCommandFailed, ResourceNotFound
from smc.base.collection import Search
from smc.base.util import millis_to_utc
from smc.compat import PYTHON_v3_9, is_smc_version_less_than

clean_html = re.compile(r"<.*?>")


[docs] def TaskHistory(): """ Task history retrieves a list of tasks in an event queue. :return: list of task events :rtype: list(TaskProgress) """ events = Search.objects.entry_point("task_progress") return [event for event in events]
[docs] class TaskProgress(Element): """ Task Progress represents a task event queue. These tasks may be completed or still running. The task event queue events can be retrieved by calling :func:`~TaskHistory`. """ typeof = "task_progress" @property def task(self): """ Return the task associated with this event :rtype: Task """ return Task(self.data)
[docs] class Task(SubElement): """ Task representation. This is generic and the format is used for any calls to SMC that return an asynchronous follower link to check the status of the task. :param str last_message: Last message received on this task :param bool in_progress: Whether the task is in progress or finished :param bool success: Whether the task succeeded or not :param str follower: Fully qualified path to the follower link to track this task. """ def __init__(self, task): super(Task, self).__init__(href=task.get("follower", None), name=task.get("type", None)) self.data = ElementCache(task) @property def parent(self): """ The possible parent task :rtype: Task or None """ if not is_smc_version_less_than("7.1.6"): if "parent" in self.data: return Task(self.make_request(method="read", href=self.data.get("parent"))) # by default No parent task return None @property def children(self): """ The possible children tasks :rtype: list(Task) or None """ if not is_smc_version_less_than("7.1.6"): if "children" in self.data: return [Task(self.make_request(method="read", href=child)) for child in self.data.get("children", [])] # by default No children task return None @property def resource(self): """ The resource/s associated with this task :rtype: list(Element) """ return [Element.from_href(resource) for resource in self.data.get("resource", [])] @property def progress(self): """ Percentage of completion :rtype: int """ return self.data.get("progress", 0) @property def waiting_inputs(self): """ the task is waiting for inputs :rtype: boolean """ return self.data.get("waiting_inputs", 0) def get_waiting_input_link(self): links = self.data.get("link", []) waiting_input_link = None for link in links: if link["rel"] == "waiting_input": waiting_input_link = link["href"] return waiting_input_link @property def success(self): """ the task has succeed :rtype: boolean """ return self.data.get("success", 0) @property def last_message(self): """ the last message returned by the task :rtype: string """ return self.data.get("last_message", 0) @property def start_time(self): """ Task start time in UTC datetime format :rtype: datetime """ start_time = self.data.get("start_time") if start_time: return millis_to_utc(start_time) @property def end_time(self): """ Task end time in UTC datetime format :rtype: datetime """ end_time = self.data.get("end_time") if end_time: return millis_to_utc(end_time)
[docs] def abort(self): """ Abort existing task. :raises ActionCommandFailed: aborting task failed with reason :return: None """ try: self.make_request(method="delete", resource="abort") except ResourceNotFound: pass except ActionCommandFailed: pass
@property def result_url(self): """ Link to result (this task) :rtype: str """ if self.in_progress: raise IOError("Task is not finished!") return self.get_relation("result")
[docs] def get_result(self): """ Get result of task. :rtype SMCResult. """ return self.make_request(TaskRunFailed, href=self.result_url, raw_result=True).content
[docs] def save_result(self, filename): """ Save result of task to specified filename :param str filename: path and filename to save result to :raises IOError: if task not finished or unable to save file """ if self.in_progress: raise IOError("Task is not finished!") self.make_request(TaskRunFailed, href=self.result_url, raw_result=True, filename=filename)
[docs] def get_progress_report(self): """ Get progress report if applicable :raises BaseException if not applicable :str[] array of progress report lines """ path = os.getcwd() extract_path = f"{path}/progress_report_{datetime.datetime.now().timestamp()}_result" saved_path = f"{extract_path}.zip" self.make_request(TaskRunFailed, href=self.result_url, filename=saved_path) with zipfile.ZipFile(saved_path, "r") as zip_ref: zip_ref.extractall(extract_path) if "display_conf_report.txt" in os.listdir(extract_path): with open(os.path.join(extract_path, "display_conf_report.txt")) as progress_report_file: return progress_report_file.readlines() else: raise BaseException("No progress report available!")
[docs] def update_status(self): """ Gets the current status of this task and returns a new task object. :raises TaskRunFailed: fail to update task status """ task = self.make_request(TaskRunFailed, href=self.href) return Task(task)
def __getattr__(self, key): return self.data.get(key) @staticmethod def execute(self, resource, **kw): """ Execute the task and return a TaskOperationPoller. :rtype: TaskOperationPoller """ params = kw.pop("params", {}) json = kw.pop("json", None) task = self.make_request( TaskRunFailed, method="create", params=params, json=json, resource=resource ) timeout = kw.pop("timeout", 5) wait_for_finish = kw.pop("wait_for_finish", True) return TaskOperationPoller( task=task, timeout=timeout, wait_for_finish=wait_for_finish, **kw )
[docs] def get_task_poller(self, **kw): """ return a TaskOperationPoller for the Task. :rtype: TaskOperationPoller """ timeout = kw.pop('timeout', 5) wait_for_finish = kw.pop('wait_for_finish', True) return TaskOperationPoller( task=self.data, timeout=timeout, wait_for_finish=wait_for_finish, **kw)
@staticmethod def download(self, resource, filename, timeout=5, max_tries=36, **kw): """ Start and return a Download Task :rtype: DownloadTask(TaskOperationPoller) """ params = kw.pop("params", {}) task = self.make_request(TaskRunFailed, method="create", resource=resource, params=params) return DownloadTask(timeout=timeout, max_tries=max_tries, filename=filename, task=task)
[docs] def download_only(self, filename, timeout=5, max_tries=36): """ This does not start task only return a Download Task. :rtype: DownloadTask(TaskOperationPoller) """ return DownloadTask(timeout=timeout, max_tries=max_tries, filename=filename, task=self.data)
[docs] class TaskOperationPoller(object): """ Task Operation Poller provides a way to poll the SMC for the status of the task operation. This is returned by functions that return a task. Typically these will be operations like refreshing policy, uploading policy, etc. """ def __init__(self, task, timeout=5, max_tries=36, wait_for_finish=False): self._task = Task(task) self._thread = None self._done = None self._exception = None self.callbacks = [] # Call after operation completes if wait_for_finish: self._max_tries = max_tries self._timeout = timeout self._done = threading.Event() self._thread = threading.Thread(target=self._start) self._thread.daemon = True self._thread.start() def _start(self): while not self.finished(): try: time.sleep(self._timeout) self._task = self._task.update_status() self._max_tries -= 1 except Exception as e: self._exception = e break self._done.set() for call in self.callbacks: call(self.task) def finished(self): return self._done.is_set() or not self._task.in_progress or self._max_tries == 0
[docs] def add_done_callback(self, callback): """ Add a callback to run after the task completes. The callable must take 1 argument which will be the completed Task. :param callback: a callable that takes a single argument which will be the completed Task. """ if self._done is None or self._done.is_set(): raise ValueError("Task has already finished") if callable(callback): self.callbacks.append(callback)
[docs] def result(self, timeout=None): """ Return the current Task after waiting for timeout :rtype: Task """ self.wait(timeout) return self._task
[docs] def wait(self, timeout=None): """ Blocking wait for task status. """ if self._thread is None: return self._thread.join(timeout=timeout)
[docs] def last_message(self, timeout=5): """ Wait a specified amount of time and return the last message from the task :rtype: str """ if self._thread is not None: self._thread.join(timeout=timeout) return self._task.last_message
[docs] def done(self): """ Is the task done yet :rtype: bool """ # isAlive() is removed since python3.9 return self._thread is None or \ (not PYTHON_v3_9 and not self._thread.isAlive()) or \ (PYTHON_v3_9 and not self._thread.is_alive())
@property def task(self): """ Access to task :rtype: Task """ return self._task
[docs] def stop(self): """ Stop the running task """ if self._thread is not None and self._thread.isAlive(): self._done.set()
[docs] def is_success(self): """ Check if the task was successful and not in progress. complete done method since we can be in timeout :rtype: bool """"" return not self._task.data.data['in_progress'] and self._task.success
[docs] class DownloadTask(TaskOperationPoller): """ A download task handles tasks that have files associated, for example exporting an element to a specified file. """ def __init__(self, filename, task, timeout=5, max_tries=36, **kw): super(DownloadTask, self).__init__( task, timeout=timeout, max_tries=max_tries, wait_for_finish=True, **kw ) self.type = "download_task" self.filename = filename self.download(None) def download(self, timeout): self.wait(timeout) if not self.task.in_progress and not self.task.success: raise TaskRunFailed(self.task.last_message) try: result = self.task.make_request( TaskRunFailed, raw_result=True, href=self.task.result_url, filename=self.filename ) self.filename = result.content except IOError as io: raise TaskRunFailed("Export task failed with message: {}".format(io))