'''
Exports
=======

The following methods allow for interaction with the Tenable Vulnerability
Management :devportal:`exports <exports>` API endpoints.

Methods available on ``tio.exports``:

.. rst-class:: hide-signature
.. autoclass:: ExportsAPI
    :members:
'''
from uuid import UUID
from json.decoder import JSONDecodeError
from typing_extensions import Literal
from typing import Dict, Union, List
from marshmallow import Schema
from tenable.base.endpoint import APIEndpoint
from .schema import AssetExportSchema, VulnExportSchema, ComplianceExportSchema
from .iterator import ExportsIterator
class ExportsAPI(APIEndpoint):
    '''
    Wrapper around the export endpoints: initiate, inspect, download and
    cancel asset, compliance and vulnerability export jobs.
    '''

    def cancel(
        self,
        export_type: Literal['vulns', 'assets', 'compliance'],
        export_uuid: UUID,
    ) -> str:
        '''
        Cancels the specified export job.

        API Documentation for cancel export jobs with
        :devportal:`assets <exports-assets-export-cancel>`,
        :devportal:`compliance <io-exports-compliance-cancel>`, and
        :devportal:`vulnerabilities <exports-vulns-export-cancel>` datatypes.

        Args:
            export_type:
                The type of export job that we are to cancel.
            export_uuid:
                The export job's unique identifier.

        Returns:
            str:
                The status of the job.

        Example:

            >>> tio.exports.cancel('vulns', '{UUID}')
            'CANCELLED'
        '''
        response = self._api.post(
            f'{export_type}/export/{export_uuid}/cancel',
            box=True,
        )
        return response.get('status')

    def download_chunk(
        self,
        export_type: Literal['vulns', 'assets', 'compliance'],
        export_uuid: UUID,
        chunk_id: int,
        retries: int = 3,
    ) -> List:
        '''
        Downloads an export chunk from the specified job.

        API Documentation for downloading an export chunk for
        :devportal:`assets <exports-assets-download-chunk>`,
        :devportal:`compliance <io-exports-compliance-download>`, and
        :devportal:`vulnerabilities <exports-vulns-download-chunk>`.

        Args:
            export_type:
                The type of export job
            export_uuid:
                The export job's unique identifier.
            chunk_id:
                The identifier for the specific chunk to download.
            retries:
                How many additional download attempts are made when the
                response body cannot be decoded as JSON.  The chunk is
                requested at most ``retries + 1`` times.  The default is
                ``3``.

        Returns:
            List:
                The list of objects that entail the chunk of data requested.
                An empty list is returned when every attempt failed to
                decode.

        Example:

            >>> chunk = tio.exports.download_chunk('vulns', '{UUID}', 1)
        '''
        # Attempt to download the chunk and decode it as JSON.  A decode
        # failure is logged and the download is retried; once the initial
        # attempt plus ``retries`` retries have all failed we assume the chunk
        # is dead and fall through with the empty list.
        resp = []
        for _ in range(retries + 1):
            try:
                resp = self._api.get(
                    f'{export_type}/export/{export_uuid}/chunks/{chunk_id}'
                ).json()
            except JSONDecodeError:
                self._log.warning((
                    f'{export_type} export {export_uuid} encountered an '
                    f'invalid chunk on chunk id {chunk_id}'
                ))
            else:
                break

        # Truthiness (rather than len()) also copes with a JSON ``null`` body.
        if not resp:
            self._log.warning((
                f'{export_type} export {export_uuid} encountered an empty '
                f'chunk on chunk id {chunk_id}'
            ))
        return resp

    def status(
        self,
        export_type: Literal['vulns', 'assets', 'compliance'],
        export_uuid: UUID,
    ) -> Dict:
        '''
        Gets the status of the export job.

        API Documentation for the status of an export job for the
        :devportal:`assets <exports-assets-export-status>`,
        :devportal:`compliance <io-exports-compliance-status>`, and
        :devportal:`vulnerabilities <exports-vulns-export-status>` datatypes.

        Args:
            export_type (str):
                The datatype of the export job.
            export_uuid (str):
                The UUID of the export job.

        Returns:
            Dict:
                The status response returned by the API for the job.

        Examples:

            >>> status = tio.exports.status('vulns', '{UUID}')
        '''
        return self._api.get(
            f'{export_type}/export/{export_uuid}/status',
            box=True,
        )

    def jobs(
        self,
        export_type: Literal['vulns', 'assets'],
    ) -> Dict:
        '''
        Returns the list of jobs available for a given datatype.

        API Documentation for the job listing APIs for
        :devportal:`assets <exports-assets-export-status-recent>`, and
        :devportal:`vulnerabilities <exports-vulns-export-status-recent>`
        datatypes.

        Args:
            export_type (str):
                The datatype of export to get the jobs for.

        Returns:
            The ``exports`` attribute of the API response.

        Examples:

            >>> jobs = tio.exports.jobs('vulns')
        '''
        response = self._api.get(f'{export_type}/export/status', box=True)
        return response.exports

    def initiate_export(
        self,
        export_type: Literal['vulns', 'assets', 'compliance'],
        **kwargs
    ):
        '''
        Initiate an export job of the specified export type, and return the
        export UUID.

        This method accepts the key-value filter arguments supported by the
        methods assets(), vulns(), and compliance() for the matching
        export_type.  For example, when the export_type is "assets", this
        function will only support the kwargs supported by the assets()
        method; if export_type is "vulns", the method will accept only those
        supported by the vulns() method, and so forth.

        Note that all keyword arguments are handed to the export schema as-is;
        unlike assets(), vulns() and compliance(), the iterator-control
        arguments (``uuid``, ``use_iterator``, ``when_done``, ``iterator`` and
        ``timeout``) are not stripped out beforehand.

        Args:
            export_type (str):
                The datatype of export to initiate.
            **kwargs:
                The filters to validate against the schema of the selected
                export type.

        Returns:
            The ``export_uuid`` attribute of the API response.

        Raises:
            ValueError:
                If ``export_type`` is not one of ``vulns``, ``assets`` or
                ``compliance``.

        Examples:

            Initiating an assets export with no extra params.

            >>> export_uuid = tio.exports.initiate_export("assets")

            Initiating a vulns export with a param supported by vulns()

            >>> export_uuid = tio.exports.initiate_export("vulns", num_assets=100)
        '''
        # Select the schema for the specified export type.  An explicit error
        # replaces the UnboundLocalError an unknown type used to trigger.
        schema_classes = {
            'vulns': VulnExportSchema,
            'assets': AssetExportSchema,
            'compliance': ComplianceExportSchema,
        }
        schema_class = schema_classes.get(export_type)
        if schema_class is None:
            raise ValueError(
                f'Unsupported export type {export_type!r}; expected one of '
                f'{sorted(schema_classes)}'
            )
        schema = schema_class()
        payload = schema.dump(schema.load(kwargs))

        # Initiate the export and return the export UUID from the response.
        return self._api.post(
            f'{export_type}/export',
            json=payload,
            box=True,
        ).export_uuid

    def _export(
        self,
        export_type: Literal['vulns', 'assets', 'compliance'],
        schema: Schema,
        **kwargs
    ) -> Union[ExportsIterator, UUID]:
        '''
        Initiate (or attach to) an export job for the specified datatype.

        API Documentation for requesting an export for
        :devportal:`assets <exports-assets-request-export>`,
        :devportal:`compliance <io-exports-compliance-create>`, and
        :devportal:`vulnerabilities <exports-vulns-request-export>` datatypes.

        Args:
            export_type:
                The datatype of the export job.
            schema:
                The schema instance used to validate and serialize the filter
                arguments into the request payload.
            **kwargs:
                The control arguments ``uuid``, ``use_iterator``,
                ``when_done``, ``iterator`` and ``timeout`` are removed first;
                everything that remains is passed through the schema.

        Returns:
            The iterator built for the export job, or the job's UUID when
            ``use_iterator`` is false.
        '''
        # Strip the control arguments so only filters reach the schema.
        export_uuid = kwargs.pop('uuid', None)
        use_iterator = kwargs.pop('use_iterator', True)
        when_done = kwargs.pop('when_done', False)
        iterator_class = kwargs.pop('iterator', ExportsIterator)
        timeout = kwargs.pop('timeout', None)

        # The filters are validated even when an existing job UUID was given;
        # the resulting payload is only sent when a new job has to be created.
        payload = schema.dump(schema.load(kwargs))

        if not export_uuid:
            export_uuid = self._api.post(
                f'{export_type}/export',
                json=payload,
                box=True,
            ).export_uuid
            self._log.debug(
                f'{export_type} export job {export_uuid} initiated'
            )

        if use_iterator:
            return iterator_class(
                self._api,
                type=export_type,
                uuid=export_uuid,
                _wait_for_complete=when_done,
                timeout=timeout,
            )

        # A caller may already have handed us a UUID instance, which the UUID
        # constructor cannot accept as its hex-string argument.
        if isinstance(export_uuid, UUID):
            return export_uuid
        return UUID(export_uuid)

    def assets(self, **kwargs) -> Union[ExportsIterator, UUID]:
        '''
        Initiate an asset export.

        :devportal:`API Documentation <exports-assets-request-export>`

        Args:
            last_scan_id (str, optional):
                Scan uuid of the scan to be exported.
            created_at (int, optional):
                Assets created after this timestamp will be returned.
            deleted_at (int, optional):
                Assets deleted after this timestamp will be returned.
            first_scan_time (int, optional):
                Assets with a first_scan time later than this timestamp
                will be returned.
            last_assessed (int, optional):
                Assets last scanned after this timestamp will be returned.
            last_authenticated_scan_time (int, optional):
                Assets last scanned with an authenticated scan after this
                timestamp will be returned.
            terminated_at (int, optional):
                Assets terminated after this timestamp will be returned.
            updated_at (int, optional):
                Assets updated after this timestamp will be returned.
            has_plugin_results (bool, optional):
                Should assets only be returned if they have plugin results?
            is_deleted (bool, optional):
                Should we return only assets that have been deleted?
            is_licensed (bool, optional):
                Should we return only assets that are licensed?
            is_terminated (bool, optional):
                Should we return assets that have been terminated?
            servicenow_sysid (bool, optional):
                Should we return assets that have a ServiceNOW sysid?
                if ``True`` only assets with an id will be returned.
                if ``False`` only assets without an id will be returned.
            include_open_ports (bool, optional):
                Should we include open ports of assets in the exported chunks?
            chunk_size (int, optional):
                How many asset objects should be returned per chunk of data?
                The default is ``1000``.
            network_id (str, optional):
                Only assets within the specified network UUID will be returned.
            sources (list[str], optional):
                Only assets with a source matching one of these source values
                will be returned.  Note that this value is case-sensitive.
            tags (list[tuple[str, str]], optional):
                A list of tag pairs to filter the results on.  The tag pairs
                should be presented as ``('CATEGORY', 'VALUE')``.
            uuid (str, optional):
                A predefined export UUID to use for generating an
                ExportIterator.  Using this parameter will ignore all of the
                filter arguments.
            use_iterator (bool, optional):
                Determines if we should return an iterator, or simply the
                export job UUID.  The default is to return an iterator.
            when_done (bool, optional):
                When creating the iterator, setting this flag to true will tell
                the iterator to wait until the export job has completed before
                processing the first chunk.  The default behaviour is to start
                processing chunks of data as soon as they become available.
            timeout (int, optional):
                If specified, determines a timeout in seconds to wait for the
                export job to sit in the queue before cancelling the job and
                raising a ``TioExportsTimeout`` error.  Once a job has started
                to be processed, the timeout is ignored.
            iterator (Iterator, optional):
                Supports overloading the iterator class to be used to process
                the datachunks.

        Examples:

            Iterating over the results of an asset export:

            >>> for asset in tio.exports.assets():
            ...     print(asset)

            Getting hosts that have been updated within the last 24 hours

            >>> assets = tio.exports.assets(
            ...     updated_at=int(arrow.now().shift(days=-1).timestamp())
            ... )

            Getting assets that have the ``Region:Chicago`` tag:

            >>> assets = tio.exports.assets(
            ...     tags=[('Region', 'Chicago')]
            ... )
        '''
        return self._export('assets', AssetExportSchema(), **kwargs)

    def compliance(self, **kwargs) -> Union[ExportsIterator, UUID]:
        '''
        Initiate a compliance export.

        :devportal:`API Documentation <io-exports-compliance-create>`

        Args:
            asset (list[str], optional):
                A list of assets to return compliance results for.
            first_seen (int, optional):
                Returns findings with a first seen time newer than the
                specified unix timestamp.
            last_seen (int, optional):
                Returns findings with a last seen time newer than the
                specified unix timestamp.
            ipv4_addresses (list[str], optional):
                Returns Compliance findings found for the provided list of
                ipv4 addresses.
            ipv6_addresses (list[str], optional):
                Returns Compliance findings found for the provided list of
                ipv6 addresses.
            plugin_name (list[str], optional):
                Returns Compliance findings for the specified list of plugin
                names.
            plugin_id (list[int], optional):
                Returns Compliance findings for the specified list of plugin
                IDs.
            asset_tags (list[str], optional):
                Returns Compliance findings for the specified list of asset
                tags.
            audit_name (str, optional):
                Restricts compliance findings to those associated with the
                specified audit.
            audit_file_name (str, optional):
                Restricts compliance findings to those associated with the
                specified audit file name.
            compliance_results (list[str], optional):
                Restricts compliance findings to those associated with the
                specified list of compliance results, such as PASSED, FAILED,
                SKIPPED, ERROR, UNKNOWN etc.
            last_observed (int, optional):
                Restricts compliance findings to those that were last observed
                on or after the specified unix timestamp.
            indexed_at (int, optional):
                Restricts compliance findings to those that were updated or
                indexed into Tenable Vulnerability Management on or after the
                specified unix timestamp.
            since (int, optional):
                Same as indexed_at.  Restricts compliance findings to those
                that were updated or indexed into Tenable Vulnerability
                Management on or after the specified unix timestamp.
            state (list[str], optional):
                Restricts compliance findings to those associated with the
                provided list of states, such as open, reopened and fixed.
            num_findings (int, optional):
                The number of findings to return per chunk of data.  If left
                unspecified, the default is ``5000``.
            uuid (str, optional):
                A predefined export UUID to use for generating an
                ExportIterator.  Using this parameter will ignore all of the
                filter arguments.
            use_iterator (bool, optional):
                Determines if we should return an iterator, or simply the
                export job UUID.  The default is to return an iterator.
            when_done (bool, optional):
                When creating the iterator, setting this flag to true will tell
                the iterator to wait until the export job has completed before
                processing the first chunk.  The default behaviour is to start
                processing chunks of data as soon as they become available.
            timeout (int, optional):
                If specified, determines a timeout in seconds to wait for the
                export job to sit in the queue before cancelling the job and
                raising a ``TioExportsTimeout`` error.  Once a job has started
                to be processed, the timeout is ignored.
            iterator (Iterator, optional):
                Supports overloading the iterator class to be used to process
                the datachunks.

        Examples:

            >>> for finding in tio.exports.compliance():
            ...     print(finding)
        '''
        return self._export('compliance', ComplianceExportSchema(), **kwargs)

    def vulns(self, **kwargs) -> Union[ExportsIterator, UUID]:
        '''
        Initiate a vulnerability export.

        :devportal:`API Documentation <exports-vulns-request-export>`

        Args:
            first_found (int, optional):
                Findings first discovered after this timestamp will be
                returned.
            indexed_at (int, optional):
                Findings indexed into Tenable Vulnerability Management after
                this timestamp will be returned.
            last_fixed (int, optional):
                Findings fixed after this timestamp will be returned.  Note
                that this filter only applies to fixed data and should not be
                used when searching for active findings.
            last_found (int, optional):
                Findings last observed after this timestamp will be returned.
            since (int, optional):
                Findings last observed in any state after this timestamp will
                be returned.  Cannot be used with ``last_found``,
                ``first_found``, or ``last_fixed``.
            plugin_family (list[str], optional):
                Only return findings from the specified plugin families.
            plugin_id (list[int], optional):
                Only return findings from the specified plugin ids.
            plugin_type (str, optional):
                Only return findings with the specified plugin type.
            scan_uuid (uuid, optional):
                Only return findings with the specified scan UUID.
            source (list[str], optional):
                Only return vulnerabilities for assets that have the specified
                scan source.
            severity_modification_type (list[str], optional):
                Only return vulnerabilities with the specified severity
                modification type.
            severity (list[str], optional):
                Only return findings with the specified severities.
            state (list[str], optional):
                Only return findings with the specified states.
            vpr_score (dict, optional):
                Only returns findings that meet the specified VPR criteria.
                The filter is formatted as a dictionary with the mathematical
                operation as the key.  Supported operations are:

                .. list-table:: Supported Operations
                    :widths: auto
                    :header-rows: 1

                    * - Operation
                      - Type
                      - Description
                    * - ``eq``
                      - list[float]
                      - List of VPR scores that the findings must match.
                    * - ``neq``
                      - list[float]
                      - List of VPR scores that the findings can not match.
                    * - ``gt``
                      - float
                      - VPR scores must be greater than the specified value.
                    * - ``gte``
                      - float
                      - VPR scores must be greater than or equal to the
                        specified value.
                    * - ``lt``
                      - float
                      - VPR scores must be less than the specified value.
                    * - ``lte``
                      - float
                      - VPR scores must be less than or equal to the
                        specified value.

            network_id (str, optional):
                Only findings within the specified network UUID will be
                returned.
            cidr_range (str, optional):
                Restrict the export to only vulns assigned to assets within the
                CIDR range specified.
            tags (list[tuple[str, str]], optional):
                A list of tag pairs to filter the results on.  The tag pairs
                should be presented as ``('CATEGORY', 'VALUE')``.
            include_unlicensed (bool, optional):
                Should findings for assets that are not licensed be included in
                the results?
            num_assets (int, optional):
                As findings are grouped by asset, how many assets' findings
                should exist within each data chunk?  If left unspecified the
                default is ``500``.
            uuid (str, optional):
                A predefined export UUID to use for generating an
                ExportIterator.  Using this parameter will ignore all of the
                filter arguments.
            use_iterator (bool, optional):
                Determines if we should return an iterator, or simply the
                export job UUID.  The default is to return an iterator.
            when_done (bool, optional):
                When creating the iterator, setting this flag to true will tell
                the iterator to wait until the export job has completed before
                processing the first chunk.  The default behaviour is to start
                processing chunks of data as soon as they become available.
            timeout (int, optional):
                If specified, determines a timeout in seconds to wait for the
                export job to sit in the queue before cancelling the job and
                raising a ``TioExportsTimeout`` error.  Once a job has started
                to be processed, the timeout is ignored.
            iterator (Iterator, optional):
                Supports overloading the iterator class to be used to process
                the datachunks.

        Examples:

            Iterating over the results of a vuln export:

            >>> for vuln in tio.exports.vulns():
            ...     print(vuln)

            Getting findings that have been observed within the last 24 hours

            >>> vulns = tio.exports.vulns(
            ...     since=int(arrow.now().shift(days=-1).timestamp())
            ... )

            Getting findings that have the ``Region:Chicago`` tag:

            >>> vulns = tio.exports.vulns(
            ...     tags=[('Region', 'Chicago')]
            ... )
        '''
        return self._export('vulns', VulnExportSchema(), **kwargs)