change: added configuration mover
This commit is contained in:
@@ -0,0 +1,420 @@
|
||||
# zabbix_utils
|
||||
#
|
||||
# Copyright (C) 2001-2023 Zabbix SIA
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
# obtaining a copy of this software and associated documentation
|
||||
# files (the "Software"), to deal in the Software without restriction,
|
||||
# including without limitation the rights to use, copy, modify,
|
||||
# merge, publish, distribute, sublicense, and/or sell copies
|
||||
# of the Software, and to permit persons to whom the Software
|
||||
# is furnished to do so, subject to the following conditions:
|
||||
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
|
||||
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
||||
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
|
||||
# OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
import ssl
|
||||
import json
|
||||
import base64
|
||||
import logging
|
||||
import urllib.request as ul
|
||||
from textwrap import shorten
|
||||
|
||||
from uuid import uuid4
|
||||
from os import environ as env
|
||||
from urllib.error import URLError
|
||||
|
||||
from typing import Callable, Optional, Any
|
||||
|
||||
from .types import APIVersion
|
||||
from .common import ModuleUtils
|
||||
from .logger import EmptyHandler, SensitiveFilter
|
||||
from .exceptions import APIRequestError, APINotSupported, ProcessingError
|
||||
from .version import __version__, __min_supported__, __max_supported__
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
log.addHandler(EmptyHandler())
|
||||
log.addFilter(SensitiveFilter())
|
||||
|
||||
|
||||
class APIObject():
|
||||
"""Zabbix API object.
|
||||
|
||||
Args:
|
||||
name (str): Zabbix API object name.
|
||||
parent (class): Zabbix API parent of the object.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str, parent: Callable):
|
||||
self.object = name
|
||||
self.parent = parent
|
||||
|
||||
def __getattr__(self, name: str) -> Callable:
|
||||
"""Dynamic creation of an API method.
|
||||
|
||||
Args:
|
||||
name (str): Zabbix API object method name.
|
||||
|
||||
Raises:
|
||||
TypeError: Raises if gets unexpected arguments.
|
||||
|
||||
Returns:
|
||||
Callable: Zabbix API method.
|
||||
"""
|
||||
|
||||
# For compatibility with Python less 3.9 versions
|
||||
def removesuffix(string: str, suffix: str) -> str:
|
||||
return str(string[:-len(suffix)]) if suffix and string.endswith(suffix) else string
|
||||
|
||||
def func(*args: Any, **kwargs: Any) -> Any:
|
||||
if args and kwargs:
|
||||
raise TypeError("Only args or kwargs should be used.")
|
||||
|
||||
# Support '_' suffix to avoid conflicts with python keywords
|
||||
method = removesuffix(self.object, '_') + "." + removesuffix(name, '_')
|
||||
|
||||
# Support passing list of ids and params as a dict
|
||||
params = kwargs or (
|
||||
(args[0] if type(args[0]) in (list, dict,) else list(args)) if args else None)
|
||||
|
||||
log.debug("Executing %s method", method)
|
||||
|
||||
need_auth = method not in ModuleUtils.UNAUTH_METHODS
|
||||
|
||||
return self.parent.send_api_request(
|
||||
method,
|
||||
params,
|
||||
need_auth
|
||||
).get('result')
|
||||
|
||||
return func
|
||||
|
||||
|
||||
class ZabbixAPI():
|
||||
"""Provide interface for working with Zabbix API.
|
||||
|
||||
Args:
|
||||
url (str, optional): Zabbix API URL. Defaults to `http://localhost/zabbix/api_jsonrpc.php`.
|
||||
token (str, optional): Zabbix API token. Defaults to `None`.
|
||||
user (str, optional): Zabbix API username. Defaults to `None`.
|
||||
password (str, optional): Zabbix API user's password. Defaults to `None`.
|
||||
http_user (str, optional): Basic Authentication username. Defaults to `None`.
|
||||
http_password (str, optional): Basic Authentication password. Defaults to `None`.
|
||||
skip_version_check (bool, optional): Skip version compatibility check. Defaults to `False`.
|
||||
validate_certs (bool, optional): Specifying certificate validation. Defaults to `True`.
|
||||
timeout (int, optional): Connection timeout to Zabbix API. Defaults to `30`.
|
||||
"""
|
||||
|
||||
__version = None
|
||||
__use_token = False
|
||||
__session_id = None
|
||||
__basic_cred = None
|
||||
|
||||
def __init__(self, url: Optional[str] = None, token: Optional[str] = None,
|
||||
user: Optional[str] = None, password: Optional[str] = None,
|
||||
http_user: Optional[str] = None, http_password: Optional[str] = None,
|
||||
skip_version_check: bool = False, validate_certs: bool = True,
|
||||
ssl_context: Optional[ssl.SSLContext] = None, timeout: int = 30):
|
||||
|
||||
url = url or env.get('ZABBIX_URL') or 'http://localhost/zabbix/api_jsonrpc.php'
|
||||
user = user or env.get('ZABBIX_USER') or None
|
||||
password = password or env.get('ZABBIX_PASSWORD') or None
|
||||
token = token or env.get('ZABBIX_TOKEN') or None
|
||||
|
||||
self.url = ModuleUtils.check_url(url)
|
||||
self.validate_certs = validate_certs
|
||||
self.timeout = timeout
|
||||
|
||||
# HTTP Auth unsupported since Zabbix 7.2
|
||||
if http_user and http_password:
|
||||
self.__basic_auth(http_user, http_password)
|
||||
|
||||
if ssl_context is not None:
|
||||
if not isinstance(ssl_context, ssl.SSLContext):
|
||||
raise TypeError(
|
||||
'Parameter "ssl_context" must be an "ssl.SSLContext".') from None
|
||||
self.ssl_context = ssl_context
|
||||
|
||||
self.__check_version(skip_version_check)
|
||||
|
||||
if self.version > 7.0 and http_user and http_password:
|
||||
raise APINotSupported("HTTP authentication unsupported since Zabbix 7.2.")
|
||||
|
||||
if token or user or password:
|
||||
self.login(token, user, password)
|
||||
|
||||
def __getattr__(self, name: str) -> Callable:
|
||||
"""Dynamic creation of an API object.
|
||||
|
||||
Args:
|
||||
name (str): Zabbix API method name.
|
||||
|
||||
Returns:
|
||||
APIObject: Zabbix API object instance.
|
||||
"""
|
||||
|
||||
return APIObject(name, self)
|
||||
|
||||
def __enter__(self) -> Callable:
|
||||
return self
|
||||
|
||||
def __exit__(self, *args) -> None:
|
||||
self.logout()
|
||||
|
||||
def __basic_auth(self, user: str, password: str) -> None:
|
||||
"""Enable Basic Authentication using.
|
||||
|
||||
Args:
|
||||
user (str): Basic Authentication username.
|
||||
password (str): Basic Authentication password.
|
||||
"""
|
||||
|
||||
log.debug(
|
||||
"Enable Basic Authentication with username:%s password:%s",
|
||||
user,
|
||||
ModuleUtils.HIDING_MASK
|
||||
)
|
||||
|
||||
self.__basic_cred = base64.b64encode(
|
||||
f"{user}:{password}".encode()
|
||||
).decode()
|
||||
|
||||
def api_version(self) -> APIVersion:
|
||||
"""Return object of Zabbix API version.
|
||||
|
||||
Returns:
|
||||
APIVersion: Object of Zabbix API version
|
||||
"""
|
||||
|
||||
if self.__version is None:
|
||||
self.__version = APIVersion(self.apiinfo.version())
|
||||
return self.__version
|
||||
|
||||
@property
|
||||
def version(self) -> APIVersion:
|
||||
"""Return object of Zabbix API version.
|
||||
|
||||
Returns:
|
||||
APIVersion: Object of Zabbix API version.
|
||||
"""
|
||||
|
||||
return self.api_version()
|
||||
|
||||
def login(self, token: Optional[str] = None, user: Optional[str] = None,
|
||||
password: Optional[str] = None) -> None:
|
||||
"""Login to Zabbix API.
|
||||
|
||||
Args:
|
||||
token (str, optional): Zabbix API token. Defaults to `None`.
|
||||
user (str, optional): Zabbix API username. Defaults to `None`.
|
||||
password (str, optional): Zabbix API user's password. Defaults to `None`.
|
||||
"""
|
||||
|
||||
if token:
|
||||
if self.version < 5.4:
|
||||
raise APINotSupported(
|
||||
message="Token usage",
|
||||
version=self.version
|
||||
)
|
||||
if user or password:
|
||||
raise ProcessingError(
|
||||
"Token cannot be used with username and password")
|
||||
self.__use_token = True
|
||||
self.__session_id = token
|
||||
return
|
||||
|
||||
if not user:
|
||||
raise ProcessingError("Username is missing")
|
||||
if not password:
|
||||
raise ProcessingError("User password is missing")
|
||||
|
||||
if self.version < 5.4:
|
||||
user_cred = {
|
||||
"user": user,
|
||||
"password": password
|
||||
}
|
||||
else:
|
||||
user_cred = {
|
||||
"username": user,
|
||||
"password": password
|
||||
}
|
||||
|
||||
log.debug(
|
||||
"Login to Zabbix API using username:%s password:%s", user, ModuleUtils.HIDING_MASK
|
||||
)
|
||||
self.__use_token = False
|
||||
self.__session_id = self.user.login(**user_cred)
|
||||
|
||||
log.debug("Connected to Zabbix API version %s: %s", self.version, self.url)
|
||||
|
||||
def logout(self) -> None:
|
||||
"""Logout from Zabbix API."""
|
||||
|
||||
if self.__session_id:
|
||||
if self.__use_token:
|
||||
self.__session_id = None
|
||||
self.__use_token = False
|
||||
return
|
||||
|
||||
log.debug("Logout from Zabbix API")
|
||||
self.user.logout()
|
||||
self.__session_id = None
|
||||
else:
|
||||
log.debug("You're not logged in Zabbix API")
|
||||
|
||||
def check_auth(self) -> bool:
|
||||
"""Check authentication status in Zabbix API.
|
||||
|
||||
Returns:
|
||||
bool: User authentication status (`True`, `False`)
|
||||
"""
|
||||
|
||||
if not self.__session_id:
|
||||
log.debug("You're not logged in Zabbix API")
|
||||
return False
|
||||
|
||||
if self.__use_token:
|
||||
log.debug("Check auth session using token in Zabbix API")
|
||||
refresh_resp = self.user.checkAuthentication(token=self.__session_id)
|
||||
else:
|
||||
log.debug("Check auth session using sessionid in Zabbix API")
|
||||
refresh_resp = self.user.checkAuthentication(sessionid=self.__session_id)
|
||||
|
||||
return bool(refresh_resp.get('userid'))
|
||||
|
||||
def send_api_request(self, method: str, params: Optional[dict] = None,
|
||||
need_auth=True) -> dict:
|
||||
"""Function for sending request to Zabbix API.
|
||||
|
||||
Args:
|
||||
method (str): Zabbix API method name.
|
||||
params (dict, optional): Params for request body. Defaults to `None`.
|
||||
need_auth (bool, optional): Authorization using flag. Defaults to `False`.
|
||||
|
||||
Raises:
|
||||
ProcessingError: Wrapping built-in exceptions during request processing.
|
||||
APIRequestError: Wrapping errors from Zabbix API.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary with Zabbix API response.
|
||||
"""
|
||||
|
||||
request_json = {
|
||||
'jsonrpc': '2.0',
|
||||
'method': method,
|
||||
'params': params or {},
|
||||
'id': str(uuid4()),
|
||||
}
|
||||
|
||||
headers = {
|
||||
'Accept': 'application/json',
|
||||
'Content-Type': 'application/json-rpc',
|
||||
'User-Agent': f"{__name__}/{__version__}"
|
||||
}
|
||||
|
||||
if need_auth:
|
||||
if not self.__session_id:
|
||||
raise ProcessingError("You're not logged in Zabbix API")
|
||||
if self.version < 6.4:
|
||||
request_json['auth'] = self.__session_id
|
||||
elif self.version <= 7.0 and self.__basic_cred is not None:
|
||||
request_json['auth'] = self.__session_id
|
||||
else:
|
||||
headers["Authorization"] = f"Bearer {self.__session_id}"
|
||||
|
||||
if self.__basic_cred is not None:
|
||||
headers["Authorization"] = f"Basic {self.__basic_cred}"
|
||||
|
||||
log.debug(
|
||||
"Sending request to %s with body: %s",
|
||||
self.url,
|
||||
request_json
|
||||
)
|
||||
|
||||
req = ul.Request(
|
||||
self.url,
|
||||
data=json.dumps(request_json).encode("utf-8"),
|
||||
headers=headers,
|
||||
method='POST'
|
||||
)
|
||||
req.timeout = self.timeout
|
||||
|
||||
# Disable SSL certificate validation if needed.
|
||||
if not self.validate_certs:
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
elif self.ssl_context is not None:
|
||||
ctx = self.ssl_context
|
||||
else:
|
||||
ctx = None
|
||||
|
||||
try:
|
||||
resp = ul.urlopen(req, context=ctx)
|
||||
resp_json = json.loads(resp.read().decode('utf-8'))
|
||||
except URLError as err:
|
||||
raise ProcessingError(f"Unable to connect to {self.url}:", err) from None
|
||||
except ValueError as err:
|
||||
raise ProcessingError("Unable to parse json:", err) from None
|
||||
|
||||
if method not in ModuleUtils.FILES_METHODS:
|
||||
log.debug(
|
||||
"Received response body: %s",
|
||||
resp_json
|
||||
)
|
||||
else:
|
||||
debug_json = resp_json.copy()
|
||||
if debug_json.get('result'):
|
||||
debug_json['result'] = shorten(debug_json['result'], 200, placeholder='...')
|
||||
log.debug(
|
||||
"Received response body (clipped): %s",
|
||||
json.dumps(debug_json, indent=4, separators=(',', ': '))
|
||||
)
|
||||
|
||||
if 'error' in resp_json:
|
||||
err = resp_json['error'].copy()
|
||||
err['body'] = request_json.copy()
|
||||
raise APIRequestError(err)
|
||||
|
||||
return resp_json
|
||||
|
||||
def __check_version(self, skip_check: bool) -> None:
|
||||
|
||||
skip_check_help = "If you're sure zabbix_utils will work properly with your current \
|
||||
Zabbix version you can skip this check by \
|
||||
specifying skip_version_check=True when create ZabbixAPI object."
|
||||
if self.version < __min_supported__:
|
||||
if skip_check:
|
||||
log.debug(
|
||||
"Version of Zabbix API [%s] is less than the library supports. %s",
|
||||
self.version,
|
||||
"Further library use at your own risk!"
|
||||
)
|
||||
else:
|
||||
raise APINotSupported(
|
||||
f"Version of Zabbix API [{self.version}] is not supported by the library. " +
|
||||
f"The oldest supported version is {__min_supported__}.0. " + skip_check_help
|
||||
)
|
||||
|
||||
if self.version > __max_supported__:
|
||||
if skip_check:
|
||||
log.debug(
|
||||
"Version of Zabbix API [%s] is more than the library was tested on. %s",
|
||||
self.version,
|
||||
"Recommended to update the library. Further library use at your own risk!"
|
||||
)
|
||||
else:
|
||||
raise APINotSupported(
|
||||
f"Version of Zabbix API [{self.version}] was not tested with the library. " +
|
||||
f"The latest tested version is {__max_supported__}.0. " + skip_check_help
|
||||
)
|
||||
Reference in New Issue
Block a user