Skip to content

Utils

Utility modules providing helper functions and common operations.


Base Utils

base_utils

new_parse_input_args(token_info)

Parse and validate the input token information.

Parameters:

Name Type Description Default
token_info dict or str

Dictionary containing token information or a file path to a YAML/JSON file with token information.

required

Returns:

Type Description
dict

Parsed token information for supported applications.

Raises:

Type Description
ValueError

If the token_info is invalid.

Note

When "unified" credentials are present, any "glp" or "new_central" entries in the same dict are ignored. Unified mode requires client credentials (client_id, client_secret, and workspace_id). An optional access_token can be provided to skip initial token creation; the client credentials are still required for token refresh when the access token expires. Pass base_url or cluster_name under unified to also enable Central API calls.

Source code in pycentral/utils/base_utils.py
def new_parse_input_args(token_info):
    """
    Parse and validate the input token information.

    Args:
        token_info (dict or str): Dictionary containing token information or a file path
            to a YAML/JSON file with token information.

    Returns:
        (dict): Parsed token information for supported applications, keyed by
            app name. Each entry is a new dict; the per-app dicts supplied by
            the caller are not modified.

    Raises:
        ValueError: If the token_info is empty or contains an unknown app name.
            Validation helpers called from here may raise as well.

    Note:
        When "unified" credentials are present, any "glp" or "new_central" entries
        in the same dict are ignored. Unified mode requires client credentials
        (`client_id`, `client_secret`, and `workspace_id`). An optional
        `access_token` can be provided to skip initial token creation; the
        client credentials are still required for token refresh when the
        access token expires. Pass `base_url` or `cluster_name` under
        `unified` to also enable Central API calls.
    """
    token_info = load_token_info(token_info)

    if not token_info:
        raise ValueError(
            f"No valid token information provided. Supported apps: {', '.join(SUPPORTED_APPS)}"
        )

    apps_token_info = {}

    # Unified credentials supersede standalone glp/new_central — warn then discard them
    if "unified" in token_info:
        overridden = [k for k in token_info if k in ("glp", "new_central")]
        if overridden:
            warnings.warn(
                f"'unified' credentials were provided alongside {overridden}. "
                f"The {overridden} entries will be ignored; 'unified' takes precedence.",
                UserWarning,
                stacklevel=3,
            )

        # Work on a shallow copy so the caller's dict is left untouched
        unified = dict(token_info["unified"])

        # GLP is always the primary endpoint
        unified["glp_base_url"] = valid_url(
            unified.get("glp_base_url", GLP_URLS["BaseURL"])
        )

        _validate_token_creation_keys("unified", unified)

        unified["base_url"] = _resolve_base_url("unified", unified)
        unified.pop("cluster_name", None)

        # Precompute token URL for token creation/refresh
        if unified.get("workspace_id"):
            unified["_token_url"] = (
                f"{AUTHENTICATION['OAUTH_GLOBAL']}/{unified['workspace_id']}/token"
            )

        apps_token_info["unified"] = {**UNIFIED_DEFAULT_ARGS, **unified}

    else:
        for app, app_token_info in token_info.items():
            if app not in SUPPORTED_APPS:
                raise ValueError(
                    f"Unknown app name '{app}' provided. Supported apps: {', '.join(SUPPORTED_APPS)}"
                )

            # Shallow-copy before adding derived keys, mirroring the unified
            # branch: otherwise "base_url" and "_token_url" would be written
            # into the dict the caller passed in and leak into later calls.
            app_token_info = dict(app_token_info)

            app_token_info["base_url"] = _resolve_base_url(app, app_token_info)
            _validate_token_creation_keys(app, app_token_info)
            app_token_info["_token_url"] = AUTHENTICATION["OAUTH"]
            apps_token_info[app] = {**NEW_CENTRAL_C_DEFAULT_ARGS, **app_token_info}

    return apps_token_info

load_token_info(token_info)

Load token information from a file if it's a string path, or return the dictionary as is.

Parameters:

Name Type Description Default
token_info dict or str

Either a dictionary containing token information or a string path to a YAML/JSON file with token information.

required

Returns:

Type Description
dict

Parsed token information dictionary.

Raises:

Type Description
ValueError

If the file format is unsupported or the file cannot be parsed.

FileNotFoundError

If the specified file path does not exist.

Source code in pycentral/utils/base_utils.py
def load_token_info(token_info):
    """
    Resolve token information that may be given inline or as a file path.

    Args:
        token_info (dict or str): Token information itself, or the path of a
            YAML/JSON file that holds it.

    Returns:
        (dict): The parsed file content when a string path was given; otherwise
            the argument exactly as it was passed in.

    Raises:
        ValueError: If the file format is unsupported or the file cannot be parsed.
        FileNotFoundError: If the specified file path does not exist.
    """
    # Anything that is not a string is treated as already-loaded data
    if not isinstance(token_info, str):
        return token_info

    try:
        return parse_input_file(token_info)
    except (ValueError, FileNotFoundError) as err:
        # Keep the original exception type, but add context to the message
        raise type(err)(f"Failed to load token information: {err}") from err

build_url(base_url, path='', params='', query=None, fragment='')

Construct a complete URL based on multiple parts of the URL.

Parameters:

Name Type Description Default
base_url str

Base URL for an HTTP request.

required
path str

API endpoint path.

''
params str

API endpoint path parameters.

''
query dict

HTTP request URL query parameters.

None
fragment str

URL fragment identifier.

''

Returns:

Type Description
str

Parsed URL.

Source code in pycentral/utils/base_utils.py
def build_url(base_url, path="", params="", query=None, fragment=""):
    """
    Assemble a URL from a base URL and the remaining URL components.

    Only the scheme and network location of ``base_url`` are carried over;
    every other component comes from the arguments.

    Args:
        base_url (str): Base URL for an HTTP request.
        path (str, optional): API endpoint path.
        params (str, optional): API endpoint path parameters.
        query (dict, optional): HTTP request URL query parameters.
        fragment (str, optional): URL fragment identifier.

    Returns:
        (str): The assembled URL.
    """
    base_parts = urlparse(valid_url(base_url))
    # A missing query is encoded as an empty query string
    encoded_query = urlencode(query or {})
    return urlunparse(
        (
            base_parts.scheme,
            base_parts.netloc,
            path,
            params,
            encoded_query,
            fragment,
        )
    )

console_logger(name, level='DEBUG')

Create an instance of python logging with a formatted output.

Sets the following format for log messages: <date> <time> - <name> - <level> - <message>

Parameters:

Name Type Description Default
name str

String displayed after date and time. Define it to identify from which part of the code the log message is generated.

required
level str

Logging level to display messages from a certain level.

'DEBUG'

Returns:

Type Description
Logger

An instance of the logging.Logger class.

Source code in pycentral/utils/base_utils.py
def console_logger(name, level="DEBUG"):
    """
    Return a named logger that writes formatted messages to a stream handler.

    Log lines use the layout `<date> <time> - <name> - <level> - <message>`.
    When the module-level ``COLOR`` flag is truthy the lines are colored per
    level through ``colorlog``.

    Args:
        name (str): Logger name, shown after the date and time. Use it to
            identify which part of the code produced the message.
        level (str, optional): Minimum logging level to emit.

    Returns:
        (logging.Logger): The logger registered under ``name``.
    """
    message_layout = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    timestamp_layout = "%Y-%m-%d %H:%M:%S"

    if not COLOR:
        formatter = logging.Formatter(message_layout, timestamp_layout)
    else:
        level_colors = {
            "DEBUG": "bold_cyan",
            "INFO": "blue",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "bold_red",
        }
        formatter = colorlog.ColoredFormatter(
            "%(log_color)s" + message_layout,
            timestamp_layout,
            log_colors=level_colors,
        )

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    named_logger = logging.getLogger(name)
    named_logger.setLevel(level)

    # A logger that already has a handler keeps it; this avoids duplicate output
    if not named_logger.handlers:
        named_logger.addHandler(stream_handler)

    return named_logger

valid_url(url)

Verify and return the URL in a valid format.

If the URL is missing the https prefix, the function will prepend the prefix after verifying that it's a valid base URL of a Central cluster.

Parameters:

Name Type Description Default
url str

Base URL for an HTTP request.

required

Returns:

Type Description
str

Valid base URL.

Raises:

Type Description
ValueError

If the URL is invalid.

Source code in pycentral/utils/base_utils.py
def valid_url(url):
    """
    Normalise a base URL, adding the https scheme when none is given.

    A URL that already has both a scheme and a network location is returned
    as parsed. A URL without a scheme is treated as a bare host: its text
    becomes the network location and ``https`` is used as the scheme.

    Args:
        url (str): Base URL for an HTTP request.

    Returns:
        (str): Valid base URL.

    Raises:
        ValueError: If the URL matches neither of the accepted forms.
    """
    parts = urlparse(url)

    if parts.scheme and parts.netloc:
        return parts.geturl()

    if not parts.scheme and parts.path:
        # Without a scheme, urlparse puts the host text into ``path``
        with_scheme = parts._replace(scheme="https", netloc=parts.path, path="")
        return with_scheme.geturl()

    raise ValueError(f"Invalid Base URL - {url}\n{URL_BASE_ERR_MESSAGE}")

save_access_token(app_name, access_token, token_file_path, logger)

Update the access token for a specific application in the credentials file.

Parameters:

Name Type Description Default
app_name str

Name of the application to update (e.g., "new_central", "glp").

required
access_token str

The new access token value.

required
token_file_path str

Path to the credentials file.

required
logger Logger

Logger instance to log messages.

required

Raises:

Type Description
FileNotFoundError

If the credentials file doesn't exist.

ValueError

If the app_name isn't found in the credentials file.

IOError

If there is an error writing to the credentials file.

Source code in pycentral/utils/base_utils.py
def save_access_token(app_name, access_token, token_file_path, logger):
    """
    Update the access token for a specific application in the credentials file.

    The file is re-read, the ``access_token`` of ``app_name`` is replaced, and
    the whole document is written back as JSON (for a ``.json`` extension,
    compared case-insensitively) or YAML (any other extension).

    Args:
        app_name (str): Name of the application to update (e.g., "new_central", "glp").
        access_token (str): The new access token value.
        token_file_path (str): Path to the credentials file.
        logger (logging.Logger): Logger instance to log messages.

    Raises:
        FileNotFoundError: If the credentials file doesn't exist.
        ValueError: If the app_name isn't found in the credentials file
            (including when the file is empty or is not a mapping).
        IOError: If there is an error writing to the credentials file.
    """
    if not os.path.isfile(token_file_path):
        raise FileNotFoundError(f"Credentials file not found: {token_file_path}")

    # Load credentials file using existing helper function
    file_data = parse_input_file(token_file_path)
    _, ext = os.path.splitext(token_file_path)
    is_json = ext.lower() == ".json"

    # An empty file parses to None; report it as a missing app instead of
    # failing with a TypeError on the membership test.
    if not isinstance(file_data, dict) or app_name not in file_data:
        raise ValueError(f"App '{app_name}' not found in credentials file")

    file_data[app_name]["access_token"] = access_token

    # Serialise before opening the file: mode "w" truncates immediately, so a
    # serialisation error raised while the file is open would leave the
    # credentials file empty or half-written.
    if is_json:
        serialized = json.dumps(file_data, indent=4, sort_keys=False)
    else:
        serialized = yaml.dump(file_data, sort_keys=False)

    # Write updated data back to file
    try:
        with open(token_file_path, "w") as f:
            f.write(serialized)
    except OSError as e:
        raise OSError(f"Failed to write updated credentials to file: {e}") from e

    logger.info(
        f"Successfully saved {app_name}'s access token in {token_file_path}"
    )

Common Utils

common_utils

__setattrs__(self, config_attrs)

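Build a dictionary of attribute values for an object based on the provided dictionary: for each key, the value already set on the object is used when the attribute exists, otherwise the value from the dictionary.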

Parameters:

Name Type Description Default
config_attrs dict

Dictionary whose keys will be added as attributes to the object with corresponding values.

required

Returns:

Type Description
dict

Dictionary of attribute names and their values.

Source code in pycentral/utils/common_utils.py
def __setattrs__(self, config_attrs):
    """Collect attribute values for an object, preferring values already set on it.

    For every key in ``config_attrs`` the result holds the object's current
    attribute value when the object has an attribute of that name, and the
    value from ``config_attrs`` otherwise. The object itself is not modified.

    Args:
        config_attrs (dict): Attribute names mapped to the values to fall
            back on when the object has no such attribute.

    Returns:
        (dict): Dictionary of attribute names and their resolved values.
    """
    return {
        attr_name: getattr(self, attr_name) if hasattr(self, attr_name) else fallback
        for attr_name, fallback in config_attrs.items()
    }

create_attrs(obj, data_dictionary)

Create class attributes from a dictionary.

Writes each key/value pair from a deep copy of the dictionary directly into the object's __dict__, replacing any '-' in a key name with '_'. If an attribute with the same name already exists, its value is overwritten.

Parameters:

Name Type Description Default
obj object

Object instance to create/set attributes on.

required
data_dictionary dict

Dictionary containing keys that will become attributes.

required
Source code in pycentral/utils/common_utils.py
def create_attrs(obj, data_dictionary):
    """Create instance attributes on an object from a dictionary.

    Each key/value pair of a deep copy of ``data_dictionary`` is written
    straight into ``obj.__dict__``. Hyphens in key names are replaced with
    underscores. An attribute that already exists under the same name is
    overwritten.

    Args:
        obj (object): Object instance to create/set attributes on.
        data_dictionary (dict): Dictionary containing keys that will become attributes.
    """
    # Deep copy so the object never shares mutable values with the input
    copied = deepcopy(data_dictionary)

    for raw_name in copied:
        # '-' is replaced with '_' so the key can be used as an attribute name
        obj.__dict__[raw_name.replace("-", "_")] = copied[raw_name]

parse_input_file(file_path)

Parse data from a YAML or JSON file.

Parameters:

Name Type Description Default
file_path str

Path to the file.

required

Returns:

Type Description
dict

Parsed data from the file.

Raises:

Type Description
FileNotFoundError

If the specified file does not exist.

ValueError

If the file format is unsupported or file cannot be loaded.

Source code in pycentral/utils/common_utils.py
def parse_input_file(file_path):
    """Parse data from a YAML or JSON file.

    The format is chosen from the file extension, compared
    case-insensitively: ``.yaml``/``.yml`` are read as YAML and ``.json``
    as JSON.

    Args:
        file_path (str): Path to the file.

    Returns:
        (dict): Parsed data from the file.

    Raises:
        FileNotFoundError: If the specified file does not exist.
        ValueError: If the file format is unsupported or file cannot be loaded.
            The underlying error is attached as ``__cause__``.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    try:
        with open(file_path, "r") as file:
            # Lower-case so that e.g. "creds.JSON" is accepted, matching the
            # case-insensitive extension check in save_access_token.
            lowered_path = file_path.lower()
            if lowered_path.endswith((".yaml", ".yml")):
                return yaml.safe_load(file)
            if lowered_path.endswith(".json"):
                return json.load(file)
            raise ValueError("Unsupported file format. Use YAML or JSON.")
    except Exception as e:
        # Every failure is reported as ValueError, the documented contract;
        # chaining keeps the original traceback available.
        raise ValueError(f"Failed to parse data from file: {e}") from e

Constants

constants

Constants used across the pycentral package.

This module contains constant values for API endpoints, cluster URLs, and configuration mappings used throughout the pycentral SDK.

Attributes:

Name Type Description
CLUSTER_BASE_URLS dict[str, str]

Public Central cluster names with their corresponding API Base URLs. You can update this dictionary to add your own private cluster details. You can learn more about Base URLs here.

SUPPORTED_CONFIG_PERSONAS dict[str, str]

Supported New Central Device Personas and their corresponding API values. You can learn more about Device Personas here.

AUTHENTICATION dict[str, str]

Authentication endpoints for OAuth flows for token creation.

GLP_URLS dict[str, str]

GreenLake Platform (GLP) API URLs and endpoints.

SCOPE_URLS dict[str, str]

Scope-related API URLs for site and device organization.


GLP Utils

glp_utils

rate_limit_check(input_array, input_size_limit, rate_per_minute)

Check and handle rate limiting for API requests.

Splits the input array into smaller chunks and calculates wait time to prevent rate limit errors.

Parameters:

Name Type Description Default
input_array list

Array of items to process.

required
input_size_limit int

Maximum size of each chunk.

required
rate_per_minute int

Maximum number of requests allowed per minute.

required

Returns:

Type Description
tuple

A tuple containing:

  • queue (list): List of sub-arrays split by input_size_limit.
  • wait_time (float): Seconds to wait between requests (0 if no wait needed).
Source code in pycentral/utils/glp_utils.py
def rate_limit_check(input_array, input_size_limit, rate_per_minute):
    """Split the input into chunks and work out the delay between requests.

    The input is cut into consecutive slices of at most ``input_size_limit``
    items. A delay is only needed when there are more slices than
    ``rate_per_minute``. Progress messages are printed to standard output.

    Args:
        input_array (list): Array of items to process.
        input_size_limit (int): Maximum size of each chunk.
        rate_per_minute (int): Maximum number of requests allowed per minute.

    Returns:
        (tuple): A tuple containing:

            - queue (list): List of sub-arrays split by input_size_limit.
            - wait_time (float): Seconds to wait between requests (0 if no wait needed).
    """
    print("Attempting to bypass rate limit")

    chunks = [
        input_array[start : start + input_size_limit]
        for start in range(0, len(input_array), input_size_limit)
    ]

    # Few enough chunks to send back-to-back within one minute
    if len(chunks) <= rate_per_minute:
        return chunks, 0

    # Spread the requests evenly over the minute
    delay = 60 / rate_per_minute
    print(
        "Array size exceeded,",
        delay,
        "second wait timer implemented per request to prevent errors",
    )
    print("Loading ...")
    return chunks, delay

check_progress(conn, id, module_instance, limit=None)

Check progress of an async GLP API operation.

Polls the status of an asynchronous operation until it completes, times out, or fails.

Parameters:

Name Type Description Default
conn NewCentralBase

PyCentral base connection object.

required
id str

Async transaction ID.

required
module_instance object

Instance of the module class (Devices or Subscriptions).

required
limit int

Rate limit for the module. If None, uses default based on module type (20 for Devices, 5 for Subscriptions).

None

Returns:

Type Description
tuple

A tuple containing:

  • success (bool): True if operation succeeded, False otherwise.
  • status (dict): API response with operation status details.

Raises:

Type Description
ValueError

If module_instance is not an instance of Devices or Subscriptions.

Source code in pycentral/utils/glp_utils.py
def check_progress(conn, id, module_instance, limit=None):
    """Poll an async GLP API operation until it reaches a final state.

    The status is fetched through ``module_instance.get_status`` in a loop.
    The loop ends on a non-200 response or on a status of ``SUCCEEDED``,
    ``TIMEOUT`` or ``FAILED``; any other status triggers a pause of
    ``60 / limit`` seconds before the next poll.

    Args:
        conn (NewCentralBase): PyCentral base connection object.
        id (str): Async transaction ID.
        module_instance (object): Instance of the module class (Devices or Subscriptions).
        limit (int, optional): Rate limit for the module. If None, the default
            for the module type is used (DEVICE_LIMIT for Devices, SUB_LIMIT
            for Subscriptions).

    Returns:
        (tuple): A tuple containing:

            - success (bool): True if operation succeeded, False otherwise.
            - status (dict): API response with operation status details.

    Raises:
        ValueError: If no limit is given and module_instance is not an
            instance of Devices or Subscriptions.
    """
    if limit is None:
        # The default limit is selected by the class name of the module
        module_name = module_instance.__class__.__name__
        if module_name == "Devices":
            limit = DEVICE_LIMIT
        elif module_name == "Subscriptions":
            limit = SUB_LIMIT
        else:
            raise ValueError(
                "module_instance must be an instance of Devices or Subscription"
            )

    while True:
        status = module_instance.get_status(conn, id)

        if status["code"] != 200:
            conn.logger.error(
                f"Bad request for get async status with transaction {id}!"
            )
            return (False, status)

        state = status["msg"]["status"]

        if state == "SUCCEEDED":
            return (True, status)

        if state == "TIMEOUT":
            conn.logger.error(
                f"Async operation timed out for transaction {id}!"
            )
            return (False, status)

        if state == "FAILED":
            conn.logger.error(f"Async operation failed for transaction {id}!")
            return (False, status)

        # Still in progress: pause according to the async rate limit
        time.sleep(60 / limit)

Monitoring Utils

monitoring_utils

build_timestamp_filter(start_time=None, end_time=None, duration=None, fmt='rfc3339', max_period_days=DEFAULT_MAX_PERIOD_DAYS)

Build a formatted timestamp filter for API queries.

Returns timestamps for filtering based on the provided parameters.

Behavior
  • If start_time and end_time are given, parses and converts them to the requested format.
  • If duration is given, computes timestamps relative to now.
  • Max supported duration defaults to 90 days and can be overridden.

Parameters:

Name Type Description Default
start_time str

RFC3339 or Unix timestamp (ms or s) for start.

None
end_time str

RFC3339 or Unix timestamp (ms or s) for end.

None
duration str

Duration string like '3h', '2d', '1w', '1m' (hours, days, weeks, minutes).

None
fmt str

Output format, either 'rfc3339' or 'unix'.

'rfc3339'
max_period_days int | float

Maximum allowed duration, in days. Defaults to DEFAULT_MAX_PERIOD_DAYS.

DEFAULT_MAX_PERIOD_DAYS

Returns:

Type Description
tuple

A tuple of (start_time, end_time) formatted strings.

Raises:

Type Description
ValueError

If invalid parameter combinations are provided, the maximum duration is invalid, or duration exceeds the configured maximum.

Source code in pycentral/utils/monitoring_utils.py
def build_timestamp_filter(
    start_time=None,
    end_time=None,
    duration=None,
    fmt="rfc3339",
    max_period_days=DEFAULT_MAX_PERIOD_DAYS,
):
    """Build a formatted timestamp filter for API queries.

    Returns timestamps for filtering based on the provided parameters.

    Behavior:
        - If start_time and end_time are given, parses and converts them to the
          requested format.
        - If duration is given, computes timestamps relative to the current
          UTC time.
        - The maximum supported duration defaults to DEFAULT_MAX_PERIOD_DAYS
          and can be overridden through max_period_days.

    Args:
        start_time (str, optional): RFC3339 or Unix timestamp (ms or s) for start.
        end_time (str, optional): RFC3339 or Unix timestamp (ms or s) for end.
        duration (str, optional): A whole number followed by a unit letter:
            'h' (hours), 'd' (days), 'w' (weeks) or 'm' (minutes), e.g.
            '3h', '2d', '1w', '1m'. The unit letter is case-insensitive.
        fmt (str, optional): Output format, either 'rfc3339' or 'unix'.
        max_period_days (int|float, optional): Maximum allowed duration, in days.
            Defaults to DEFAULT_MAX_PERIOD_DAYS.

    Returns:
        (tuple): A tuple of (start_time, end_time) formatted strings.

    Raises:
        ValueError: If invalid parameter combinations are provided, the maximum
            duration is invalid, the duration is malformed or negative, or
            the duration exceeds the configured maximum.
    """
    # --- Validation ---
    if (start_time or end_time) and duration:
        raise ValueError("Cannot specify start/end timestamps together with duration.")
    if (start_time and not end_time) or (end_time and not start_time):
        raise ValueError("Both start_time and end_time must be provided together.")
    if not duration and not (start_time and end_time):
        raise ValueError("Provide either both start_time and end_time or a duration.")
    if max_period_days is None or max_period_days <= 0:
        raise ValueError("max_period_days must be greater than 0.")

    # --- Case 1: Start + End ---
    if start_time and end_time:
        return (
            _format_timestamp(_parse_timestamp(start_time), fmt),
            _format_timestamp(_parse_timestamp(end_time), fmt),
        )

    # --- Case 2: Duration ---
    unit_map = {"w": "weeks", "d": "days", "h": "hours", "m": "minutes"}
    unit = duration[-1].lower()
    if unit not in unit_map:
        raise ValueError(
            "Duration must end with w, h, d, or m (weeks, hours, days, mins)."
        )

    # Report a malformed amount (e.g. "h" or "xh") in terms of the duration
    # format rather than surfacing the raw int() conversion message.
    try:
        amount = int(duration[:-1])
    except ValueError:
        raise ValueError(
            f"Duration must be a whole number followed by w, h, d, or m; got {duration!r}."
        ) from None

    # A negative amount would place the start after the end of the window.
    if amount < 0:
        raise ValueError(f"Duration must not be negative; got {duration!r}.")

    delta = timedelta(**{unit_map[unit]: amount})
    if delta > timedelta(days=max_period_days):
        raise ValueError(
            f"Maximum supported duration is {max_period_days} days."
        )

    now = datetime.now(timezone.utc)
    return _format_timestamp(now - delta, fmt), _format_timestamp(now, fmt)

generate_timestamp_str(start_time, end_time, duration, max_period_days=DEFAULT_MAX_PERIOD_DAYS)

Generate a timestamp filter string for API queries.

Parameters:

Name Type Description Default
start_time str

Start timestamp.

required
end_time str

End timestamp.

required
duration str

Duration string.

required
max_period_days int | float

Maximum allowed duration, in days.

DEFAULT_MAX_PERIOD_DAYS

Returns:

Type Description
str

Formatted filter string "timestamp gt <start> and timestamp lt <end>".

Source code in pycentral/utils/monitoring_utils.py
def generate_timestamp_str(
    start_time, end_time, duration, max_period_days=DEFAULT_MAX_PERIOD_DAYS
):
    """Render a time window as a timestamp filter expression for API queries.

    The window is resolved by ``build_timestamp_filter`` using its default
    output format.

    Args:
        start_time (str): Start timestamp.
        end_time (str): End timestamp.
        duration (str): Duration string.
        max_period_days (int|float, optional): Maximum allowed duration, in days.

    Returns:
        (str): Formatted filter string "timestamp gt <start> and timestamp lt <end>".
    """
    window = build_timestamp_filter(
        start_time=start_time,
        end_time=end_time,
        duration=duration,
        max_period_days=max_period_days,
    )
    lower_bound, upper_bound = window
    return f"timestamp gt {lower_bound} and timestamp lt {upper_bound}"

execute_get(central_conn, endpoint, params=None, version='latest')

Execute a GET request to the monitoring API.

Parameters:

Name Type Description Default
central_conn NewCentralBase

Central connection object.

required
endpoint str

API endpoint path.

required
params dict

Query parameters for the request.

None
version str

API version to use (e.g. "v1alpha1", "v1"). Defaults to "latest".

'latest'

Returns:

Type Description
dict

The message portion of the API response.

Raises:

Type Description
ParameterError

If central_conn is None or endpoint is invalid.

Exception

If the API call returns a non-200 status code.

Source code in pycentral/utils/monitoring_utils.py
def execute_get(central_conn, endpoint, params=None, version="latest"):
    """Send a GET request to a monitoring API endpoint and return its payload.

    Args:
        central_conn (NewCentralBase): Central connection object.
        endpoint (str): API endpoint path. Leading slashes are ignored.
        params (dict, optional): Query parameters for the request.
        version (str, optional): API version to use (e.g. "v1alpha1", "v1"). Defaults to "latest".

    Returns:
        (dict): The message portion of the API response.

    Raises:
        ParameterError: If central_conn is missing or endpoint is not a
            non-empty string.
        Exception: If the API call returns a non-200 status code.
    """
    if not central_conn:
        raise ParameterError("central_conn(Central connection) is required")

    if not (isinstance(endpoint, str) and endpoint):
        raise ParameterError("endpoint is required and must be a string")

    # lstrip leaves an endpoint without a leading slash unchanged
    path = generate_url(endpoint.lstrip("/"), "monitoring", version)
    response = central_conn.command("GET", path, api_params=params or {})

    if response["code"] == 200:
        return response["msg"]

    raise Exception(
        f"Error retrieving data from {path}: {response['code']} - {response['msg']}"
    )

simplified_site_resp(site)

Simplify the site response structure for easier consumption.

Parameters:

Name Type Description Default
site dict

Raw site data from API response.

required

Returns:

Type Description
dict

Simplified site data with restructured health, devices, clients, and alerts.

Source code in pycentral/utils/monitoring_utils.py
def simplified_site_resp(site):
    """Simplify the site response structure for easier consumption.

    The ``site`` dictionary is modified in place and also returned:

    - ``health`` becomes the result of ``_groups_to_dict`` on its groups.
    - ``devices`` and ``clients`` each become ``{"count", "health"}``.
    - ``alerts`` becomes ``{"critical", "total"}``, where ``critical`` is the
      count of the first alert group (0 when there are no groups).
    - The ``type`` key is removed if present.

    Sections that are missing or explicitly null are treated as empty.

    Args:
        site (dict): Raw site data from API response.

    Returns:
        (dict): Simplified site data with restructured health, devices, clients, and alerts.
    """
    # ``dict.get(key, {})`` only covers a missing key; ``or {}`` also covers a
    # key that is present with a null value, which would otherwise raise
    # AttributeError on the chained lookup.
    health = site.get("health") or {}
    devices = site.get("devices") or {}
    clients = site.get("clients") or {}
    alerts = site.get("alerts") or {}
    alert_groups = alerts.get("groups") or []

    site["health"] = _groups_to_dict(health.get("groups") or [])
    site["devices"] = {
        "count": devices.get("count", 0),
        "health": _groups_to_dict(
            (devices.get("health") or {}).get("groups") or []
        ),
    }
    site["clients"] = {
        "count": clients.get("count", 0),
        "health": _groups_to_dict(
            (clients.get("health") or {}).get("groups") or []
        ),
    }
    site["alerts"] = {
        # Only the first group's count is reported as "critical"
        "critical": (alert_groups[0] or {}).get("count", 0) if alert_groups else 0,
        "total": alerts.get("totalCount", 0),
    }
    site.pop("type", None)
    return site

clean_raw_trend_data(raw_results, data=None)

Clean and restructure raw trend data from API response.

Parameters:

Name Type Description Default
raw_results dict

Raw trend data containing 'graph' with 'keys' and 'samples'.

required
data dict

Existing data dictionary to append to.

None

Returns:

Type Description
dict

Dictionary with timestamps as keys and metric values as nested dictionaries.

Source code in pycentral/utils/monitoring_utils.py
def clean_raw_trend_data(raw_results, data=None):
    """Reshape raw trend samples into a mapping keyed by timestamp.

    Samples without a timestamp are skipped. A list or tuple of sample values
    is paired positionally with the graph keys; a single value is stored
    under the first graph key, or under ``"value"`` when no usable key exists.

    Args:
        raw_results (dict): Raw trend data containing 'graph' with 'keys' and 'samples'.
        data (dict, optional): Existing data dictionary to append to. It is
            updated in place.

    Returns:
        (dict): Dictionary with timestamps as keys and metric values as nested dictionaries.
    """
    merged = {} if data is None else data

    # ``or`` also covers sections that are present but null
    graph = raw_results.get("graph", {}) or {}
    metric_names = graph.get("keys", []) or []

    for sample in graph.get("samples", []) or []:
        stamp = sample.get("timestamp")
        if not stamp:
            continue

        payload = sample.get("data")
        if isinstance(payload, (list, tuple)):
            for metric, reading in zip(metric_names, payload):
                merged.setdefault(stamp, {})[metric] = reading
            continue

        # Single value: use the first key, or a generic one if unavailable
        label = (metric_names[0] if metric_names else None) or "value"
        merged.setdefault(stamp, {})[label] = payload

    return merged

merged_dict_to_sorted_list(merged)

Convert a merged dictionary to a sorted list of timestamped entries.

Parameters:

Name Type Description Default
merged dict

Dictionary with timestamps as keys.

required

Returns:

Type Description
list

Sorted list of dictionaries with 'timestamp' key and all associated values.

Source code in pycentral/utils/monitoring_utils.py
def merged_dict_to_sorted_list(merged):
    """Turn a timestamp-keyed mapping into a list ordered by timestamp.

    Args:
        merged (dict): Dictionary with timestamps as keys and dicts of values.

    Returns:
        (list): One dict per timestamp, each holding a 'timestamp' key plus the
            values stored for that timestamp, in sorted order.
    """

    def _as_datetime(stamp):
        # Spell a trailing "Z" as an explicit UTC offset before parsing.
        return datetime.fromisoformat(stamp.replace("Z", "+00:00"))

    try:
        ordered = sorted(merged, key=_as_datetime)
    except Exception:
        # Best effort: if any key cannot be parsed or compared as a datetime,
        # fall back to ordering the raw keys.
        ordered = sorted(merged)

    rows = []
    for stamp in ordered:
        rows.append({"timestamp": stamp, **merged[stamp]})
    return rows

validate_central_conn_and_serial(central_conn, serial_number)

Validate central connection and device serial number.

Parameters:

Name Type Description Default
central_conn NewCentralBase

Central connection object (required).

required
serial_number str

Device serial number (required).

required

Raises:

Type Description
ParameterError

If central_conn is None or serial_number is missing/invalid.

Note

Internal SDK function

Source code in pycentral/utils/monitoring_utils.py
def validate_central_conn_and_serial(central_conn, serial_number):
    """
    Check that a connection object and a usable serial number were supplied.

    Args:
        central_conn (NewCentralBase): Central connection object (required).
        serial_number (str): Device serial number (required).

    Raises:
        ParameterError: If central_conn is None, or serial_number is not a
            non-empty string.

    Note:
        Internal SDK function
    """
    if central_conn is None:
        raise ParameterError("central_conn is required")

    serial_ok = isinstance(serial_number, str) and bool(serial_number)
    if not serial_ok:
        raise ParameterError("serial_number is required and must be a string")

validate_query_length(name, value, max_length=MAX_QUERY_LEN)

Raise ParameterError if a query string parameter exceeds the allowed length.

Parameters:

Name Type Description Default
name str

Parameter name (used in error message).

required
value str | None

Parameter value to check.

required
max_length int

Maximum allowed character length. Defaults to MAX_QUERY_LEN.

MAX_QUERY_LEN

Raises:

Type Description
ParameterError

If value is not None and exceeds max_length.

Source code in pycentral/utils/monitoring_utils.py
def validate_query_length(name, value, max_length=MAX_QUERY_LEN):
    """Reject a query string parameter that is longer than allowed.

    Args:
        name (str): Parameter name (used in error message).
        value (str|None): Parameter value to check. None is accepted as-is.
        max_length (int, optional): Maximum allowed character length. Defaults to MAX_QUERY_LEN.

    Raises:
        ParameterError: If value is not None and exceeds max_length.
    """
    if value is None:
        return
    if len(value) > max_length:
        raise ParameterError(f"{name} cannot exceed {max_length} characters")

validate_limit_and_next(limit, next_page, max_limit, next_name='next_page')

Raise ParameterError if pagination parameters are out of range.

Parameters:

Name Type Description Default
limit int

Requested page size.

required
next_page int

Page cursor/index (must be >= 1).

required
max_limit int

Maximum allowed value for limit.

required
next_name str

Name of the next_page parameter for error messages.

'next_page'

Raises:

Type Description
ParameterError

If limit exceeds max_limit or next_page is less than 1.

Source code in pycentral/utils/monitoring_utils.py
def validate_limit_and_next(limit, next_page, max_limit, next_name="next_page"):
    """Check pagination arguments against their allowed range.

    The page size is checked first, then the page cursor.

    Args:
        limit (int): Requested page size.
        next_page (int): Page cursor/index (must be >= 1).
        max_limit (int): Maximum allowed value for limit.
        next_name (str, optional): Name of the next_page parameter for error messages.

    Raises:
        ParameterError: If limit exceeds max_limit or next_page is less than 1.
    """
    limit_too_large = limit > max_limit
    if limit_too_large:
        raise ParameterError(f"limit cannot exceed {max_limit}")

    cursor_too_small = next_page < 1
    if cursor_too_small:
        raise ParameterError(f"{next_name} must be 1 or greater")

validate_required_value(name, value)

Raise ParameterError if a required parameter is missing or empty.

Parameters:

Name Type Description Default
name str

Parameter name (used in error message).

required
value Any

Parameter value to check.

required

Raises:

Type Description
ParameterError

If value is None or an empty string.

Source code in pycentral/utils/monitoring_utils.py
def validate_required_value(name, value):
    """Reject a required parameter that was not supplied.

    Only None and the empty string count as missing; other falsy values such
    as 0 or an empty list are accepted.

    Args:
        name (str): Parameter name (used in error message).
        value (Any): Parameter value to check.

    Raises:
        ParameterError: If value is None or an empty string.
    """
    is_missing = value is None or value == ""
    if is_missing:
        raise ParameterError(f"{name} is required")

validate_serial_query(serial_number, max_len=MAX_SERIAL_LEN)

Raise ParameterError if an optional serial_number query parameter exceeds max_len.

Parameters:

Name Type Description Default
serial_number str | None

Serial number to validate. Skipped if None.

required
max_len int

Maximum allowed length. Defaults to MAX_SERIAL_LEN.

MAX_SERIAL_LEN

Raises:

Type Description
ParameterError

If serial_number is not None and exceeds max_len.

Source code in pycentral/utils/monitoring_utils.py
def validate_serial_query(serial_number, max_len=MAX_SERIAL_LEN):
    """Reject an optional serial_number query parameter that is too long.

    Args:
        serial_number (str|None): Serial number to validate. Skipped if None.
        max_len (int, optional): Maximum allowed length. Defaults to MAX_SERIAL_LEN.

    Raises:
        ParameterError: If serial_number is not None and exceeds max_len.
    """
    if serial_number is None:
        return
    if len(serial_number) > max_len:
        raise ParameterError(f"serial_number cannot exceed {max_len} characters")

normalize_metric(metric, allowed_metrics, name='metric')

Validate and normalize a metric string against a set of allowed values.

Parameters:

Name Type Description Default
metric str

Metric name provided by the caller.

required
allowed_metrics dict | set

Collection of allowed metric names (lowercase).

required
name str

Parameter name for error messages. Defaults to 'metric'.

'metric'

Returns:

Type Description
str

Normalized (stripped, lowercased) metric name.

Raises:

Type Description
ParameterError

If metric is not a non-empty string or is not in allowed_metrics.

Source code in pycentral/utils/monitoring_utils.py
def normalize_metric(metric, allowed_metrics, name="metric"):
    """Normalize a metric name and check it against the allowed values.

    Args:
        metric (str): Metric name provided by the caller.
        allowed_metrics (dict|set): Collection of allowed metric names (lowercase).
        name (str, optional): Parameter name for error messages. Defaults to 'metric'.

    Returns:
        (str): Normalized (stripped, lowercased) metric name.

    Raises:
        ParameterError: If metric is not a non-empty string or is not in allowed_metrics.
    """
    if not (isinstance(metric, str) and metric):
        raise ParameterError(f"{name} is required and must be a string")

    candidate = metric.strip().lower()
    if candidate in allowed_metrics:
        return candidate

    # The error quotes the caller's original spelling, not the normalized one.
    supported = ", ".join(sorted(allowed_metrics))
    raise ParameterError(
        f"Unsupported {name} '{metric}'. Supported values: {supported}"
    )

build_trend_params(start_time=None, end_time=None, duration=None, site_id=None, extra_params=None)

Build a query-parameter dict for trend endpoints.

Validates site_id, merges extra_params, and generates a timestamp filter string when any time argument is provided.

Parameters:

Name Type Description Default
start_time int | str

Start time (epoch seconds or RFC3339).

None
end_time int | str

End time (epoch seconds or RFC3339).

None
duration str | int

Duration string (e.g. '3h') or seconds.

None
site_id str

Site identifier; validated against MAX_SITE_ID_LEN.

None
extra_params dict

Additional parameters to merge into the result.

None

Returns:

Type Description
dict

Parameter dict; empty when no parameters are set.

Raises:

Type Description
ParameterError

If site_id is too long or timestamp arguments are invalid.

Source code in pycentral/utils/monitoring_utils.py
def build_trend_params(
    start_time=None,
    end_time=None,
    duration=None,
    site_id=None,
    extra_params=None,
):
    """Assemble the query parameters for a trend endpoint.

    The site id (validated via ``validate_site_id``) is stored under
    ``"site-id"``, ``extra_params`` are merged on top, and a ``"filter"``
    entry is generated when at least one time argument is provided.

    Args:
        start_time (int|str, optional): Start time (epoch seconds or RFC3339).
        end_time (int|str, optional): End time (epoch seconds or RFC3339).
        duration (str|int, optional): Duration string (e.g. '3h') or seconds.
        site_id (str, optional): Site identifier; checked with validate_site_id.
        extra_params (dict, optional): Additional parameters to merge into the result.

    Returns:
        (dict): Parameter dict; empty when no parameters are set.

    Raises:
        ParameterError: If site_id is rejected by validate_site_id or the
            timestamp arguments are invalid.
    """
    query = {}
    if site_id is not None:
        validate_site_id(site_id)
        query["site-id"] = site_id
    if extra_params:
        query.update(extra_params)

    has_time_args = any(
        arg is not None for arg in (start_time, end_time, duration)
    )
    if has_time_args:
        try:
            query["filter"] = generate_timestamp_str(
                start_time=start_time,
                end_time=end_time,
                duration=duration,
            )
        except ValueError as e:
            # Surface timestamp problems as the SDK's parameter error type.
            raise ParameterError(str(e)) from e
    return query

normalize_trend_response(response, return_raw_response=False)

Normalize a raw trend API response from any device type.

Dispatches to the appropriate cleaner based on the response shape:

- Switch responses are wrapped in a "response" key and are handled by _clean_switch_trend_data.
- AP/gateway responses use a "graph" key and are handled by clean_raw_trend_data.
- Some gateway endpoints wrap the single trend object in a list; such single-element lists are unwrapped to a dict before processing.

If return_raw_response is True the response is returned as-is (after any single-element list unwrapping so callers always receive a dict).

Parameters:

Name Type Description Default
response dict | list

Raw API response.

required
return_raw_response bool

Return raw payload when True.

False

Returns:

Type Description
dict | list

Raw response or normalized sorted list of trend samples.

Source code in pycentral/utils/monitoring_utils.py
def normalize_trend_response(response, return_raw_response=False):
    """Convert a raw trend API response into a sorted list of samples.

    The cleaner is chosen from the shape of the response:
    - A list holding exactly one dict is unwrapped and handled like a dict.
    - Any other list is treated as multiple series: every dict item is merged
      into one timeline with ``clean_raw_trend_data``.
    - A dict with a ``"response"`` key is handled by
      ``_clean_switch_trend_data``.
    - Any other dict is handled by ``clean_raw_trend_data``.

    When return_raw_response is True the payload is returned without
    cleaning (a single-dict list is still unwrapped first). Values that are
    neither a list nor a dict are returned unchanged.

    Args:
        response (dict|list): Raw API response.
        return_raw_response (bool, optional): Return raw payload when True.

    Returns:
        (dict|list): Raw response or normalized sorted list of trend samples.
    """
    if isinstance(response, list):
        is_single_dict = len(response) == 1 and isinstance(response[0], dict)
        if not is_single_dict:
            if return_raw_response:
                return response
            # Multi-series list: fold every dict item into one shared timeline.
            timeline = {}
            for series in response:
                if isinstance(series, dict):
                    timeline = clean_raw_trend_data(series, data=timeline)
            return merged_dict_to_sorted_list(timeline)
        response = response[0]

    if return_raw_response or not isinstance(response, dict):
        return response
    if "response" in response:
        return _clean_switch_trend_data(response)
    return merged_dict_to_sorted_list(clean_raw_trend_data(response))

execute_trend_request(central_conn, base_path, serial_number, metric, metric_map, resource_path=None, start_time=None, end_time=None, duration=None, site_id=None, extra_params=None, return_raw_response=False)

Execute a trend API request for a monitoring resource.

Parameters:

Name Type Description Default
central_conn NewCentralBase

Central connection object.

required
base_path str

Base endpoint path (e.g. 'aps', 'gateways').

required
serial_number str

Device serial number.

required
metric str

Metric name to retrieve.

required
metric_map dict

Mapping of metric names to endpoint path segments.

required
resource_path str

Sub-resource path segment (e.g. 'ports/1').

None
start_time int | str

Start time for range queries.

None
end_time int | str

End time for range queries.

None
duration str

Duration string (e.g. '3h').

None
site_id str

Site identifier.

None
extra_params dict

Additional query parameters.

None
return_raw_response bool

Return raw API payload when True.

False

Returns:

Type Description
dict | list

Raw response or normalized trend samples.

Source code in pycentral/utils/monitoring_utils.py
def execute_trend_request(
    central_conn,
    base_path,
    serial_number,
    metric,
    metric_map,
    resource_path=None,
    start_time=None,
    end_time=None,
    duration=None,
    site_id=None,
    extra_params=None,
    return_raw_response=False,
):
    """Fetch trend data for one metric of a monitored device.

    Validates the inputs, builds the endpoint
    ``<base_path>/<serial_number>[/<resource_path>]/<metric segment>``, issues
    the GET request and passes the result through ``normalize_trend_response``.

    Args:
        central_conn (NewCentralBase): Central connection object.
        base_path (str): Base endpoint path (e.g. 'aps', 'gateways').
        serial_number (str): Device serial number.
        metric (str): Metric name to retrieve.
        metric_map (dict): Mapping of metric names to endpoint path segments.
        resource_path (str, optional): Sub-resource path segment (e.g. 'ports/1').
        start_time (int|str, optional): Start time for range queries.
        end_time (int|str, optional): End time for range queries.
        duration (str, optional): Duration string (e.g. '3h').
        site_id (str, optional): Site identifier.
        extra_params (dict, optional): Additional query parameters.
        return_raw_response (bool, optional): Return raw API payload when True.

    Returns:
        (dict|list): Raw response or normalized trend samples.
    """
    validate_central_conn_and_serial(central_conn, serial_number)
    metric = normalize_metric(metric, metric_map)
    query = build_trend_params(
        start_time=start_time,
        end_time=end_time,
        duration=duration,
        site_id=site_id,
        extra_params=extra_params,
    )

    # The optional sub-resource sits between the device and the metric segment.
    segments = [f"{base_path}/{serial_number}"]
    if resource_path:
        segments.append(f"{resource_path}")
    segments.append(f"{metric_map[metric]}")
    endpoint = "/".join(segments)

    raw = execute_get(central_conn, endpoint=endpoint, params=query)
    return normalize_trend_response(raw, return_raw_response)

get_all_pages(method, limit, next_arg_name='next_page', **kwargs)

Fetch all pages from a paginated monitoring API method.

Calls method repeatedly, passing the current page cursor via the keyword argument named next_arg_name, until all items have been retrieved.

Parameters:

Name Type Description Default
method callable

Single-page fetch method. Must accept limit and the keyword named by next_arg_name, plus any additional **kwargs.

required
limit int

Page size to request on each call.

required
next_arg_name str

Name of the pagination cursor parameter. Defaults to 'next_page'.

'next_page'
**kwargs Any

Additional keyword arguments forwarded to method on every call.

{}

Returns:

Type Description
list[dict]

Aggregated list of all items across all pages.

Source code in pycentral/utils/monitoring_utils.py
def get_all_pages(method, limit, next_arg_name="next_page", **kwargs):
    """Fetch all pages from a paginated monitoring API method.

    Calls *method* repeatedly, passing the current page cursor via the keyword
    argument named *next_arg_name*, starting at page 1. Paging stops as soon as
    one of the following holds:

    - the number of collected items reaches the ``total`` reported by the
      first page (a missing or null ``total`` counts as 0);
    - a page comes back without items (guards against looping forever when the
      reported total is never reached);
    - the response carries no ``next`` cursor.

    Args:
        method (callable): Single-page fetch method.  Must accept ``limit`` and
            the keyword named by *next_arg_name*, plus any additional **kwargs.
            Must return a dict with ``items``, ``total`` and ``next`` entries.
        limit (int): Page size to request on each call.
        next_arg_name (str, optional): Name of the pagination cursor parameter.
            Defaults to 'next_page'.
        **kwargs (Any): Additional keyword arguments forwarded to *method* on every call.

    Returns:
        (list[dict]): Aggregated list of all items across all pages.

    Raises:
        ValueError: If a ``next`` cursor cannot be converted to an integer.
    """
    items = []
    total = None
    next_page = 1

    while True:
        response = method(limit=limit, **{next_arg_name: next_page}, **kwargs)
        if total is None:
            # Only the first page's total is used; treat a null total like a
            # missing one instead of failing on the comparison below.
            total = response.get("total") or 0

        page_items = response.get("items") or []
        items.extend(page_items)
        # An empty page means no further progress is possible, even if the
        # reported total has not been reached and a cursor is still present.
        if not page_items or len(items) >= total:
            break

        next_value = response.get("next")
        if next_value is None:
            break
        next_page = int(next_value)

    return items

Profile Utils

profile_utils

validate_local(local)

Validate local profile attributes and prepare them for API requests.

Parameters:

Name Type Description Default
local dict or None

Local profile attributes dictionary containing scope_id (int) and persona (str).

required

Returns:

Type Description
dict

Validated local attributes dictionary with object_type set to "LOCAL".

Raises:

Type Description
ParameterError

If local is not a dictionary or missing required keys with correct types.

Source code in pycentral/utils/profile_utils.py
def validate_local(local):
    """Check local profile attributes and shape them for an API request.

    Args:
        local (dict or None): Local profile attributes dictionary containing
            scope_id (int) and persona (str).

    Returns:
        (dict): Empty dict when local is falsy; otherwise a new dict that
            starts with object_type "LOCAL" and is then updated with the
            entries of local.

    Raises:
        ParameterError: If local is not a dictionary or missing required keys
            with correct types.
    """
    if not local:
        return {}
    if not isinstance(local, dict):
        raise ParameterError(
            "Invalid local profile attributes. Please provide a valid dictionary."
        )

    for field, kind in (("scope_id", int), ("persona", str)):
        field_ok = field in local and isinstance(local[field], kind)
        if not field_ok:
            raise ParameterError(
                f"Invalid local profile attributes. Key '{field}' must be of type {kind.__name__}."
            )

    return {"object_type": "LOCAL", **local}

Scope Utils

scope_utils

fetch_attribute(obj, attribute)

Fetch the value associated with the provided attribute in the object.

Parameters:

Name Type Description Default
obj object

Object whose attribute has to be returned.

required
attribute str

Attribute within the object that has to be returned.

required

Returns:

Type Description
any

Value of the required attribute if it exists, None otherwise.

Source code in pycentral/utils/scope_utils.py
def fetch_attribute(obj, attribute):
    """Fetch the value associated with the provided attribute in the object.

    Args:
        obj (object): Object whose attribute has to be returned.
        attribute (str): Attribute within the object that has to be returned.

    Returns:
        (any): Value of the required attribute if it exists, None otherwise.
    """
    # A single lookup with a default: properties are evaluated once, and there
    # is no gap between checking for the attribute and reading it.
    return getattr(obj, attribute, None)

update_attribute(obj, attribute, new_value)

Update the value of the provided attribute in the object.

Parameters:

Name Type Description Default
obj object

Object whose attribute has to be updated.

required
attribute str

Attribute within the object that has to be updated.

required
new_value any

New value to set for the attribute.

required

Returns:

Type Description
bool

True if the attribute was successfully updated, False otherwise.

Source code in pycentral/utils/scope_utils.py
def update_attribute(obj, attribute, new_value):
    """Overwrite an attribute that already exists on the object.

    Attributes the object does not already have are never created.

    Args:
        obj (object): Object whose attribute has to be updated.
        attribute (str): Attribute within the object that has to be updated.
        new_value (any): New value to set for the attribute.

    Returns:
        (bool): True if the attribute was successfully updated, False otherwise.
    """
    if not hasattr(obj, attribute):
        return False
    setattr(obj, attribute, new_value)
    return True

get_attributes(obj)

Return all attributes of the provided object.

Parameters:

Name Type Description Default
obj object

Object whose attributes have to be returned.

required

Returns:

Type Description
dict

Dictionary of attributes defined in the object.

Source code in pycentral/utils/scope_utils.py
def get_attributes(obj):
    """Collect the non-callable attributes stored on the object.

    Only entries of the object's own ``__dict__`` are considered.

    Args:
        obj (object): Object whose attributes have to be returned.

    Returns:
        (dict): Dictionary of attributes defined in the object.
    """
    attributes = {}
    for attr_name, attr_value in obj.__dict__.items():
        if callable(attr_value):
            continue
        attributes[attr_name] = attr_value
    return attributes

get_all_scope_elements(obj, scope)

Make GET API calls to Central to get all elements of the specified scope.

This method is supported for site, site collection, and device groups scopes.

Parameters:

Name Type Description Default
obj object

Class instance that will be used to make API calls to Central.

required
scope str

The type of the element. Valid values: "site", "site_collection", "device_group".

required

Returns:

Type Description
list or None

List of the scope elements fetched (partial if an API call fails), or None if the scope is not supported.

Source code in pycentral/utils/scope_utils.py
def get_all_scope_elements(obj, scope):
    """Make GET API calls to Central to get all elements of the specified scope.

    This method is supported for site, site collection, and device groups scopes.
    Pages of DEFAULT_LIMIT elements are requested until the total reported by
    the first successful response has been collected.

    Args:
        obj (object): Class instance that will be used to make API calls to Central.
        scope (str): The type of the element. Valid values: "site", "site_collection",
            "device_group".

    Returns:
        (list or None): Scope elements fetched (partial if an API call fails
            part-way through), or None if the scope is not supported.
    """
    if scope not in SUPPORTED_SCOPES:
        # Explicit "+": adjacent string literals would be merged first, which
        # turns the whole message into the separator passed to join().
        obj.central_conn.logger.error(
            "Unknown scope provided. Please provide one of the supported scopes - "
            + ", ".join(SUPPORTED_SCOPES)
        )
        return None
    limit = DEFAULT_LIMIT
    offset = 0
    scope_elements = []
    number_of_scope_elements = None
    while (
        number_of_scope_elements is None
        or len(scope_elements) < number_of_scope_elements
    ):
        resp = get_scope_elements(obj, scope, limit=limit, offset=offset)
        if resp["code"] != 200:
            obj.central_conn.logger.error(resp["msg"])
            break

        offset += limit
        resp_message = resp["msg"]
        if number_of_scope_elements is None:
            number_of_scope_elements = resp_message["total"]

        page = resp_message["items"]
        if not page:
            # An empty page cannot bring us closer to the reported total;
            # stop instead of requesting ever-larger offsets forever.
            break
        scope_elements.extend(page)
    obj.central_conn.logger.info(
        f"Total {scope}s fetched from account: {len(scope_elements)}"
    )
    return scope_elements

get_scope_elements(obj, scope, limit=50, offset=0, filter_field='', sort='')

Make GET API calls to Central to get scope elements based on provided attributes.

This method is supported for site, site collection, and device groups scopes.

Parameters:

Name Type Description Default
obj object

Class instance that will be used to make API calls to Central.

required
scope str

The type of the element. Valid values: "site", "site_collection", "device_group".

required
limit int

Number of scope elements to be fetched.

50
offset int

Pagination start index.

0
filter_field str

Field for sorting. For sites: scopeName, address, state, country, city, deviceCount, collectionName, zipcode, timezone. For site_collection: scopeName, description, deviceCount, siteCount.

''
sort str

Direction of sorting. Accepted values: ASC or DESC.

''

Returns:

Type Description
dict or None

API response with scope elements, or None if there are errors.

Source code in pycentral/utils/scope_utils.py
def get_scope_elements(
    obj, scope, limit=50, offset=0, filter_field="", sort=""
):
    """Make GET API calls to Central to get scope elements based on provided attributes.

    This method is supported for site, site collection, and device groups scopes.

    Args:
        obj (object): Class instance that will be used to make API calls to Central.
        scope (str): The type of the element. Valid values: "site", "site_collection",
            "device_group".
        limit (int, optional): Number of scope elements to be fetched.
        offset (int, optional): Pagination start index.
        filter_field (str, optional): Field for sorting; sent as the ``filter``
            query parameter when non-empty. For sites: scopeName, address,
            state, country, city, deviceCount, collectionName, zipcode, timezone.
            For site_collection: scopeName, description, deviceCount, siteCount.
        sort (str, optional): Direction of sorting. Accepted values: ASC or DESC.
            Sent as the ``sort`` query parameter when non-empty.

    Returns:
        (dict or None): API response with scope elements, or None if the scope
            is not supported.
    """
    if scope not in SUPPORTED_SCOPES:
        # Explicit "+": adjacent string literals would be merged first, which
        # turns the whole message into the separator passed to join().
        obj.central_conn.logger.error(
            "Unknown scope provided. Please provide one of the supported scopes - "
            + ", ".join(SUPPORTED_SCOPES)
        )
        return None

    api_path = generate_url(SCOPE_URLS[scope.upper()])
    api_method = "GET"
    api_params = {"limit": limit, "offset": offset}

    # Optional query parameters are only sent when the caller supplied them.
    if filter_field:
        api_params["filter"] = filter_field
    if sort:
        api_params["sort"] = sort

    resp = obj.central_conn.command(
        api_method=api_method, api_path=api_path, api_params=api_params
    )
    return resp

set_attributes(obj, attributes_dict, required_attributes, optional_attributes=None, object_attributes=None)

Set attributes of the given object based on the attributes dictionary.

Parameters:

Name Type Description Default
obj object

Class instance whose attributes will be set.

required
attributes_dict dict

Dictionary of attributes to set on the object.

required
required_attributes list

List of required attribute names.

required
optional_attributes dict

Dictionary of optional attributes with their default values.

None
object_attributes dict

Dictionary of object-type attributes with their default values.

None
Source code in pycentral/utils/scope_utils.py
def set_attributes(
    obj,
    attributes_dict,
    required_attributes,
    optional_attributes=None,
    object_attributes=None,
):
    """Set attributes of the given object based on the attributes dictionary.

    Args:
        obj (object): Class instance whose attributes will be set.
        attributes_dict (dict): Dictionary of attributes to set on the object.
        required_attributes (list): List of required attribute names.
        optional_attributes (dict, optional): Dictionary of optional attributes with
            their default values. The default is used whenever the supplied
            value is missing or falsy; list, dict and set defaults are
            deep-copied so objects never share one mutable default.
        object_attributes (dict, optional): Dictionary of object-type attributes with
            their default values. The default is used only when the attribute
            is absent from attributes_dict.

    Raises:
        KeyError: If a required attribute is missing from attributes_dict.
    """
    for attr in required_attributes:
        setattr(obj, attr, attributes_dict[attr])

    if optional_attributes:
        for attr, default_value in optional_attributes.items():
            value = attributes_dict.get(attr)
            if not value:
                # Copy mutable container defaults so that changing the
                # attribute on one object cannot leak into the defaults or
                # into other objects built from them.
                if isinstance(default_value, (list, dict, set)):
                    value = copy.deepcopy(default_value)
                else:
                    value = default_value
            setattr(obj, attr, value)

    if object_attributes:
        for attr, default_value in object_attributes.items():
            setattr(obj, attr, attributes_dict.get(attr, default_value))

get_scope_element(obj, scope, scope_id=None)

Make GET API calls to Central to find the specified scope element.

This method is supported for site, site collection, and device groups scopes.

Parameters:

Name Type Description Default
obj object

Class instance that will be used to make API calls to Central.

required
scope str

The type of the element. Valid values: "site", "site_collection", "device_group".

required
scope_id int

ID of the scope element to be returned.

None

Returns:

Type Description
dict or None

Attributes of the scope element if found, None otherwise.

Source code in pycentral/utils/scope_utils.py
def get_scope_element(obj, scope, scope_id=None):
    """Look up a single scope element in Central by its scope ID.

    This method is supported for site, site collection, and device groups scopes.
    All elements of the scope are fetched and the first one whose "scopeId"
    equals the string form of scope_id is returned.

    Args:
        obj (object): Class instance that will be used to make API calls to Central.
        scope (str): The type of the element. Valid values: "site", "site_collection",
            "device_group".
        scope_id (int, optional): ID of the scope element to be returned.

    Returns:
        (dict or None): Attributes of the scope element if found, None otherwise.
    """
    logger = obj.central_conn.logger if scope not in SUPPORTED_SCOPES or scope_id is None else None
    if scope not in SUPPORTED_SCOPES:
        logger.error(
            f"Unsupported scope '{scope}'. Supported scopes are: {', '.join(SUPPORTED_SCOPES)}"
        )
        return None
    if scope_id is None:
        logger.error("Scope ID must be provided.")
        return None

    candidates = get_all_scope_elements(obj=obj, scope=scope)
    if not candidates:
        return None

    # The API-side "scopeId" is compared as a string.
    wanted = str(scope_id)
    return next(
        (entry for entry in candidates if entry.get("scopeId") == wanted),
        None,
    )

rename_keys(api_dict, api_attribute_mapping)

Rename the keys of attributes from the API response.

Parameters:

Name Type Description Default
api_dict dict

Dictionary of information from Central API Response.

required
api_attribute_mapping dict

Dictionary mapping API keys to object attributes.

required

Returns:

Type Description
dict

Renamed dictionary with keys mapped to object attributes.

Raises:

Type Description
ValueError

If an unknown attribute is found in the API response.

Source code in pycentral/utils/scope_utils.py
def rename_keys(api_dict, api_attribute_mapping):
    """Translate the keys of a Central API response into object attribute names.

    The input is deep-copied first, so neither ``api_dict`` nor any nested
    value in it is modified or shared with the result. The ``"type"`` and
    ``"scopeId"`` entries are dropped before renaming.

    Two value conversions are applied while renaming:

    * truthy values of ``"id"``, ``"collectionId"`` and ``"deviceCount"``
      are converted with ``int()`` (falsy values are kept as they are);
    * a ``"timezone"`` value that is a dict containing ``"timezoneId"`` is
      replaced by that ``"timezoneId"`` value.

    Args:
        api_dict (dict): Dictionary of information from Central API Response.
        api_attribute_mapping (dict): Dictionary mapping API keys to object
            attributes.

    Returns:
        (dict): New dictionary whose keys are the mapped attribute names.

    Raises:
        ValueError: If a key of the API response has no (truthy) entry in
            ``api_attribute_mapping``, or if ``int()`` rejects a value.
    """
    # Work on a private copy so the caller's data is never touched.
    response = copy.deepcopy(api_dict)
    for ignored in ("type", "scopeId"):
        response.pop(ignored, None)

    numeric_keys = ("id", "collectionId", "deviceCount")
    result = {}
    for key, value in response.items():
        attribute = api_attribute_mapping.get(key)
        if not attribute:
            raise ValueError(f"Unknown attribute {key} found in API response")

        holds_timezone_id = (
            key == "timezone"
            and isinstance(value, dict)
            and "timezoneId" in value
        )
        if key in numeric_keys and value:
            result[attribute] = int(value)
        elif holds_timezone_id:
            result[attribute] = value["timezoneId"]
        else:
            result[attribute] = value
    return result

validate_find_scope_elements(ids=None, names=None, serials=None, scope='')

Validate the input parameters for finding scope elements.

Parameters:

Name Type Description Default
ids str or list

ID(s) of the element(s).

None
names str or list

Name(s) of the element(s).

None
serials str or list

Serial number(s) of the element(s) (only for devices).

None
scope str

Specific scope to search in (e.g., "site", "device").

''

Raises:

Type Description
ValueError

If validation fails due to multiple parameters or invalid scope for serials.

Source code in pycentral/utils/scope_utils.py
def validate_find_scope_elements(ids=None, names=None, serials=None, scope=""):
    """Check that the selectors given for a scope-element search are consistent.

    Two rules are enforced: at most one of ``ids``, ``names`` and ``serials``
    may be supplied (anything other than None counts as supplied), and
    non-empty ``serials`` may only be combined with the "device" scope
    (compared case-insensitively) or with no scope at all.

    Args:
        ids (str or list, optional): ID(s) of the element(s).
        names (str or list, optional): Name(s) of the element(s).
        serials (str or list, optional): Serial number(s) of the element(s)
            (only for devices).
        scope (str, optional): Specific scope to search in (e.g., "site",
            "device").

    Raises:
        ValueError: If more than one selector is supplied, or if serials are
            used with a scope other than "device".
    """
    # Selectors are mutually exclusive.
    supplied = [arg for arg in (ids, names, serials) if arg is not None]
    if len(supplied) > 1:
        raise ValueError("Provide only one of 'ids', 'names', or 'serials'.")

    # The scope restriction only applies when both serials and a scope are given.
    if not serials or not scope:
        return
    if scope.lower() != "device":
        raise ValueError(
            "Serials can only be used with the 'device' scope or when no scope is provided."
        )

validate_iso_location(state=None, country=None, city=None)

Validate the input parameters for ISO 3166-1 short name format for location.

Parameters:

Name Type Description Default
state str

State name, e.g., "California".

None
country str

Country name, e.g., "United States".

None
city str

City name, e.g., "San Francisco".

None

Raises:

Type Description
ValueError

If validation fails due to invalid values.

Source code in pycentral/utils/scope_utils.py
def validate_iso_location(state=None, country=None, city=None):
    """Validate location fields before they are used.

    At least one of the three fields must be truthy, and every field that is
    not None must be a string. A given ``country`` is then checked with
    ``pycountry.countries.lookup`` and a given ``state`` with
    ``pycountry.subdivisions.lookup``; a ``city`` only has to contain
    something other than whitespace.

    Args:
        state (str, optional): State name, e.g., "California".
        country (str, optional): Country name in ISO 3166 short name format,
            e.g., "United States".
        city (str, optional): City name, e.g., "San Francisco".

    Raises:
        ValueError: If no field is provided, a provided field is not a
            string, pycountry cannot find the country or state, or the city
            is blank.
    """
    location_parts = (state, country, city)
    has_any_value = any(location_parts)
    only_strings = all(
        isinstance(part, str) for part in location_parts if part is not None
    )
    if not (has_any_value and only_strings):
        raise ValueError(
            "At least one of 'state', 'country', or 'city' must be provided and all must be strings if provided"
        )

    # pycountry signals an unknown name with LookupError; surface it as ValueError.
    if country:
        try:
            pycountry.countries.lookup(country)
        except LookupError:
            raise ValueError(
                f"Invalid country name: {country} Must be in ISO 3166 short name format, e.g., 'United States'"
            )

    if state:
        try:
            pycountry.subdivisions.lookup(state)
        except LookupError:
            raise ValueError(
                f"Invalid state name: {state} Must be in ISO 3166 short name format, e.g., 'California'"
            )

    # There is no reference list for cities; only reject blank names.
    if city is not None and city.strip() == "":
        raise ValueError("City name must be a non-empty string.")

lookup_in_map(keys, lookup_map)

Perform lookup in a map for the given key(s).

Parameters:

Name Type Description Default
keys str, int, or list

Key(s) to look up.

required
lookup_map dict

Map to search in.

required

Returns:

Type Description
any or list or None

Found value(s) or None if not found.

Source code in pycentral/utils/scope_utils.py
def lookup_in_map(keys, lookup_map):
    """Resolve one key, or a collection of keys, against a mapping.

    Args:
        keys (str, int, or list): Key(s) to look up. A ``str`` or ``int`` is
            treated as a single key; anything else is iterated as a
            collection of keys.
        lookup_map (dict): Map to search in.

    Returns:
        (any or list or None): For a single key, the mapped value or None if
            the key is absent. For a collection, a list with the values of
            the keys that are present, in input order; absent keys are
            skipped, so the list may be shorter than the input or empty.
    """
    if not isinstance(keys, (str, int)):
        # Collection of keys: keep only the hits.
        return [lookup_map[item] for item in keys if item in lookup_map]
    return lookup_map.get(keys)

Troubleshooting Utils

troubleshooting_utils

Utilities for troubleshooting operations

This module provides constants related to supported device types and mappings for troubleshooting methods to supported devices.

Attributes:

Name Type Description
SUPPORTED_DEVICE_TYPES list[str]

List of supported device types for troubleshooting operations.

TROUBLESHOOTING_METHOD_DEVICE_MAPPING dict[str, list[str]]

Mapping of troubleshooting method names to lists of compatible device types. Each key represents a supported troubleshooting test, and the value is a list of device types that support it.


URL utils

url_utils

get_prefix(category='configuration', version='latest')

Generate URL prefix for a given category and version.

Parameters:

Name Type Description Default
category str

API category name.

'configuration'
version str

API version.

'latest'

Returns:

Type Description
str

URL prefix in the format "category_value/version/".

Raises:

Type Description
ValueError

If category is not supported or version is invalid.

Source code in pycentral/utils/url_utils.py
def get_prefix(category="configuration", version="latest"):
    """Build the URL prefix for an API category and version.

    Args:
        category (str, optional): API category name; must be a key of
            ``CATEGORIES``.
        version (str, optional): API version. The special value "latest" is
            replaced by the category's own ``"latest"`` entry; any other
            value must be listed in ``versions``.

    Returns:
        (str): URL prefix in the format "category_value/version/".

    Raises:
        ValueError: If category is not supported or version is invalid.
    """
    if category not in CATEGORIES:
        raise ValueError(
            f"Invalid category: {category}, Supported categories: {list(CATEGORIES.keys())}"
        )

    category_value = CATEGORIES[category]["value"]
    if version == "latest":
        # Each category defines which concrete version "latest" stands for.
        version = CATEGORIES[category]["latest"]
    elif version not in versions:
        raise ValueError(
            f"Invalid version: {version}. Allowed versions: {versions}"
        )

    return f"{category_value}/{version}/"

generate_url(api_endpoint, category='configuration', version='latest')

Generate complete API URL for a given endpoint, category, and version.

Parameters:

Name Type Description Default
api_endpoint str

The API endpoint path to append to the URL.

required
category str

API category name.

'configuration'
version str

API version.

'latest'

Returns:

Type Description
str

Complete API URL in the format "category_value/version/api_endpoint".

Raises:

Type Description
ValueError

If category is not supported or version is invalid.

TypeError

If api_endpoint is not a string.

Source code in pycentral/utils/url_utils.py
def generate_url(api_endpoint, category="configuration", version="latest"):
    """Build the full API URL for an endpoint within a category and version.

    Args:
        api_endpoint (str): The API endpoint path to append to the URL.
            None passes the type check as well; it is then interpolated
            into the result as the literal text "None".
        category (str, optional): API category name; must be a key of
            ``CATEGORIES``.
        version (str, optional): API version. The special value "latest" is
            replaced by the category's own ``"latest"`` entry; any other
            value must be listed in ``versions``.

    Returns:
        (str): Complete API URL in the format
            "category_value/version/api_endpoint".

    Raises:
        ValueError: If category is not supported or version is invalid.
        TypeError: If api_endpoint is neither a string nor None.
    """
    if category not in CATEGORIES:
        raise ValueError(
            f"Invalid category: {category}, Supported categories: {list(CATEGORIES.keys())}"
        )

    endpoint_type_ok = api_endpoint is None or isinstance(api_endpoint, str)
    if not endpoint_type_ok:
        raise TypeError(
            f"Invalid type: {type(api_endpoint)} for api_endpoint, expected str"
        )

    category_value = CATEGORIES[category]["value"]
    if version != "latest":
        # Explicit versions are accepted only if globally known.
        if version not in versions:
            raise ValueError(
                f"Invalid version: {version}. Allowed versions: {versions}"
            )
    else:
        version = CATEGORIES[category]["latest"]

    return f"{category_value}/{version}/{api_endpoint}"