yellowdog_ray.raydog Autoscaler

RayDog Autoscaler: Ray cluster creation and autoscaling using YellowDog.

class yellowdog_ray.raydog.autoscaler.AutoRayDog(provider_config: dict[str, Any], cluster_name: str, tag_store: TagStore)

Bases: object

Connect to YellowDog and use it to set up Ray clusters.

create_head_node_task(flavour: str, ray_start_script: str, capture_taskoutput: bool = False) -> str

Create the head node task for the cluster.

create_worker_node_tasks(flavour: str, ray_start_script: str, count: int, capture_taskoutput: bool = False) -> list[str]

Create the worker node tasks for a given worker pool.

create_worker_pool(flavour: str, node_type: str, node_config: dict[str, Any], count: int, userdata: str, metrics_enabled: bool = False) -> None

Create a new worker pool for the given type of node.

find_raydog_cluster() -> bool

Try to find an existing RayDog cluster in YellowDog. Return True if the cluster was found, False otherwise.

get_ip_addresses(node_id: str) -> tuple[str, str]

Extract the public and private IP addresses for the node.

has_worker_pool(flavour: str) -> bool

Is there an existing worker pool for this type of node?

shut_down() -> None

Shut down the Ray cluster.

class yellowdog_ray.raydog.autoscaler.RayDogNodeProvider(provider_config: dict[str, Any], cluster_name: str)

Bases: NodeProvider

The RayDog implementation of a Ray autoscaling provider.
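Ray generally loads third-party node providers through an `external` provider entry in the cluster config. The sketch below writes that section as a Python dict, assuming the usual Ray convention applies to RayDog. The cluster name is a placeholder, and any RayDog-specific provider keys are left out rather than guessed.

```python
# Sketch of the `provider` section of a Ray cluster config as a Python dict.
# "type": "external" plus a "module" path is Ray's general convention for
# third-party providers (assumed to apply here). RayDog-specific keys are
# omitted because this section does not describe them.
provider_section = {
    "type": "external",
    "module": "yellowdog_ray.raydog.autoscaler.RayDogNodeProvider",
}

# "example-cluster" is a hypothetical placeholder name.
cluster_config = {
    "cluster_name": "example-cluster",
    "provider": provider_section,
}
```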

static bootstrap_config(cluster_config: dict[str, Any]) -> dict[str, Any]

Bootstraps the cluster config by adding/updating relevant properties, prior to the constructor being called.
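As an illustration of the add/update pattern, the following sketch fills in missing provider properties on a copy of the config. It is not RayDog's implementation: the function name, the `example_setting` key, and its default value are hypothetical.

```python
import copy
from typing import Any

# Hypothetical default, for illustration only. The properties RayDog really
# adds or updates are not listed in this section.
HYPOTHETICAL_DEFAULTS = {"example_setting": "default-value"}


def bootstrap_config_sketch(cluster_config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of cluster_config with missing provider defaults added."""
    config = copy.deepcopy(cluster_config)  # leave the caller's dict untouched
    provider = config.setdefault("provider", {})
    for key, value in HYPOTHETICAL_DEFAULTS.items():
        provider.setdefault(key, value)  # only add keys the user did not set
    return config


original = {"cluster_name": "example-cluster", "provider": {"type": "external"}}
bootstrapped = bootstrap_config_sketch(original)
```

Because the method is static, it needs no provider instance, which is what allows it to run before the constructor is called.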

create_node(node_config: dict[str, Any], tags: dict[str, str], count: int) -> dict[str, Any] | None

Creates a number of nodes within the namespace.

create_node_with_resources_and_labels(node_config: dict[str, Any], tags: dict[str, str], count: int, resources: dict[str, float], labels: dict[str, str]) -> dict[str, Any] | None

Create nodes with the given resource and label configuration. This is the method the autoscaler actually calls. Implement it directly where possible; otherwise it delegates to create_node().

May raise a ray.autoscaler.node_launch_exception.NodeLaunchException.
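The delegation and the launch failure path can be sketched with stand-in classes. Everything here is hypothetical: `SketchProvider`, its `capacity` limit, and `NodeLaunchError`, which only stands in for Ray's NodeLaunchException so that the example runs without Ray installed.

```python
from typing import Any, Optional


class NodeLaunchError(Exception):
    """Stand-in for ray.autoscaler.node_launch_exception.NodeLaunchException."""


class SketchProvider:
    """Hypothetical provider showing delegation to create_node()."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity  # hypothetical cap on launchable nodes
        self.launched: list[dict[str, str]] = []

    def create_node(
        self, node_config: dict[str, Any], tags: dict[str, str], count: int
    ) -> Optional[dict[str, Any]]:
        if count > self.capacity - len(self.launched):
            raise NodeLaunchError("not enough capacity")
        start = len(self.launched)
        self.launched.extend(dict(tags) for _ in range(count))
        return {f"node-{start + i}": {"tags": dict(tags)} for i in range(count)}

    def create_node_with_resources_and_labels(
        self,
        node_config: dict[str, Any],
        tags: dict[str, str],
        count: int,
        resources: dict[str, float],
        labels: dict[str, str],
    ) -> Optional[dict[str, Any]]:
        # Fallback behaviour: ignore resources and labels, then delegate.
        return self.create_node(node_config, tags, count)


provider = SketchProvider(capacity=2)
created = provider.create_node_with_resources_and_labels(
    {}, {"kind": "worker"}, 2, {"CPU": 4.0}, {}
)

launch_failed = False
try:
    provider.create_node_with_resources_and_labels({}, {}, 1, {}, {})
except NodeLaunchError:
    launch_failed = True  # capacity is exhausted after the first call
```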

external_ip(node_id: str) -> str

Returns the external IP of the given node.

internal_ip(node_id: str) -> str

Returns the internal IP (Ray IP) of the given node.

is_running(node_id: str) -> bool

Is the specified node running?

is_terminated(node_id: str) -> bool

Is the specified node terminated?

node_tags(node_id: str) -> dict[str, str]

Returns the tags of the given node ID.

non_terminated_nodes(tag_filters: dict[str, str]) -> list[str]

Return a list of node IDs filtered by the specified tags dict.

This list must not include terminated nodes. For performance reasons, providers are allowed to cache the result of a call to non_terminated_nodes() to serve single-node queries (e.g. is_running(node_id)). This means that non_terminated_nodes() must be called again to refresh results.
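The caching contract described above can be sketched with a small stand-in provider. This is not RayDog's implementation: `CachingProviderSketch`, the `backend` dict, and the `"kind"` tag key are hypothetical.

```python
class CachingProviderSketch:
    """Hypothetical provider that serves single-node queries from a cache."""

    def __init__(self, backend: dict[str, dict]) -> None:
        # Hypothetical source of truth: node_id -> {"tags": ..., "state": ...}
        self._backend = backend
        self._cache: dict[str, dict] = {}

    def non_terminated_nodes(self, tag_filters: dict[str, str]) -> list[str]:
        # Take a fresh snapshot of every node that is not terminated.
        self._cache = {
            node_id: dict(info)
            for node_id, info in self._backend.items()
            if info["state"] != "terminated"
        }
        # Return only the nodes whose tags match every filter entry.
        return [
            node_id
            for node_id, info in self._cache.items()
            if all(info["tags"].get(k) == v for k, v in tag_filters.items())
        ]

    def is_running(self, node_id: str) -> bool:
        # Answered from the snapshot, so it can be stale until the next refresh.
        info = self._cache.get(node_id)
        return info is not None and info["state"] == "running"


backend = {
    "node-1": {"tags": {"kind": "worker"}, "state": "running"},
    "node-2": {"tags": {"kind": "head"}, "state": "running"},
}
provider = CachingProviderSketch(backend)
workers = provider.non_terminated_nodes({"kind": "worker"})

backend["node-1"]["state"] = "terminated"
stale = provider.is_running("node-1")   # still served from the old snapshot

provider.non_terminated_nodes({})        # refreshes the snapshot
fresh = provider.is_running("node-1")   # now reflects the termination
```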

Examples:
>>> from ray.autoscaler.node_provider import NodeProvider
>>> from ray.autoscaler.tags import TAG_RAY_NODE_KIND
>>> provider = NodeProvider(...)
>>> provider.non_terminated_nodes(
...     {TAG_RAY_NODE_KIND: "worker"})
["node-1", "node-2"]

prepare_for_head_node(cluster_config: dict[str, Any]) -> dict[str, Any]

Returns a new cluster config with custom configs for head node.

set_node_tags(node_id: str, tags: dict) -> None

Sets the tag values (string dict) for the specified node.

terminate_node(node_id: str) -> None

Terminates the specified node.

terminate_nodes(node_ids: list[str]) -> None

Terminates a set of nodes.

class yellowdog_ray.raydog.autoscaler.TagStore(cluster_name: str)

Bases: object

Manage the node tags used to control the cluster. The tag store is a dedicated Redis/Valkey server running on the head node.

close() -> None

Close the SSH tunnel and the Redis connections.

connect(remote_server: str | None, port: int, auth_config: dict[str, str] = None) -> None

Connect to the Redis tag server on the head node.

find_matches(longlist: list[str] | None, tag_name: str, tag_value: str) -> list[str]

get_all_tags(node_id: str) -> dict[str, str]

get_tag(node_id: str, tag_name: str) -> str | None

refresh() -> None

Read all tags from the head node.

update_tags(node_id: str, new_tags: dict[str, str]) -> None
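
Several TagStore methods above are listed without descriptions. The in-memory sketch below shows one plausible reading of their signatures; it is an assumption, not the Redis-backed implementation. In particular, treating `longlist=None` as "search every known node" is a guess, and `InMemoryTagStore` and the `"kind"` tag are hypothetical.

```python
from typing import Optional


class InMemoryTagStore:
    """Hypothetical stand-in for TagStore that keeps tags in a local dict."""

    def __init__(self) -> None:
        self._tags: dict[str, dict[str, str]] = {}

    def update_tags(self, node_id: str, new_tags: dict[str, str]) -> None:
        # Merge new tags into any tags already stored for the node.
        self._tags.setdefault(node_id, {}).update(new_tags)

    def get_all_tags(self, node_id: str) -> dict[str, str]:
        # Return a copy so callers cannot mutate the stored tags.
        return dict(self._tags.get(node_id, {}))

    def get_tag(self, node_id: str, tag_name: str) -> Optional[str]:
        return self._tags.get(node_id, {}).get(tag_name)

    def find_matches(
        self, longlist: Optional[list[str]], tag_name: str, tag_value: str
    ) -> list[str]:
        # Assumption: None means "consider every node in the store".
        candidates = list(self._tags) if longlist is None else longlist
        return [n for n in candidates if self.get_tag(n, tag_name) == tag_value]


store = InMemoryTagStore()
store.update_tags("node-1", {"kind": "worker"})
store.update_tags("node-2", {"kind": "head"})
store.update_tags("node-3", {"kind": "worker"})

all_workers = store.find_matches(None, "kind", "worker")
some_workers = store.find_matches(["node-2", "node-3"], "kind", "worker")
```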