Agent Skills
flyte-kuberay
@tylertitsworth/flyte-kuberay
by tylertitsworth · 0 stars · 0 forks · Updated 4/1/2026
View on GitHub

Flyte + Ray integration via flytekitplugins-ray — RayJobConfig, head/worker resources, autoscaling, PodTemplate, and @dynamic cluster sizing. Use when running Ray workloads as Flyte tasks.

Installation

$ npx agent-skills-cli install @tylertitsworth/flyte-kuberay

Claude Code · Cursor · Copilot · Codex · Antigravity

Details

Path: flyte-kuberay/SKILL.md
Branch: main
Scoped Name: @tylertitsworth/flyte-kuberay

Usage

After installing, this skill will be available to your AI coding assistant.

Verify installation:

npx agent-skills-cli list

Skill Instructions


name: flyte-kuberay
description: "Flyte + Ray integration via flytekitplugins-ray — RayJobConfig, head/worker resources, autoscaling, PodTemplate, and @dynamic cluster sizing. Use when running Ray workloads as Flyte tasks."

Flyte × KubeRay Integration

The flytekitplugins-ray package lets you run Ray workloads as Flyte tasks. Flyte creates an ephemeral Ray cluster (via the KubeRay operator) for each task execution, then submits your function as a Ray job.

Requirements:

  • KubeRay operator installed in the K8s cluster
  • flytekitplugins-ray in your task's container image
  • Flyte's Ray backend plugin enabled (see flyte-deployment skill)

Version compatibility: flytekitplugins-ray >= 1.16 requires flytekit > 1.14.5 and kuberay >= 1.1.0

Execution Flow

Understanding how Flyte orchestrates Ray is essential for debugging:

@task(task_config=RayJobConfig(...))
         │
         ▼
Flytekit serializes RayJobConfig → TaskTemplate (Protobuf)
         │
         ▼
FlytePropeller sees task type "ray"
         │
         ▼
Ray backend plugin creates a RayJob CRD in K8s
         │
         ▼
KubeRay operator provisions Ray cluster (head + workers)
         │
         ▼
KubeRay submits your task function via Ray Job Submission API
         │
         ▼
ray.init() connects to the cluster automatically
         │
         ▼
Task function executes (Ray remote calls fan out to workers)
         │
         ▼
On completion: outputs serialized, cluster torn down (if shutdown_after_job_finishes=True)

Key implications (a minimal task sketch follows this list):

  • ray.init() is called automatically in pre_execute — don't call it yourself
  • The address is set to the head node automatically
  • Your task function runs on the Ray head node (as the driver)
  • @ray.remote functions and actors are scheduled on worker nodes
  • The cluster is ephemeral — it's created and destroyed per task execution
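
To make these concrete, here is a minimal sketch (assuming the backend plugin is enabled; the task, worker group, and values are hypothetical): the task body never calls ray.init(), and the remote function fans out to the two workers.

from flytekit import task
from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig
import ray

@ray.remote
def square(x: int) -> int:
    # Runs on a worker node, not on the head/driver
    return x * x

@task(
    task_config=RayJobConfig(
        worker_node_config=[WorkerNodeConfig(group_name="workers", replicas=2)],
        shutdown_after_job_finishes=True,
    ),
)
def sum_of_squares(n: int) -> int:
    # No ray.init() here: the plugin already connected this driver to the head node
    return sum(ray.get([square.remote(i) for i in range(n)]))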

RayJobConfig

RayJobConfig Settings

Setting | Purpose | Default
worker_node_config | List of worker group configs | required
head_node_config | Head node configuration | None (uses defaults)
enable_autoscaling | Enable Ray autoscaler | False
runtime_env | Ray runtime environment (pip packages, env vars, working dir) | None
address | Ray cluster address (auto-set, rarely override) | None
shutdown_after_job_finishes | Tear down cluster after task completes | False
ttl_seconds_after_finished | Seconds before cleaning up a finished cluster | None

HeadNodeConfig Settings

Setting | Purpose | Default
ray_start_params | Parameters passed to ray start on head | None
requests | Resource requests (CPU, memory, GPU) | None (uses task-level)
limits | Resource limits | None
pod_template | Custom PodTemplate for advanced pod config | None

WorkerNodeConfig Settings

Setting | Purpose | Default
group_name | Worker group name (must be unique per group) | required
replicas | Number of worker pods | 1
min_replicas | Min replicas for autoscaling | None
max_replicas | Max replicas for autoscaling | None
ray_start_params | Parameters passed to ray start on workers | None
requests | Resource requests per worker | None
limits | Resource limits per worker | None
pod_template | Custom PodTemplate for workers | None

Resource Configuration

Resources can be set at three levels (most specific wins):

  1. Task-level requests/limits in @task() — applies to ALL pods (head + workers)
  2. HeadNodeConfig requests/limits — head pod only
  3. WorkerNodeConfig requests/limits — per worker group

GPU Training Example

from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.ray import RayJobConfig, HeadNodeConfig, WorkerNodeConfig

training_image = ImageSpec(
    name="ray-training",
    packages=["flytekitplugins-ray", "ray[default,train]", "torch", "transformers"],
    apt_packages=["wget"],  # kuberay readiness probe
    registry="ghcr.io/my-org",
)

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(
        ray_start_params={"num-cpus": "0"},  # don't schedule work on head
        requests=Resources(cpu="4", mem="16Gi"),
    ),
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            limits=Resources(gpu="1"),
        ),
    ],
    runtime_env={"env_vars": {"WANDB_PROJECT": "my-project"}},
    shutdown_after_job_finishes=True,
    ttl_seconds_after_finished=300,
)

@task(
    task_config=ray_config,
    container_image=training_image,
)
def distributed_training(model_name: str, epochs: int) -> float:
    import ray.train
    from ray.train.torch import TorchTrainer
    from ray.train import ScalingConfig

    # train_func is the per-worker training loop (a sketch follows this example)
    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
        train_loop_config={"model_name": model_name, "epochs": epochs},
    )
    result = trainer.fit()
    return result.metrics["eval_loss"]
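
The example above assumes a train_func defined elsewhere. A minimal sketch of the per-worker loop, with a placeholder linear model and random batch standing in for the real model_name and dataset:

import torch
import ray.train
from ray.train.torch import prepare_model

def train_func(config: dict):
    # Runs once per Ray Train worker; prepare_model handles device placement and DDP wrapping
    model = prepare_model(torch.nn.Linear(10, 1))  # placeholder model
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    for _ in range(config["epochs"]):
        loss = model(torch.randn(32, 10)).pow(2).mean()  # placeholder batch and loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        ray.train.report({"eval_loss": float(loss.detach())})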

Multiple Worker Groups (Heterogeneous)

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(
        requests=Resources(cpu="4", mem="8Gi"),
    ),
    worker_node_config=[
        # GPU workers for training
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
        ),
        # CPU workers for data preprocessing
        WorkerNodeConfig(
            group_name="cpu-workers",
            replicas=8,
            requests=Resources(cpu="4", mem="16Gi"),
        ),
    ],
    shutdown_after_job_finishes=True,
)
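
With heterogeneous groups, Ray places work by resource request rather than by group name. A rough sketch of how a task body might split work across the two groups (function names and payloads are illustrative):

import ray

@ray.remote(num_cpus=4)
def preprocess_shard(shard: list) -> list:
    # No GPU requested, so this can run on the cpu-workers group
    return [len(item) for item in shard]

@ray.remote(num_gpus=1)
def train_shard(shard: list) -> float:
    # Requires one GPU, so Ray schedules it only onto the gpu-workers pods
    return float(sum(shard))

def fan_out(shards: list) -> float:
    cleaned = ray.get([preprocess_shard.remote(s) for s in shards])
    return sum(ray.get([train_shard.remote(c) for c in cleaned]))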

Autoscaling

ray_config = RayJobConfig(
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=2,       # initial replicas
            min_replicas=1,   # scale down to 1
            max_replicas=8,   # scale up to 8
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
        ),
    ],
    enable_autoscaling=True,
    shutdown_after_job_finishes=True,
)
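
The autoscaler grows a group when pending tasks request more resources than the current workers provide, and shrinks it back toward min_replicas when demand drops. An illustrative trigger, assuming the config above:

import ray

@ray.remote(num_gpus=1)
def gpu_chunk(i: int) -> int:
    return i * i

def run_burst(n: int = 16) -> int:
    # 16 pending one-GPU tasks against 2 initial single-GPU workers creates demand,
    # so the autoscaler scales gpu-workers toward max_replicas=8
    return sum(ray.get([gpu_chunk.remote(i) for i in range(n)]))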

PodTemplate for Advanced Configuration

Use PodTemplate for node selectors, tolerations, volumes, and other pod-level settings:

from flytekit import PodTemplate
from kubernetes.client import (
    V1PodSpec, V1Container, V1Volume, V1VolumeMount,
    V1EmptyDirVolumeSource, V1PersistentVolumeClaimVolumeSource, V1Toleration,
)

gpu_pod_template = PodTemplate(
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="ray-worker",
                volume_mounts=[
                    V1VolumeMount(name="shm", mount_path="/dev/shm"),
                    V1VolumeMount(name="data", mount_path="/data"),
                ],
            ),
        ],
        volumes=[
            V1Volume(name="shm", empty_dir=V1EmptyDirVolumeSource(
                medium="Memory", size_limit="16Gi"
            )),
            V1Volume(name="data", persistent_volume_claim={
                "claimName": "training-data"
            }),
        ],
        tolerations=[
            V1Toleration(key="nvidia.com/gpu", operator="Exists", effect="NoSchedule"),
        ],
        node_selector={"nvidia.com/gpu.product": "NVIDIA-A100-SXM4-80GB"},
    ),
)

ray_config = RayJobConfig(
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            pod_template=gpu_pod_template,
        ),
    ],
    shutdown_after_job_finishes=True,
)

Dynamic Cluster Sizing

Use @dynamic to determine cluster size at runtime:

from flytekit import dynamic

@dynamic(container_image=training_image)
def adaptive_training(model_name: str, dataset_size: int) -> float:
    # Determine GPU count based on dataset size
    num_gpus = 4 if dataset_size > 1_000_000 else 2

    ray_config = RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(
                group_name="gpu-workers",
                replicas=num_gpus,
                requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            ),
        ],
        shutdown_after_job_finishes=True,
    )

    # train_task is an existing Ray task (a sketch follows this example)
    return train_task(model_name=model_name).with_overrides(
        task_config=ray_config,
        requests=Resources(cpu="4", mem="8Gi"),
    )
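
The @dynamic workflow assumes an existing Ray task named train_task whose baseline config with_overrides replaces at runtime. A hypothetical definition, reusing training_image from the GPU example above:

@task(
    task_config=RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(group_name="gpu-workers", replicas=2,
                             requests=Resources(cpu="8", mem="32Gi", gpu="1")),
        ],
        shutdown_after_job_finishes=True,
    ),
    container_image=training_image,
)
def train_task(model_name: str) -> float:
    # Placeholder body; a real task would run Ray Train here and return a metric
    return 0.0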

Common Patterns

Ray Data Processing Pipeline

@task(
    task_config=RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(group_name="data-workers", replicas=16,
                             requests=Resources(cpu="4", mem="16Gi")),
        ],
        shutdown_after_job_finishes=True,
    ),
    container_image=data_image,
)
def preprocess_dataset(input_path: str) -> str:
    import ray
    ds = ray.data.read_parquet(input_path)
    ds = ds.map(tokenize_fn).repartition(100)  # tokenize_fn sketched after this example
    output_path = "/data/processed"
    ds.write_parquet(output_path)
    return output_path
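
tokenize_fn above is a per-row transform; ray.data.Dataset.map calls it once per row and expects a dict back. A minimal sketch, assuming a "text" column and a Hugging Face tokenizer (the model name is illustrative):

from transformers import AutoTokenizer

_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")  # illustrative model

def tokenize_fn(row: dict) -> dict:
    row["input_ids"] = _tokenizer(row["text"], truncation=True)["input_ids"]
    return row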

Ray Serve Deployment Validation

@task(
    task_config=RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(group_name="serve-workers", replicas=2,
                             requests=Resources(cpu="4", mem="16Gi", gpu="1")),
        ],
        shutdown_after_job_finishes=True,
    ),
    container_image=serve_image,
)
def validate_serving(model_path: str) -> dict:
    import ray.serve
    # Deploy and test model serving, return latency/throughput metrics
    ...
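
The validation body is elided above; a rough sketch of what it might do, deploying with Ray Serve and probing the HTTP endpoint (the deployment class, payload, and metric names are illustrative):

import time
import requests
from ray import serve

@serve.deployment(num_replicas=2, ray_actor_options={"num_gpus": 1})
class Predictor:
    def __init__(self, model_path: str):
        self.model_path = model_path  # a real deployment would load the model here

    async def __call__(self, request):
        return {"prediction": 0.0}

def validate(model_path: str) -> dict:
    serve.run(Predictor.bind(model_path))  # serves at http://127.0.0.1:8000/ by default
    latencies = []
    for _ in range(50):
        start = time.perf_counter()
        requests.post("http://127.0.0.1:8000/", json={"inputs": [1.0, 2.0]}).raise_for_status()
        latencies.append(time.perf_counter() - start)
    serve.shutdown()
    return {"p50_ms": sorted(latencies)[len(latencies) // 2] * 1000}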

Full Training Pipeline

@workflow
def training_pipeline(
    raw_data: str, model_name: str, epochs: int
) -> float:
    processed = preprocess_dataset(input_path=raw_data)     # Ray Data task
    metrics = distributed_training(                          # Ray Train task
        model_name=model_name, epochs=epochs
    )
    return metrics
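
As written, nothing from preprocess_dataset feeds distributed_training, so Flyte runs the two Ray tasks in parallel. If training should consume the preprocessed data, give the training task a data_path input so the promise creates the dependency (distributed_training_on is a hypothetical variant with that extra parameter):

@workflow
def chained_training_pipeline(raw_data: str, model_name: str, epochs: int) -> float:
    processed = preprocess_dataset(input_path=raw_data)
    # Passing the promise makes training wait for preprocessing to finish
    return distributed_training_on(data_path=processed, model_name=model_name, epochs=epochs)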

Runtime Environment

The runtime_env field installs dependencies at Ray cluster startup:

RayJobConfig(
    runtime_env={
        "pip": ["numpy==1.26.0", "pandas"],
        "env_vars": {"TOKENIZERS_PARALLELISM": "false"},
        "working_dir": "./src",  # uploaded to cluster
    },
    ...
)

# Install specific packages at cluster startup:
RayJobConfig(
    runtime_env={
        "pip": ["numpy==1.26.0", "pandas", "scikit-learn"],
    },
    ...
)

Prefer ImageSpec over runtime_env for heavy dependencies (PyTorch, transformers). runtime_env is best for lightweight, frequently-changing deps. Heavy pip installs at cluster startup add significant latency.

Flyte Backend Plugin Configuration

The Flyte admin must enable the Ray plugin. In Flyte's Helm values:

# flyte-binary or flyte-core values
configmap:
  enabled_plugins:
    tasks:
      task-plugins:
        enabled-plugins:
          - container
          - ray
        default-for-task-types:
          container: container
          ray: ray

Troubleshooting

Cluster Not Creating

Symptom | Cause | Fix
Task stuck in QUEUED | KubeRay operator not installed | Install KubeRay operator in cluster
RayJob CRD not found | API version mismatch | Ensure flyte >= 1.11.1 with kuberay >= 1.1.0
Pods pending | Insufficient cluster resources | Check node resources, adjust requests
GPU pods pending | No GPU nodes / wrong resource key | Verify nvidia.com/gpu resource, check node labels

Task Failures

Symptom | Cause | Fix
ray.init() connection refused | Head node not ready | Increase ttl_seconds_after_finished, check head pod logs
Worker OOM killed | Memory limits too low | Increase worker requests.mem / limits.mem
ModuleNotFoundError | Missing from image or runtime_env | Add to ImageSpec packages or runtime_env.pip
Ingress webhook errors | Missing ingress controller | Install nginx-ingress or disable Ray dashboard ingress
Task timeout | Cluster creation too slow | Pre-pull images, increase Flyte task timeout

Debugging

# Check RayJob CRDs created by Flyte
kubectl get rayjobs -n <flyte-execution-ns>

# Check Ray cluster status
kubectl get rayclusters -n <flyte-execution-ns>

# Check head/worker pod status
kubectl get pods -n <flyte-execution-ns> -l ray.io/cluster=<cluster-name>

# View Ray head logs (driver output)
kubectl logs -n <flyte-execution-ns> <head-pod-name>

# Check KubeRay operator logs
kubectl logs -n ray-system deploy/kuberay-operator

Production Patterns

For production deployment patterns including image pre-pulling, shared memory for NCCL, Kueue integration, spot instance workers, S3/Longhorn checkpointing, and task cleanup, see references/production-patterns.md.

Debug Script

Run scripts/flyte-ray-debug.sh <execution-name> [namespace] to inspect the full RayJob → RayCluster → pod pipeline for a Flyte execution, including status, events, and logs.

References

Cross-References

  • flyte-sdk — Flytekit task/workflow authoring, ImageSpec, type system
  • flyte-deployment — Flyte cluster setup and plugin configuration
  • kuberay — KubeRay operator CRDs, RayJob spec, troubleshooting
  • ray-train — Ray Train distributed training config
  • ray-core — Ray remote functions, actors, object store
  • gpu-operator — GPU driver and device plugin for Ray GPU workers
  • kueue — Queue Flyte-submitted Ray jobs
  • wandb — Experiment tracking from Flyte+Ray training tasks
