Skip to content

Streaming

Streaming Module

streaming

Streaming(central_conn, event, reconnect_delay=5, max_retries=None, filters=None)

Minimal WebSocket streaming client for Central.

Responsibilities
  • Build the WSS URL for the selected streaming endpoint.
  • Maintain a single WebSocket connection with optional auto-reconnect.
  • Decode protobuf payloads and deliver them to a user callback.
  • Allow graceful stop and cleanup of the WebSocket connection.

Parameters:

Name Type Description Default
central_conn NewCentralBase

Central connection object, used for tokens, base URL and logging.

required
event str

Streaming event name. Must be one of the keys in SUPPORTED_EVENTS (for example, "audit-trail-events").

required
reconnect_delay int

Delay in seconds before attempting to reconnect after an unexpected disconnection. Defaults to 5.

5
max_retries int | None

Maximum number of reconnection attempts after an unexpected disconnection. None (default) means retry indefinitely.

None
filters str | list[str]

Either a single filter string or a list of filter strings. If a list is provided, its elements will be joined with commas to form the header value.

None

Raises:

Type Description
ValueError

If an unsupported event is provided or filters are of an unexpected type.

Source code in pycentral/streaming/streaming.py
def __init__(
    self,
    central_conn,
    event,
    reconnect_delay=5,
    max_retries=None,
    filters=None,
):
    self.central_conn = central_conn
    # cache the commonly used app route and token key to simplify lookups
    self.app_route = self.central_conn._app_routes["new_central"]
    self.token_key = self.app_route["token_key"]
    if event not in SUPPORTED_EVENTS:
        raise ValueError(
            f"Unsupported event: {event}. Supported events: {list(SUPPORTED_EVENTS.keys())}"
        )
    self.endpoint = event
    self.decoder = SUPPORTED_EVENTS[event]
    self.reconnect_delay = reconnect_delay
    self.max_retries = max_retries
    self.logger = central_conn.logger
    self.ws = None
    self.user_callback = None

    self.stop_event = threading.Event()  # Thread-safe stop flag
    self._original_sigint = None

    self.filters = self._normalize_filters(filters)

stream(callback=None)

Start streaming messages for the configured event.

This method establishes the WebSocket connection, listens for messages, and optionally auto-reconnects on unexpected closure until stop() is called or a fatal error occurs.

Parameters:

Name Type Description Default
callback callable

Function to be invoked for each decoded message. It must accept a single argument (dict) representing the decoded protobuf message. If not provided, decoded messages are logged.

None

Raises:

Type Description
KeyboardInterrupt

If interrupted by the user (Ctrl+C) while streaming in a foreground loop.

Source code in pycentral/streaming/streaming.py
def stream(self, callback=None):
    """Start streaming messages for the configured event.

    This method establishes the WebSocket connection, listens for
    messages, and optionally auto-reconnects on unexpected closure
    until `stop()` is called or a fatal error occurs.

    Args:
        callback (callable, optional): Function to be invoked for each
            decoded message. It must accept a single argument
            (dict) representing the decoded protobuf message.
            If not provided, decoded messages are logged.

    Raises:
        KeyboardInterrupt: If interrupted by the user (Ctrl+C) while
            streaming in a foreground loop.
    """
    self.user_callback = callback
    self.stop_event.clear()

    self._setup_signal_handler()

    retry_count = 0
    try:
        while not self.stop_event.is_set():
            try:
                url = self._get_wss_url()
                self.ws = websocket.WebSocketApp(
                    url,
                    header=self._build_headers(),
                    on_open=self._on_open,
                    on_close=self._on_close,
                    on_error=self._on_error,
                    on_message=self._on_message,
                )

                self.logger.info(f"Connecting to {url.split('?')[0]}...")
                self.ws.run_forever(
                    sslopt={"cert_reqs": ssl.CERT_NONE},
                    ping_interval=_PING_INTERVAL,
                    ping_timeout=_PING_TIMEOUT,
                )

                if self.stop_event.is_set():
                    break

                retry_count += 1
                if self.max_retries is not None and retry_count >= self.max_retries:
                    self.logger.error(
                        f"Max retries ({self.max_retries}) reached. Stopping."
                    )
                    self.stop_event.set()
                    break

                self.logger.info(
                    f"Connection closed. Reconnecting in {self.reconnect_delay}s… "
                    f"(attempt {retry_count}"
                    + (f"/{self.max_retries}" if self.max_retries is not None else "")
                    + ")"
                )
                if self.stop_event.wait(timeout=self.reconnect_delay):
                    break

            except Exception as e:
                self.logger.error(
                    f"Unexpected error in streaming loop: {e}"
                )
                if self.ws:
                    self.ws.close()
                retry_count += 1
                if self.max_retries is not None and retry_count >= self.max_retries:
                    self.logger.error(
                        f"Max retries ({self.max_retries}) reached. Stopping."
                    )
                    self.stop_event.set()
                    break
                if self.stop_event.wait(timeout=self.reconnect_delay):
                    break
    finally:
        self._restore_signal_handler()
        self._cleanup()

stop()

Request the streaming loop to stop and close the WebSocket.

This method can be called from another thread or from within the user callback to stop streaming gracefully.

Source code in pycentral/streaming/streaming.py
def stop(self):
    """Request the streaming loop to stop and close the WebSocket.

    This method can be called from another thread or from within
    the user callback to stop streaming gracefully.
    """
    self.logger.info("Stop requested.")
    self.stop_event.set()
    self._cleanup()