flyte-kuberay

@tylertitsworth/flyte-kuberay

Flyte + Ray integration via flytekitplugins-ray: RayJobConfig, head/worker resources, autoscaling, PodTemplate, and @dynamic cluster sizing. Use when running Ray workloads as Flyte tasks.

Installation

$ npx agent-skills-cli install @tylertitsworth/flyte-kuberay

Supported assistants: Claude Code, Cursor, Copilot, Codex, Antigravity

Details

Path: flyte-kuberay/SKILL.md
Branch: main
Scoped Name: @tylertitsworth/flyte-kuberay

Usage

After installing, this skill will be available to your AI coding assistant.

Verify installation:

npx agent-skills-cli list

Skill Instructions


name: flyte-kuberay
description: "Flyte + Ray integration via flytekitplugins-ray: RayJobConfig, head/worker resources, autoscaling, PodTemplate, and @dynamic cluster sizing. Use when running Ray workloads as Flyte tasks."

Flyte × KubeRay Integration

The flytekitplugins-ray package lets you run Ray workloads as Flyte tasks. Flyte creates an ephemeral Ray cluster (via the KubeRay operator) for each task execution, then submits your function as a Ray job.

Requirements:

  • KubeRay operator installed in the K8s cluster
  • flytekitplugins-ray in your task's container image
  • Flyte's Ray backend plugin enabled (see flyte-deployment skill)

Version compatibility: flytekitplugins-ray >= 1.16 requires flytekit > 1.14.5 and kuberay >= 1.1.0
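
The plugin has to be available inside the task's container image. A minimal sketch of an ImageSpec that satisfies this (the version pins and registry below are illustrative, not authoritative; check them against your Flyte and KubeRay releases):

from flytekit import ImageSpec

ray_base_image = ImageSpec(
    name="flyte-ray-base",
    packages=[
        "flytekit>=1.15",             # must be > 1.14.5 for flytekitplugins-ray >= 1.16
        "flytekitplugins-ray>=1.16",
        "ray[default]",
    ],
    registry="ghcr.io/my-org",        # illustrative registry
)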

Execution Flow

Understanding how Flyte orchestrates Ray is essential for debugging:

@task(task_config=RayJobConfig(...))
         │
         ▼
Flytekit serializes RayJobConfig → TaskTemplate (Protobuf)
         │
         ▼
FlytePropeller sees task type "ray"
         │
         ▼
Ray backend plugin creates a RayJob CRD in K8s
         │
         ▼
KubeRay operator provisions Ray cluster (head + workers)
         │
         ▼
KubeRay submits your task function via Ray Job Submission API
         │
         ▼
ray.init() connects to the cluster automatically
         │
         ▼
Task function executes (Ray remote calls fan out to workers)
         │
         ▼
On completion: outputs serialized, cluster torn down (if shutdown_after_job_finishes=True)

Key implications (see the sketch after this list):

  • ray.init() is called automatically in pre_execute; don't call it yourself
  • The address is set to the head node automatically
  • Your task function runs on the Ray head node (as the driver)
  • @ray.remote functions and actors are scheduled on worker nodes
  • The cluster is ephemeral: it's created and destroyed per task execution
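
A minimal sketch of those implications in code (the cluster shape here is illustrative): the task body acts as the Ray driver, never calls ray.init() itself, and the @ray.remote calls are what fan out to the worker pods.

import ray
from flytekit import task
from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig

@ray.remote
def square(x: int) -> int:
    # Scheduled on a worker pod, not on the head/driver
    return x * x

@task(
    task_config=RayJobConfig(
        worker_node_config=[WorkerNodeConfig(group_name="workers", replicas=2)],
        shutdown_after_job_finishes=True,
    ),
)
def sum_of_squares(n: int) -> int:
    # No ray.init() here; the plugin has already connected this driver to the cluster
    futures = [square.remote(i) for i in range(n)]
    return sum(ray.get(futures))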

RayJobConfig

RayJobConfig Settings

| Setting | Purpose | Default |
|---|---|---|
| worker_node_config | List of worker group configs | required |
| head_node_config | Head node configuration | None (uses defaults) |
| enable_autoscaling | Enable Ray autoscaler | False |
| runtime_env | Ray runtime environment (pip packages, env vars, working dir) | None |
| address | Ray cluster address (auto-set, rarely override) | None |
| shutdown_after_job_finishes | Tear down cluster after task completes | False |
| ttl_seconds_after_finished | Seconds before cleaning up a finished cluster | None |

HeadNodeConfig Settings

| Setting | Purpose | Default |
|---|---|---|
| ray_start_params | Parameters passed to ray start on head | None |
| requests | Resource requests (CPU, memory, GPU) | None (uses task-level) |
| limits | Resource limits | None |
| pod_template | Custom PodTemplate for advanced pod config | None |

WorkerNodeConfig Settings

| Setting | Purpose | Default |
|---|---|---|
| group_name | Worker group name (must be unique per group) | required |
| replicas | Number of worker pods | 1 |
| min_replicas | Min replicas for autoscaling | None |
| max_replicas | Max replicas for autoscaling | None |
| ray_start_params | Parameters passed to ray start on workers | None |
| requests | Resource requests per worker | None |
| limits | Resource limits per worker | None |
| pod_template | Custom PodTemplate for workers | None |

Resource Configuration

Resources can be set at three levels (most specific wins), as the sketch after this list illustrates:

  1. Task-level requests/limits in @task(): applies to ALL pods (head + workers)
  2. HeadNodeConfig requests/limits: head pod only
  3. WorkerNodeConfig requests/limits: per worker group
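
A short sketch of the three levels together (values are illustrative): the worker group's requests apply to its worker pods, the HeadNodeConfig requests apply to the head pod, and the task-level requests are the fallback for anything not overridden.

from flytekit import Resources, task
from flytekitplugins.ray import RayJobConfig, HeadNodeConfig, WorkerNodeConfig

@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            requests=Resources(cpu="2", mem="8Gi"),       # head pod only
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="workers",
                replicas=2,
                requests=Resources(cpu="8", mem="32Gi"),  # this worker group only
            ),
        ],
        shutdown_after_job_finishes=True,
    ),
    requests=Resources(cpu="1", mem="2Gi"),               # task-level fallback for all pods
)
def sized_task() -> None:
    ...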

GPU Training Example

from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.ray import RayJobConfig, HeadNodeConfig, WorkerNodeConfig

training_image = ImageSpec(
    name="ray-training",
    packages=["flytekitplugins-ray", "ray[default,train]", "torch", "transformers"],
    apt_packages=["wget"],  # kuberay readiness probe
    registry="ghcr.io/my-org",
)

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(
        ray_start_params={"num-cpus": "0"},  # don't schedule work on head
        requests=Resources(cpu="4", mem="16Gi"),
    ),
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            limits=Resources(gpu="1"),
        ),
    ],
    runtime_env={"env_vars": {"WANDB_PROJECT": "my-project"}},
    shutdown_after_job_finishes=True,
    ttl_seconds_after_finished=300,
)

@task(
    task_config=ray_config,
    container_image=training_image,
)
def distributed_training(model_name: str, epochs: int) -> float:
    import ray.train
    from ray.train.torch import TorchTrainer
    from ray.train import ScalingConfig

    trainer = TorchTrainer(
        train_loop_per_worker=train_func,  # per-worker training loop, assumed defined elsewhere
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
        train_loop_config={"model_name": model_name, "epochs": epochs},
    )
    result = trainer.fit()
    return result.metrics["eval_loss"]

Multiple Worker Groups (Heterogeneous)

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(
        requests=Resources(cpu="4", mem="8Gi"),
    ),
    worker_node_config=[
        # GPU workers for training
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
        ),
        # CPU workers for data preprocessing
        WorkerNodeConfig(
            group_name="cpu-workers",
            replicas=8,
            requests=Resources(cpu="4", mem="16Gi"),
        ),
    ],
    shutdown_after_job_finishes=True,
)

Autoscaling

ray_config = RayJobConfig(
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=2,       # initial replicas
            min_replicas=1,   # scale down to 1
            max_replicas=8,   # scale up to 8
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
        ),
    ],
    enable_autoscaling=True,
    shutdown_after_job_finishes=True,
)

PodTemplate for Advanced Configuration

Use PodTemplate for node selectors, tolerations, volumes, and other pod-level settings:

from flytekit import PodTemplate
from kubernetes.client import (
    V1PodSpec, V1Container, V1Volume, V1VolumeMount,
    V1EmptyDirVolumeSource, V1PersistentVolumeClaimVolumeSource, V1Toleration,
)

gpu_pod_template = PodTemplate(
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="ray-worker",
                volume_mounts=[
                    V1VolumeMount(name="shm", mount_path="/dev/shm"),
                    V1VolumeMount(name="data", mount_path="/data"),
                ],
            ),
        ],
        volumes=[
            V1Volume(name="shm", empty_dir=V1EmptyDirVolumeSource(
                medium="Memory", size_limit="16Gi"
            )),
            V1Volume(name="data", persistent_volume_claim={
                "claimName": "training-data"
            }),
        ],
        tolerations=[
            V1Toleration(key="nvidia.com/gpu", operator="Exists", effect="NoSchedule"),
        ],
        node_selector={"nvidia.com/gpu.product": "NVIDIA-A100-SXM4-80GB"},
    ),
)

ray_config = RayJobConfig(
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            pod_template=gpu_pod_template,
        ),
    ],
    shutdown_after_job_finishes=True,
)

Dynamic Cluster Sizing

Use @dynamic to determine cluster size at runtime:

from flytekit import dynamic

@dynamic(container_image=training_image)
def adaptive_training(model_name: str, dataset_size: int) -> float:
    # Determine GPU count based on dataset size
    num_gpus = 4 if dataset_size > 1_000_000 else 2

    ray_config = RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(
                group_name="gpu-workers",
                replicas=num_gpus,
                requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            ),
        ],
        shutdown_after_job_finishes=True,
    )

    # train_task is a Ray @task defined elsewhere; its config is overridden at run time
    return train_task(model_name=model_name).with_overrides(
        task_config=ray_config,
        requests=Resources(cpu="4", mem="8Gi"),
    )

Common Patterns

Ray Data Processing Pipeline

@task(
    task_config=RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(group_name="data-workers", replicas=16,
                             requests=Resources(cpu="4", mem="16Gi")),
        ],
        shutdown_after_job_finishes=True,
    ),
    container_image=data_image,
)
def preprocess_dataset(input_path: str) -> str:
    import ray
    ds = ray.data.read_parquet(input_path)
    ds = ds.map(tokenize_fn).repartition(100)  # tokenize_fn defined elsewhere
    output_path = "/data/processed"
    ds.write_parquet(output_path)
    return output_path

Ray Serve Deployment Validation

@task(
    task_config=RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(group_name="serve-workers", replicas=2,
                             requests=Resources(cpu="4", mem="16Gi", gpu="1")),
        ],
        shutdown_after_job_finishes=True,
    ),
    container_image=serve_image,
)
def validate_serving(model_path: str) -> dict:
    import ray.serve
    # Deploy and test model serving, return latency/throughput metrics
    ...

Full Training Pipeline

@workflow
def training_pipeline(
    raw_data: str, model_name: str, epochs: int
) -> float:
    processed = preprocess_dataset(input_path=raw_data)     # Ray Data task
    metrics = distributed_training(                          # Ray Train task
        model_name=model_name, epochs=epochs
    )
    return metrics

Runtime Environment

The runtime_env field installs dependencies at Ray cluster startup:

RayJobConfig(
    runtime_env={
        "pip": ["numpy==1.26.0", "pandas"],
        "env_vars": {"TOKENIZERS_PARALLELISM": "false"},
        "working_dir": "./src",  # uploaded to cluster
    },
    ...
)

# Install specific packages at cluster startup:
RayJobConfig(
    runtime_env={
        "pip": ["numpy==1.26.0", "pandas", "scikit-learn"],
    },
    ...
)

Prefer ImageSpec over runtime_env for heavy dependencies (PyTorch, transformers). runtime_env is best for lightweight, frequently-changing deps. Heavy pip installs at cluster startup add significant latency.
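
As a sketch of that split (the small package name below is hypothetical): heavy, stable dependencies are baked into the image, while runtime_env carries only the lightweight, frequently changing pieces.

from flytekit import ImageSpec
from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig

# Heavy, rarely-changing deps baked into the image
heavy_deps_image = ImageSpec(
    name="ray-training",
    packages=["flytekitplugins-ray", "ray[default,train]", "torch"],
    registry="ghcr.io/my-org",
)

# Light, frequently-changing deps and env vars in runtime_env
ray_config = RayJobConfig(
    worker_node_config=[WorkerNodeConfig(group_name="workers", replicas=2)],
    runtime_env={
        "pip": ["my-eval-utils==0.3.1"],           # hypothetical small package
        "env_vars": {"TOKENIZERS_PARALLELISM": "false"},
    },
    shutdown_after_job_finishes=True,
)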

Flyte Backend Plugin Configuration

The Flyte admin must enable the Ray plugin. In Flyte's Helm values:

# flyte-binary or flyte-core values
configmap:
  enabled_plugins:
    tasks:
      task-plugins:
        enabled-plugins:
          - container
          - ray
        default-for-task-types:
          container: container
          ray: ray

Troubleshooting

Cluster Not Creating

| Symptom | Cause | Fix |
|---|---|---|
| Task stuck in QUEUED | KubeRay operator not installed | Install KubeRay operator in cluster |
| RayJob CRD not found | API version mismatch | Ensure flyte >= 1.11.1 with kuberay >= 1.1.0 |
| Pods pending | Insufficient cluster resources | Check node resources, adjust requests |
| GPU pods pending | No GPU nodes / wrong resource key | Verify nvidia.com/gpu resource, check node labels |

Task Failures

| Symptom | Cause | Fix |
|---|---|---|
| ray.init() connection refused | Head node not ready | Increase ttl_seconds_after_finished, check head pod logs |
| Worker OOM killed | Memory limits too low | Increase worker requests.mem / limits.mem |
| ModuleNotFoundError | Missing from image or runtime_env | Add to ImageSpec packages or runtime_env.pip |
| Ingress webhook errors | Missing ingress controller | Install nginx-ingress or disable Ray dashboard ingress |
| Task timeout | Cluster creation too slow | Pre-pull images, increase Flyte task timeout |

Debugging

# Check RayJob CRDs created by Flyte
kubectl get rayjobs -n <flyte-execution-ns>

# Check Ray cluster status
kubectl get rayclusters -n <flyte-execution-ns>

# Check head/worker pod status
kubectl get pods -n <flyte-execution-ns> -l ray.io/cluster=<cluster-name>

# View Ray head logs (driver output)
kubectl logs -n <flyte-execution-ns> <head-pod-name>

# Check KubeRay operator logs
kubectl logs -n ray-system deploy/kuberay-operator

Production Patterns

For production deployment patterns including image pre-pulling, shared memory for NCCL, Kueue integration, spot instance workers, S3/Longhorn checkpointing, and task cleanup, see references/production-patterns.md.

Debug Script

Run scripts/flyte-ray-debug.sh <execution-name> [namespace] to inspect the full RayJob β†’ RayCluster β†’ pod pipeline for a Flyte execution, including status, events, and logs.

References

Cross-References

  • flyte-sdk: Flytekit task/workflow authoring, ImageSpec, type system
  • flyte-deployment: Flyte cluster setup and plugin configuration
  • kuberay: KubeRay operator CRDs, RayJob spec, troubleshooting
  • ray-train: Ray Train distributed training config
  • ray-core: Ray remote functions, actors, object store
  • gpu-operator: GPU driver and device plugin for Ray GPU workers
  • kueue: Queue Flyte-submitted Ray jobs
  • wandb: Experiment tracking from Flyte+Ray training tasks
