# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
.. versionadded:: 0.5.7
Requires SMC version >= 6.3.2
Scheduled tasks are administrative processes that can run either immediately
after being defined, or scheduled to run on a regular basis. Scheduled tasks
in the SMC are defined under Administration->Tasks->Definition.
Some tasks are read-only, meaning they are system elements and cannot be modified
or copied and can therefore only be scheduled (these task related classes will not
have a `create` method). Other tasks can be created and custom settings can be
defined. Check the documentation for each task to determine the capabilities.
All tasks inherit the ScheduledTaskMixin which provides a `start` method and
access to a TaskSchedule instance through the `task_schedule` property. The
associated TaskSchedule defines whether to run the task ongoing and details
specifying when the task should be run and how often.
An example follows that shows how to use a refresh policy task. Other tasks use the
same API syntax.
Finding existing tasks for a specific task type::
for task in RefreshPolicyTask.objects.all():
print(task, task.task_schedule)
Review an existing task and its task schedule::
task = RefreshPolicyTask(name='mytask')
for schedule in task.task_schedule:
print(schedule.activation_date, schedule.activated)
Create a refresh policy task::
task = RefreshPolicyTask.create(
name='mytask',
engines=[Engine('engine1'), Engine('engine2')],
comment='some comment')
A created task can always be run at any time without having to set a
schedule for the task by calling `start` on the task::
task = RefreshPolicyTask('mytask')
task.start()
A task can also be scheduled for a future time. Adding a scheduled run to
the task requires that we first obtain the task and add the schedule to it.
This can be done right after creating the task, or after retrieving it later::
task = RefreshPolicyTask.create(
name='mytask',
engines=[Engine('engine1'), Engine('engine2')],
comment='refresh policy on specified engines')
task.add_schedule(
name='refresh_policy_on_saturday',
activation_date=1512325716000, # 12/04/2017 00:00:00
day_period='weekly',
day_mask=128,
comment='run this task weekly')
You can also specify tasks that run on a regular interval, such as
monthly::
task = RefreshPolicyTask(name='mytask')
task.add_schedule(
name='run_monthly',
activation_date=1512367200000, # Start 12/4/2017 at 00:00:00
day_period='monthly')
Repeat a task for a period of time, then disable task on specified
date::
task = DeleteLogTask.create(
name='Delete SMC Server logs',
servers='all',
time_range='last_full_month',
all_logs=True)
task.add_schedule(
name='Run for 6 months',
activation_date=1512367200000, # Start 12/04/2017
day_period='monthly',
repeat_until_date=1528088400000, # End 06/04/2018
comment='purge log task')
.. note:: You can use the helper method :func:`smc.base.util.datetime_to_ms`
for obtaining millisecond times for scheduled tasks.
"""
from smc.base.model import Element, SubElement, ElementCreator, ElementRef
from smc.api.exceptions import ActionCommandFailed, UnsupportedEngineFeature
from smc.administration.tasks import Task
from smc.base.structs import NestedDict
from smc.compat import min_smc_version, is_api_version_less_than
from smc.elements.servers import ManagementServer, LogServer
from smc.core.engines import MasterEngine
from smc.elements.other import FilterExpression
from smc.base.util import datetime_from_ms, element_resolver
[docs]
def policy_validation_settings(**kwargs):
    """
    Build the validation settings used by policy based tasks.

    These settings are used when a policy task is created with
    `validate_policy` set to True. Every default listed below can be
    overridden by passing the setting as a keyword argument; keyword
    arguments that are not listed are added to the settings unchanged.

    :param bool configuration_validation_for_alert_chain: default False
    :param bool duplicate_rule_check_settings: default False
    :param bool empty_rule_check_settings: default True
    :param bool empty_rule_check_settings_for_alert: default False
    :param bool general_check_settings: default True
    :param bool nat_modification_check_settings: default True
    :param bool non_supported_feature: default True
    :param bool routing_modification_check: default False
    :param bool unreachable_rule_check_settings: default False
    :param bool vpn_validation_check_settings: default True
    :return: dict with a single `validation_settings` key holding the
        merged settings
    :rtype: dict
    """
    settings = dict(
        configuration_validation_for_alert_chain=False,
        duplicate_rule_check_settings=False,
        empty_rule_check_settings=True,
        empty_rule_check_settings_for_alert=False,
        general_check_settings=True,
        nat_modification_check_settings=True,
        non_supported_feature=True,
        routing_modification_check=False,
        unreachable_rule_check_settings=False,
        vpn_validation_check_settings=True,
    )
    # Caller supplied values take precedence over the defaults.
    settings.update(kwargs)
    return {"validation_settings": settings}
[docs]
def log_target_types(all_logs=False, **kwargs):
    """
    Build the log target flags for log tasks.

    A log target defines which log types are affected by the operation.
    For example, when creating a DeleteLogTask, the targets specify which
    log types are deleted.

    :param bool all_logs: if True every log target is enabled and the
        remaining keyword arguments are ignored (default: False)
    :param bool for_alert_event_log: alert events traces (default: False)
    :param bool for_alert_log: alerts (default: False)
    :param bool for_audit_log: audit logs (default: False)
    :param bool for_fw_log: NGFW Engine logs (default: False)
    :param bool for_ips_log: IPS logs (default: False)
    :param bool for_ips_recording_log: any IPS pcaps (default: False)
    :param bool for_l2fw_log: layer 2 Engine logs (default: False)
    :param bool for_third_party_log: any 3rd party logs (default: False)
    :return: dict of log targets
    :rtype: dict
    """
    target_names = (
        "for_alert_event_log",
        "for_alert_log",
        "for_audit_log",
        "for_fw_log",
        "for_ips_log",
        "for_ips_recording_log",
        "for_l2fw_log",
        "for_third_party_log",
    )
    if all_logs:
        # Overrides are deliberately not consulted when everything is selected.
        return dict.fromkeys(target_names, True)
    targets = dict.fromkeys(target_names, False)
    targets.update(kwargs)
    return targets
[docs]
def server_directory(**kwargs):
    """
    Create a server directory dict. This is used for export/archive logs
    tasks.

    The following kwargs can be overridden in the create constructor.

    :param int archive_mask: the bit mask of selected archive directories,
        -1 for all (default: -1)
    :param bool only_archive: indicates whether only the active directory
        is excluded (default: False)
    :param server: the server used for log retrieving. The element is
        stored by its `href`. Passing None (the default) leaves the entry
        unset instead of failing on the missing `href` attribute.
    :type server: LogServer or Server
    :return: dict describing the server directory
    :rtype: dict
    """
    srv_directory = {"archive_mask": -1, "only_archive": False, "server": None}
    srv_directory.update(kwargs)
    server = srv_directory["server"]
    if server is not None:
        # The task json references the server by href, not by element.
        srv_directory["server"] = server.href
    return srv_directory
[docs]
def server_directories_settings(srv_directories):
    """
    Server directories used to retrieve logs. This is needed for export or
    archive logs tasks.

    :param srv_directories: iterable of server directory dicts, see
        :func:`~server_directory`. None is treated as "no directories" so
        callers may forward an optional argument as is.
    :type srv_directories: list(dict) or None
    :return: dict with a `server_directories` key holding a new list
    :rtype: dict
    """
    # Copy into a fresh list so the caller's sequence is never shared.
    return {"server_directories": list(srv_directories or [])}
[docs]
class TaskSchedule(SubElement):
    """
    A task schedule is associated with a given task type and defines
    when the scheduled task should run.

    :ivar str day_period: how often to run the task
    :ivar str final_action: what to do when the task is complete
    :ivar str minute_period: if day_period is set to hourly, when to run
        within the hour.
    """

    @property
    def activated(self):
        """
        Whether this schedule is active for this task. Reported as False
        when the schedule data carries no `activated` entry.

        :rtype: bool
        """
        return self.data.get("activated", False)

    @property
    def activation_date(self):
        """
        The time when the task is set to first run, converted from the
        stored millisecond value by `datetime_from_ms`.

        :return: datetime object in format '%Y-%m-%d %H:%M:%S.%f'
        :rtype: datetime.datetime
        """
        activation_ms = self.data.get("activation_date")
        return datetime_from_ms(activation_ms)

    def activate(self):
        """
        If a task is suspended, this will re-activate the task.
        Usually it's best to check for activated before running
        this::

            task = RefreshPolicyTask('mytask')
            for scheduler in task.task_schedule:
                if scheduler.activated:
                    scheduler.suspend()
                else:
                    scheduler.activate()

        :raises ActionCommandFailed: the schedule exposes no `activate`
            link (already activated), or the request failed
        :return: None
        """
        if "activate" not in self.data.links:
            raise ActionCommandFailed(
                "Task is already activated. To suspend, "
                "call suspend() on this task schedule"
            )
        self.make_request(
            ActionCommandFailed, method="update", etag=self.etag, resource="activate"
        )
        # Drop the cached data so the next access reflects the new state.
        self._del_cache()

    def suspend(self):
        """
        Suspend this scheduled task.

        :raises ActionCommandFailed: failed to suspend, already suspended. Call
            activate on this task to reactivate.
        :return: None
        """
        if "suspend" not in self.data.links:
            raise ActionCommandFailed("Task is already suspended. Call activate to reactivate.")
        self.make_request(
            ActionCommandFailed, method="update", etag=self.etag, resource="suspend"
        )
        # Drop the cached data so the next access reflects the new state.
        self._del_cache()
[docs]
class ScheduledTaskMixin(object):
    """
    Actions common to all scheduled tasks.
    """

    def start(self):
        """
        Start the scheduled task now. Task can then be tracked by
        using common Task methods.

        :raises ActionCommandFailed: failed starting task
        :return: return as a generic Task
        :rtype: Task
        """
        started = self.make_request(ActionCommandFailed, method="create", resource="start")
        return Task(started)

    @property
    def task_schedule(self):
        """
        Return any task schedules associated with this
        scheduled task.

        :raises ActionCommandFailed: failure to retrieve task schedule
        :return: list of task schedules
        :rtype: list(TaskSchedule)
        """
        return [TaskSchedule(**sched) for sched in self.make_request(resource="task_schedule")]

    def add_schedule(
        self,
        name,
        activation_date,
        day_period="one_time",
        final_action="ALERT_FAILURE",
        activated=True,
        minute_period="one_time",
        day_mask=None,
        repeat_until_date=None,
        comment=None,
    ):
        """
        Add a schedule to an existing task.

        :param str name: name for this schedule
        :param int activation_date: when to start this task. Activation date
            should be a UTC time represented in milliseconds.
        :param str day_period: when this task should be run. Valid options:
            'one_time', 'daily', 'weekly', 'monthly', 'yearly'. If 'daily' is
            selected, you can also provide a value for 'minute_period'.
            (default: 'one_time')
        :param str minute_period: only required if day_period is set to 'daily'.
            Valid options: 'each_quarter' (15 min), 'each_half' (30 minutes), or
            'hourly', 'one_time' (default: 'one_time'). For a daily schedule
            the value 'one_time' is replaced by 'hourly'.
        :param int day_mask: If the task day_period=weekly, then specify the day
            or days for repeating. Day masks are: sun=1, mon=2, tue=4, wed=8,
            thu=16, fri=32, sat=64. To repeat for instance every Monday, Wednesday
            and Friday, the value must be 2 + 8 + 32 = 42
        :param str final_action: what type of action to perform after the
            scheduled task runs. Options are: 'ALERT_FAILURE', 'ALERT', or
            'NO_ACTION' (default: ALERT_FAILURE)
        :param bool activated: whether to activate the schedule (default: True)
        :param int repeat_until_date: if this is anything but a one time task run,
            you can specify the date when this task should end. The format is the
            same as the `activation_date` param.
        :param str comment: optional comment
        :raises ActionCommandFailed: failed adding schedule
        :return: the result of the create request
        """
        if "daily" in day_period and minute_period == "one_time":
            # A daily schedule cannot use the 'one_time' minute period.
            minute_period = "hourly"
        json = {
            "name": name,
            "activation_date": activation_date,
            "day_period": day_period,
            "day_mask": day_mask,
            "activated": activated,
            "final_action": final_action,
            "minute_period": minute_period,
            "repeat_until_date": repeat_until_date or None,
            "comment": comment,
        }
        return self.make_request(
            ActionCommandFailed, method="create", resource="task_schedule", json=json
        )

    @property
    def resources(self):
        """
        Resources associated with this task. Depending on the task, this
        may be engines, policies, servers, etc. A task whose data has no
        `resources` entry yields an empty list.

        :return: list of Elements
        :rtype: list
        """
        hrefs = self.data.get("resources") or []
        return [Element.from_href(href) for href in hrefs]
[docs]
class RefreshPolicyTask(ScheduledTaskMixin, Element):
    """
    A scheduled task associated with refreshing policy on engine/s.
    A refresh will push an existing policy that is already mapped to
    the engine/s. Use :class:`~UploadPolicyTask` to create a task
    that will assign a policy to an engine/s and upload.

    .. note:: Any engine can force a policy refresh on the engine
        node directly by calling engine.refresh(), or from the engines
        assigned policy by calling policy.refresh(engine) also.
    """

    typeof = "refresh_policy_task"

    @classmethod
    def create(cls, name, engines, comment=None, validate_policy=True, **kwargs):
        """
        Create a refresh policy task associated with specific
        engines. A policy refresh task does not require a policy
        be specified. The policy used in the refresh will be the
        policy already assigned to the engine.

        :param str name: name of this task
        :param engines: list of Engines for the task
        :type engines: list(Engine)
        :param str comment: optional comment
        :param bool validate_policy: validate the policy before upload.
            If set to true, validation kwargs can also be provided
            if customization is required, otherwise default validation settings
            are used.
        :param kwargs: `preserve_connections` and `snapshot_creation` are
            sent as task attributes when given (each is optional on its
            own). Remaining keyword arguments are validation settings, see
            :func:`~policy_validation_settings` for names and defaults.
        :raises ElementNotFound: engine specified does not exist
        :raises CreateElementFailed: failure to create the task
        :return: the task
        :rtype: RefreshPolicyTask
        """
        json = {
            "resources": [engine.href for engine in engines],
            "name": name,
            "comment": comment,
        }
        # Pop the task level options one by one; popping both unconditionally
        # raised KeyError when only one of them was supplied. They must be
        # removed before the remaining kwargs become validation settings.
        for option in ("preserve_connections", "snapshot_creation"):
            if option in kwargs:
                json[option] = kwargs.pop(option)
        if validate_policy:
            json.update(policy_validation_settings(**kwargs))
        return ElementCreator(cls, json)
[docs]
class UploadPolicyTask(ScheduledTaskMixin, Element):
    """
    An upload policy task will assign a specified policy to an
    engine or group of engines and upload. If an engine specified
    has an existing policy assigned, the engine will be reassigned
    the specified policy. If the intent is to create a policy task
    to push an existing assigned policy, use :class:`~RefreshPolicyTask`
    instead.

    .. note:: Policy upload on an engine can be done from the engine
        node itself by calling engine.upload('policy_name') or from a policy
        directly by policy.upload('engine_name').
    """

    typeof = "upload_policy_task"

    @classmethod
    def create(cls, name, engines, policy, comment=None, validate_policy=False, **kwargs):
        """
        Create an upload policy task associated with specific
        engines. A policy reassigns any policies that might be
        assigned to a specified engine.

        :param str name: name of this task
        :param engines: list of Engines for the task
        :type engines: list(Engine)
        :param Policy policy: Policy to assign to the engine/s
        :param str comment: optional comment
        :param bool validate_policy: validate the policy before upload.
            If set to true, validation kwargs can also be provided
            if customization is required, otherwise default validation settings
            are used.
        :param kwargs: `preserve_connections` and `snapshot_creation` are
            sent as task attributes when given (each is optional on its
            own). Remaining keyword arguments are validation settings, see
            :func:`~policy_validation_settings` for names and defaults.
        :raises ElementNotFound: engine or policy specified does not exist
        :raises CreateElementFailed: failure to create the task
        :return: the task
        :rtype: UploadPolicyTask
        """
        json = {
            "name": name,
            "policy": policy.href,
            "resources": [eng.href for eng in engines],
            "comment": comment,
        }
        # Pop the task level options one by one; popping both unconditionally
        # raised KeyError when only one of them was supplied. They must be
        # removed before the remaining kwargs become validation settings.
        for option in ("preserve_connections", "snapshot_creation"):
            if option in kwargs:
                json[option] = kwargs.pop(option)
        if validate_policy:
            json.update(policy_validation_settings(**kwargs))
        return ElementCreator(cls, json)
[docs]
class RemoteUpgradeTask(ScheduledTaskMixin, Element):
    """
    A remote upgrade task upgrades the nodes of the specified engines
    using the given upgrade package.
    """

    typeof = "remote_upgrade_task"

    @classmethod
    def create(cls, name, engines, package, comment=None, **kwargs):
        """
        Create a remote upgrade task. The task resources are the nodes
        of every engine provided.

        :param str name: name of this task
        :param engines: list of Engines for the task
        :type engines: list(Engine)
        :param str package: upgrade package to assign to the engine/s, sent
            as `engine_upgrade_filename`
        :param str comment: optional comment
        :param kwargs: accepted but not used by this task
        :raises ElementNotFound: engine specified does not exist
        :raises CreateElementFailed: failure to create the task
        :return: the task
        :rtype: RemoteUpgradeTask
        """
        # The upgrade targets individual nodes, so flatten engine -> nodes.
        node_hrefs = [node.href for engine in engines for node in engine.nodes]
        json = {
            "name": name,
            "engine_upgrade_filename": package,
            "resources": node_hrefs,
            "comment": comment,
        }
        return ElementCreator(cls, json)
[docs]
class ValidatePolicyTask(ScheduledTaskMixin, Element):
    """
    Run a policy validation task. This does not perform a policy push.
    This may be useful if you want to validate any pending changes before
    a future policy push.

    :ivar Element policy: The policy associated with this task
    """

    typeof = "validate_policy_task"
    policy = ElementRef("policy")

    @classmethod
    def create(cls, name, engines, policy=None, comment=None, **kwargs):
        """
        Create a new validate policy task.
        If a policy is not specified, the engines existing policy will
        be validated. Override default validation settings as kwargs.

        :param str name: name of task
        :param engines: list of engines to validate
        :type engines: list(Engine)
        :param Policy policy: policy to validate. Uses the engines assigned
            policy if none specified.
        :param str comment: optional comment
        :param kwargs: see :func:`~policy_validation_settings` for keyword
            arguments and default values. Validation settings are only
            added to the task when at least one keyword is given.
        :raises ElementNotFound: engine or policy specified does not exist
        :raises CreateElementFailed: failure to create the task
        :return: the task
        :rtype: ValidatePolicyTask
        """
        policy_ref = None if policy is None else policy.href
        engine_refs = [eng.href for eng in engines]
        json = {
            "name": name,
            "resources": engine_refs,
            "policy": policy_ref,
            "comment": comment,
        }
        if kwargs:
            json.update(policy_validation_settings(**kwargs))
        return ElementCreator(cls, json)
[docs]
class RefreshMasterEnginePolicyTask(ScheduledTaskMixin, Element):
    """
    Refresh a Master Engine and virtual policy task.

    .. note:: This task is only relevant for Master Engines types.
    """

    typeof = "refresh_master_and_virtual_policy_task"

    @classmethod
    def create(cls, name, master_engines, comment=None, **kwargs):
        """
        Create a refresh task for master engines.

        :param str name: name of task
        :param master_engines: list of master engines for this task.
            Entries that are not MasterEngine instances are left out.
        :type master_engines: list(MasterEngine)
        :param str comment: optional comment
        :param kwargs: extra task attributes such as preserve_connections
            or snapshot_creation; they are copied into the task as given.
        :raises CreateElementFailed: failed to create the task, i.e. no
            valid engines provided
        :return: the task
        :rtype: RefreshMasterEnginePolicyTask
        """
        master_refs = [eng.href for eng in master_engines if isinstance(eng, MasterEngine)]
        json = {"name": name, "comment": comment, "resources": master_refs}
        json.update(kwargs)
        return ElementCreator(cls, json)
[docs]
class DeleteLogTask(ScheduledTaskMixin, Element):
    """
    A delete log task defines a way to purge log data from the SMC.
    When defining the task, you specify which servers to delete from
    (typically management AND log server/s), and which log types to
    delete.

    .. note:: Log tasks currently support pre-defined time ranges such
        as 'yesterday', 'last_week', etc. If creating custom time ranges
        for tasks, use the SMC.
    """

    typeof = "delete_log_task"

    @classmethod
    def create(
        cls,
        name,
        servers=None,
        time_range="yesterday",
        all_logs=False,
        filter_for_delete=None,
        comment=None,
        **kwargs
    ):
        """
        Create a new delete log task. Provide True to all_logs to delete
        all log types. Otherwise provide kwargs to specify each log by
        type of interest.

        :param str name: name for this task
        :param servers: servers to delete logs from. Servers must be
            instances of management servers or log servers. If no value is
            provided, or the string 'all' is given, every management and
            log server is used.
        :type servers: list(ManagementServer or LogServer) or str
        :param str time_range: specify a time range for the deletion. Valid
            options are 'yesterday', 'last_full_week_sun_sat',
            'last_full_week_mon_sun', 'last_full_month' (default 'yesterday')
        :param FilterExpression filter_for_delete: optional filter for deleting.
            (default: FilterExpression('Match All'))
        :param bool all_logs: if True, all log types will be deleted. If this
            is True, kwargs are ignored (default: False)
        :param str comment: optional comment
        :param kwargs: see :func:`~log_target_types` for keyword
            arguments and default values.
        :raises ElementNotFound: specified servers were not found
        :raises CreateElementFailed: failure to create the task
        :return: the task
        :rtype: DeleteLogTask
        """
        # 'all' is accepted as an explicit spelling of "every server";
        # iterating the string would otherwise fail on `.href`.
        if not servers or servers == "all":
            servers = [svr.href for svr in ManagementServer.objects.all()]
            servers.extend([svr.href for svr in LogServer.objects.all()])
        else:
            servers = [svr.href for svr in servers]

        if not filter_for_delete:
            filter_for_delete = FilterExpression("Match All")

        json = {
            "name": name,
            "resources": servers,
            "time_limit_type": time_range,
            "start_time": 0,
            "end_time": 0,
            "file_format": "unknown",
            "filter_for_delete": filter_for_delete.href,
            "comment": comment,
        }
        json.update(**log_target_types(all_logs, **kwargs))
        return ElementCreator(cls, json)
[docs]
class ServerBackupTask(ScheduledTaskMixin, Element):
    """
    A task that will back up the Management Server/s, Log Server/s and
    optionally the Log Server data.

    :ivar bool log_data_must_be_saved: whether to back up logs
    """

    typeof = "backup_task"

    @classmethod
    def create(
        cls,
        name,
        servers=None,
        backup_log_data=False,
        encrypt_password=None,
        comment=None,
        path=None,
        script=None,
    ):
        """
        Create a new server backup task. This task provides the ability
        to backup individual or all management and log servers under
        SMC management.

        :param str name: name of task
        :param servers: servers to back up. Servers must be instances of
            management servers or log servers. If no value is provided all
            management and log servers are backed up.
        :type servers: list(ManagementServer or LogServer)
        :param bool backup_log_data: Should the log files be backed up. This
            field is only relevant if a Log Server is backed up.
        :param str encrypt_password: Provide an encrypt password if you want
            this backup to be encrypted.
        :param str comment: optional comment, stored as `backup_comment`
        :param path: The path to store the backup
        :param script: Script to execute after backup has been generated
        :raises ElementNotFound: specified servers were not found
        :raises CreateElementFailed: failure to create the task
        :return: the task
        :rtype: ServerBackupTask
        """
        if not servers:
            servers = [svr.href for svr in ManagementServer.objects.all()]
            servers.extend([svr.href for svr in LogServer.objects.all()])
        else:
            servers = [svr.href for svr in servers]

        json = {
            "resources": servers,
            "name": name,
            # An empty password is sent as None rather than as ''.
            "password": encrypt_password or None,
            "log_data_must_be_saved": backup_log_data,
            "backup_comment": comment,
        }
        if min_smc_version("7.0"):
            # server_target_path, script_to_execute attributes are supported
            # in the latest version.
            json.update(server_target_path=path, script_to_execute=script)
        return ElementCreator(cls, json)

    @property
    def servers(self):
        """
        The `resources` entry of this task's data, i.e. the servers that
        were assigned when the task was created.

        :return: value stored under `resources`, None when absent
        """
        return self.data.get("resources")
[docs]
class SGInfoTask(ScheduledTaskMixin, Element):
    """
    An SGInfo task is used for obtaining support data from the engine/s.

    .. note:: An sginfo can be executed directly on an engine node by calling
        the node.sginfo() method directly.

    :ivar bool include_core_files: whether to include core files in output
    :ivar bool include_slapcat_output: include slapcat in output

    .. warning:: For an sginfo to be readable, the engine must not have the
        'encrypt_configuration' field enabled on the engine or the data will
        be unreadable.
    """

    typeof = "sginfo_task"

    @classmethod
    def create(
        cls, name, engines, include_core_files=False, include_slapcat_output=False, comment=None
    ):
        """
        Create an sginfo task.

        :param str name: name of task
        :param engines: list of engines to apply the sginfo task
        :type engines: list(Engine)
        :param bool include_core_files: include core files in the
            sginfo backup (default: False)
        :param bool include_slapcat_output: include output from a
            slapcat command in output (default: False)
        :param str comment: optional comment
        :raises ElementNotFound: engine not found
        :raises CreateElementFailed: create the task failed
        :return: the task
        :rtype: SGInfoTask
        """
        engine_refs = [engine.href for engine in engines]
        json = dict(
            name=name,
            comment=comment,
            resources=engine_refs,
            include_core_files=include_core_files,
            include_slapcat_output=include_slapcat_output,
        )
        return ElementCreator(cls, json)
[docs]
class SystemSnapsotTask(ScheduledTaskMixin, Element):
    """
    A read-only task that makes a snapshot of all system elements
    after a dynamic package has been updated on the SMC.
    """

    typeof = "create_system_snapshot_task"
[docs]
class DeleteOldRunTask(ScheduledTaskMixin, Element):
    """
    A read-only task that deletes the task history of tasks that have
    already run. Running it on a monthly basis is generally recommended
    to purge old task data.
    """

    typeof = "delete_old_executed_task"
[docs]
class DisableUnusedAdminTask(ScheduledTaskMixin, Element):
    """
    A read-only task that disables every administrator account which has
    not been used within the time set in the Administrator password policy.
    """

    typeof = "disable_unused_admin_task"
[docs]
class DeleteOldSnapshotsTask(ScheduledTaskMixin, Element):
    """
    A read-only management server task that deletes snapshots since the
    last scheduled run. For example, if this task is configured to run
    once per month, snapshots older than 1 month will be deleted.
    """

    typeof = "delete_old_snapshots_task"
[docs]
class RenewInternalCertificatesTask(ScheduledTaskMixin, Element):
    """
    A read-only management server task that renews the certificates used
    in system communications and sends alerts about expiring certificates.
    """

    typeof = "renew_internal_certificates_task"
[docs]
class RenewGatewayCertificatesTask(ScheduledTaskMixin, Element):
    """
    A read-only management server task that renews the certificates of
    internal gateways which have automatic certificate renewal enabled.
    """

    typeof = "renew_gw_certificates_task"
[docs]
class RenewInternalCATask(ScheduledTaskMixin, Element):
    """
    A read-only management server task that renews the certificate
    authorities used in system communications and sends alerts about
    expiring certificate authorities.
    """

    typeof = "renew_internal_ca_task"
[docs]
class FetchCertificateRevocationTask(ScheduledTaskMixin, Element):
    """
    A read-only management server task that downloads updated
    certificate revocation lists.
    """

    typeof = "fetch_certificate_revocation_task"
[docs]
class ExportLogTask(ScheduledTaskMixin, Element):
    """
    An export log task defines a way to export log data from the SMC.
    When defining the task, you specify which servers to export from
    (typically management AND log server/s), and which log types to
    export.

    .. note:: Log tasks currently support pre-defined time ranges such
        as 'yesterday', 'last_week', etc. If creating custom time ranges
        for tasks, use the SMC.
    """

    typeof = "export_log_task"

    @classmethod
    def create(
        cls,
        name,
        servers=None,
        time_range="yesterday",
        file_name=None,
        file_format="xml",
        is_local_location=True,
        filter_for_export=None,
        all_logs=False,
        overwrite_file_flag="overwrite",
        server_directory_lst=None,
        comment=None,
        **kwargs
    ):
        """
        Create a new export log task. Provide True to all_logs to export
        all log types. Otherwise provide kwargs to specify each log by
        type of interest.

        :param str name: name for this task
        :param servers: servers to export logs from. Servers must be
            instances of management servers or log servers. If no value is
            provided, all management and log servers are used.
        :type servers: list(ManagementServer or LogServer)
        :param str time_range: specify a time range for the export. Valid
            options are 'yesterday', 'last_full_week_sun_sat',
            'last_full_week_mon_sun', 'last_full_month' (default 'yesterday')
        :param str file_name: name of the file to export
        :param str file_format: format of the file to export
            options are:
            ***csv*** export in CSV format.
            ***xml*** export in XML format.
            ***zip*** export in ZIP format.
            ***cef*** export in CEF format.
            ***leef*** export in LEEF format.
            ***esm*** export in ESM format.
            ***snoop*** export IPS recordings as SNOOP.
            ***pcap*** export IPS recordings as PCAP.
        :param FilterExpression filter_for_export: The Filter expression for
            the archive/export task to be able to filter logs to consider.
            Sent as None when not provided.
        :param bool all_logs: if True, all log types will be exported. If this
            is True, kwargs are ignored (default: False)
        :param list server_directory_lst: server directories used to retrieve
            logs, see :func:`~server_directory` for keyword arguments and
            default values. When omitted an empty list is sent.
        :param bool is_local_location: Flag to know if the archive/export file
            will be stored on the log server or locally.
        :param str overwrite_file_flag: The overwrite options:
            ***append***: append option.
            ***overwrite***: overwrite option.
            ***use_number_in_file_name*** create an unique file with number as name.
            ***fail_task***: fail the task.
        :param str comment: optional comment
        :param kwargs: see :func:`~log_target_types` for keyword
            arguments and default values.
        :raises ElementNotFound: specified servers were not found
        :raises CreateElementFailed: failure to create the task
        :return: the task
        :rtype: ExportLogTask
        """
        if not servers:
            servers = [svr.href for svr in ManagementServer.objects.all()]
            servers.extend([svr.href for svr in LogServer.objects.all()])
        else:
            servers = [svr.href for svr in servers]

        json = {
            "name": name,
            "resources": servers,
            "time_limit_type": time_range,
            "start_time": 0,
            "end_time": 0,
            "file_name": file_name,
            "file_format": file_format,
            "is_local_location": is_local_location,
            "overwrite_file_flag": overwrite_file_flag,
            "filter_for_export": filter_for_export.href if filter_for_export is not None else None,
            "comment": comment,
        }
        json.update(**log_target_types(all_logs, **kwargs))
        # The parameter defaults to None, which the settings helper would
        # try to iterate; substitute an empty list in that case.
        json.update(**server_directories_settings(server_directory_lst or []))
        return ElementCreator(cls, json)
[docs]
class ArchiveLogTask(ScheduledTaskMixin, Element):
    """
    Scheduled task that archives log data from the SMC.

    The task definition names the servers to archive (typically the
    management AND log server/s) and the log types to include.

    .. note:: Log tasks currently support pre-defined time ranges such
        as 'yesterday', 'last_week', etc. If creating custom time ranges
        for tasks, use the SMC.
    """

    typeof = "archive_log_task"

    @classmethod
    def create(
        cls,
        name,
        servers=None,
        time_range="yesterday",
        filter_for_export=None,
        filter_for_delete=None,
        delete_source_data=False,
        all_logs=False,
        server_directory_lst=None,
        is_local_location=False,
        comment=None,
        **kwargs
    ):
        """
        Create a new archive log task. Pass ``all_logs=True`` to archive
        every log type; otherwise select the log types of interest through
        kwargs.

        :param str name: name for this task
        :param servers: servers to archive. Servers must be instances of
            management servers or log servers. If no value (or an empty
            value) is provided, all management servers and all log servers
            are used.
        :type servers: list(ManagementServer or LogServer)
        :param str time_range: specify a time range for the archive. Valid
            options are 'yesterday', 'last_full_week_sun_sat',
            'last_full_week_mon_sun', 'last_full_month' (default 'yesterday')
        :param FilterExpression filter_for_export: The Filter expression for
            the archive/export task to be able to filter logs to consider.
            (default: FilterExpression('Match All'))
        :param FilterExpression filter_for_delete: The Filter expression for
            the archive/delete task to be able to filter logs for deletion.
            (default: FilterExpression('Match None'))
        :param bool delete_source_data: Flag to know if after the archive
            operation, logs need to be deleted.
        :param bool all_logs: if True, all log types will be archived. If this
            is True, kwargs are ignored (default: False)
        :param list server_directory_lst: Server directories used to retrieve
            logs. see :func:`~server_directory` for keyword arguments and
            default values.
        :param bool is_local_location: Flag to know if the archive/export file
            will be stored on the log server or locally.
        :param str comment: optional comment sent with the task definition.
        :param kwargs: see :func:`~log_target_types` for keyword
            arguments and default values.
        :raises ElementNotFound: specified servers were not found
        :raises CreateElementFailed: failure to create the task
        :return: the task
        :rtype: ArchiveLogTask
        """
        # Resolve the target servers to hrefs; with nothing specified, fall
        # back to every management server followed by every log server.
        if servers:
            resources = [server.href for server in servers]
        else:
            resources = [mgt.href for mgt in ManagementServer.objects.all()]
            resources += [log.href for log in LogServer.objects.all()]

        # Filters are optional; an omitted filter is sent as None.
        export_filter = None if filter_for_export is None else filter_for_export.href
        delete_filter = None if filter_for_delete is None else filter_for_delete.href

        # start_time/end_time are always sent as 0 and file_format as
        # "unknown"; only the pre-defined time_range is configurable here.
        payload = {
            "name": name,
            "resources": resources,
            "time_limit_type": time_range,
            "file_format": "unknown",
            "start_time": 0,
            "end_time": 0,
            "is_local_location": is_local_location,
            "filter_for_export": export_filter,
            "filter_for_delete": delete_filter,
            "delete_source_data": delete_source_data,
            "comment": comment,
        }

        # Merge in the log-type selection first, then the directory settings.
        payload.update(**log_target_types(all_logs, **kwargs))
        payload.update(**server_directories_settings(server_directory_lst))

        return ElementCreator(cls, payload)
[docs]
class TrafficCaptureInterfaceSettings(NestedDict):
    """
    One interface entry for a traffic capture task, holding the keys
    ``node_ref``, ``pint_ref`` and ``filter``.
    """

    def __init__(self, data):
        super(TrafficCaptureInterfaceSettings, self).__init__(data=data)

    @classmethod
    def create(cls, node_ref=None, pint_ref=None, filter=None):
        """
        Build the settings entry for a single node/interface pair.

        :param Node node_ref: Individual engine node on which traffic is being
            captured. Required.
        :param PhysicalInterface pint_ref: Physical Interface Link on which
            traffic is being captured. Required.
        :param str filter: IP Address for filtering. By default, no ip address
            filtering.
        :return: TrafficCaptureInterfaceSettings
        """
        # Both references are passed through element_resolver before storing.
        return cls(
            {
                "node_ref": element_resolver(node_ref),
                "pint_ref": element_resolver(pint_ref),
                "filter": filter,
            }
        )

    @classmethod
    def create_interface_setting_for_all_node(cls, engine, pint_ref=None, filter=None):
        """
        Build one settings entry per node of ``engine``, all sharing the same
        interface and the same filter.

        :param Engine engine: Engine on which traffic is being captured.
            Required.
        :param PhysicalInterface pint_ref: Physical Interface Link on which
            traffic is being captured. Required.
        :param str filter: IP Address for filtering. By default, no ip address
            filtering.
        :return: list(TrafficCaptureInterfaceSettings)
        """
        # One entry per node, in the iteration order of engine.nodes.
        return [
            cls(
                {
                    "node_ref": element_resolver(engine_node),
                    "pint_ref": element_resolver(pint_ref),
                    "filter": filter,
                }
            )
            for engine_node in engine.nodes
        ]
[docs]
class TrafficCaptureTask(ScheduledTaskMixin, Element):
    """
    Scheduled task that captures traffic on an engine's physical interfaces.
    """

    typeof = "traffic_capture_task"

    @classmethod
    def create(
        cls,
        name,
        interface_settings=None,
        max_file_size_in_mb=500,
        capture_headers_only=False,
        description=None,
        duration_in_sec=3600,
        sg_info_option=False,
        destination_file=None,
        comment=None,
    ):
        """
        Create the Traffic Capture task element. It captures traffic on
        specific engine physical interfaces using the parameters below.

        :param str name: name of the traffic capture task.
        :param list(TrafficCaptureInterfaceSettings) interface_settings: The
            traffic capture interface settings to capture traffic. None or an
            empty value results in an empty settings list.
        :param int max_file_size_in_mb: The maximum size in MB of the file the
            traffic is captured to (default: 500).
        :param bool capture_headers_only: Decide whether to capture only
            header or not (default: False).
        :param str description: The description of the traffic capture task.
        :param int duration_in_sec: The duration in seconds the traffic
            capture task will run (default: 3600).
        :param bool sg_info_option: This flag decides whether to include
            sginfo (default: False).
        :param str destination_file: The destination file path where the
            traffic capture file is stored.
        :param str comment: optional comment.
        :raises UnsupportedEngineFeature: the API version is less than 6.10
        :return: the result of ``ElementCreator`` for this class (presumably
            the created task, as for the other task types -- confirm against
            ``ElementCreator``).
        """
        if is_api_version_less_than("6.10"):
            raise UnsupportedEngineFeature(
                "The traffic capture task is not supported for API versions less than 6.10."
            )

        # Only the raw data of each settings object goes into the payload.
        capture_entries = [entry.data for entry in (interface_settings or [])]

        payload = {
            "max_file_size": max_file_size_in_mb,
            "capture_headers_only": capture_headers_only,
            "description": description,
            "duration": duration_in_sec,
            "sg_info_option": sg_info_option,
            "comment": comment,
            "destination_file": destination_file,
            "name": name,
            "interface_setting": capture_entries,
        }
        return ElementCreator(cls, payload)

    @property
    def max_file_size(self):
        """
        The ``max_file_size`` value of this task (set from
        ``max_file_size_in_mb`` on creation).
        """
        return self.data.get("max_file_size")

    @property
    def capture_headers_only(self):
        """
        The ``capture_headers_only`` value of this task.
        """
        return self.data.get("capture_headers_only")

    @property
    def duration(self):
        """
        The ``duration`` value of this task (set from ``duration_in_sec`` on
        creation).
        """
        return self.data.get("duration")

    @property
    def interface_setting(self):
        """
        The ``interface_setting`` value of this task (a list of raw settings
        entries when set through ``create``).
        """
        return self.data.get("interface_setting")

    @property
    def sg_info_option(self):
        """
        The ``sg_info_option`` value of this task.
        """
        return self.data.get("sg_info_option")