Flyte + Ray integration via flytekitplugins-ray — RayJobConfig, head/worker resources, autoscaling, PodTemplate, and @dynamic cluster sizing. Use when running Ray workloads as Flyte tasks.
Flyte × KubeRay Integration
The flytekitplugins-ray package lets you run Ray workloads as Flyte tasks. Flyte creates an ephemeral Ray cluster (via the KubeRay operator) for each task execution, then submits your function as a Ray job.
Requirements:
- KubeRay operator installed in the K8s cluster
- `flytekitplugins-ray` in your task's container image
- Flyte's Ray backend plugin enabled (see flyte-deployment skill)
Version compatibility: flytekitplugins-ray >= 1.16 requires flytekit > 1.14.5 and kuberay >= 1.1.0
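Assuming a pip-based image build, installing the plugin might look like this (version pins follow the compatibility note above):

```shell
# Install the Flyte Ray plugin (brings in flytekit and the Ray job submission client)
pip install "flytekitplugins-ray>=1.16" "flytekit>1.14.5"

# Confirm the installed versions
pip show flytekitplugins-ray flytekit
```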
Execution Flow
Understanding how Flyte orchestrates Ray is essential for debugging:
```
@task(task_config=RayJobConfig(...))
        │
        ▼
Flytekit serializes RayJobConfig → TaskTemplate (Protobuf)
        │
        ▼
FlytePropeller sees task type "ray"
        │
        ▼
Ray backend plugin creates a RayJob CRD in K8s
        │
        ▼
KubeRay operator provisions Ray cluster (head + workers)
        │
        ▼
KubeRay submits your task function via Ray Job Submission API
        │
        ▼
ray.init() connects to the cluster automatically
        │
        ▼
Task function executes (Ray remote calls fan out to workers)
        │
        ▼
On completion: outputs serialized, cluster torn down
(if shutdown_after_job_finishes=True)
```
Key implications:
- `ray.init()` is called automatically in `pre_execute`; don't call it yourself
- The `address` is set to the head node automatically
- Your task function runs on the Ray head node (as the driver)
- `@ray.remote` functions and actors are scheduled on worker nodes
- The cluster is ephemeral: it is created and destroyed per task execution
RayJobConfig
RayJobConfig Settings
| Setting | Purpose | Default |
|---|---|---|
| `worker_node_config` | List of worker group configs | required |
| `head_node_config` | Head node configuration | None (uses defaults) |
| `enable_autoscaling` | Enable Ray autoscaler | False |
| `runtime_env` | Ray runtime environment (pip packages, env vars, working dir) | None |
| `address` | Ray cluster address (auto-set, rarely override) | None |
| `shutdown_after_job_finishes` | Tear down cluster after task completes | False |
| `ttl_seconds_after_finished` | Seconds before cleaning up a finished cluster | None |
HeadNodeConfig Settings
| Setting | Purpose | Default |
|---|---|---|
| `ray_start_params` | Parameters passed to `ray start` on head | None |
| `requests` | Resource requests (CPU, memory, GPU) | None (uses task-level) |
| `limits` | Resource limits | None |
| `pod_template` | Custom PodTemplate for advanced pod config | None |
WorkerNodeConfig Settings
| Setting | Purpose | Default |
|---|---|---|
| `group_name` | Worker group name (must be unique per group) | required |
| `replicas` | Number of worker pods | 1 |
| `min_replicas` | Min replicas for autoscaling | None |
| `max_replicas` | Max replicas for autoscaling | None |
| `ray_start_params` | Parameters passed to `ray start` on workers | None |
| `requests` | Resource requests per worker | None |
| `limits` | Resource limits per worker | None |
| `pod_template` | Custom PodTemplate for workers | None |
Resource Configuration
Resources can be set at three levels (most specific wins):
- Task-level `requests`/`limits` in `@task()`: applies to ALL pods (head + workers)
- HeadNodeConfig `requests`/`limits`: head pod only
- WorkerNodeConfig `requests`/`limits`: per worker group
GPU Training Example
```python
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.ray import RayJobConfig, HeadNodeConfig, WorkerNodeConfig

training_image = ImageSpec(
    name="ray-training",
    packages=["flytekitplugins-ray", "ray[default,train]", "torch", "transformers"],
    apt_packages=["wget"],  # kuberay readiness probe
    registry="ghcr.io/my-org",
)

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(
        ray_start_params={"num-cpus": "0"},  # don't schedule work on head
        requests=Resources(cpu="4", mem="16Gi"),
    ),
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            limits=Resources(gpu="1"),
        ),
    ],
    runtime_env={"env_vars": {"WANDB_PROJECT": "my-project"}},
    shutdown_after_job_finishes=True,
    ttl_seconds_after_finished=300,
)

@task(
    task_config=ray_config,
    container_image=training_image,
)
def distributed_training(model_name: str, epochs: int) -> float:
    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer

    trainer = TorchTrainer(
        train_loop_per_worker=train_func,  # per-worker training loop, defined elsewhere
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
        train_loop_config={"model_name": model_name, "epochs": epochs},
    )
    result = trainer.fit()
    return result.metrics["eval_loss"]
```
Multiple Worker Groups (Heterogeneous)
```python
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(
        requests=Resources(cpu="4", mem="8Gi"),
    ),
    worker_node_config=[
        # GPU workers for training
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
        ),
        # CPU workers for data preprocessing
        WorkerNodeConfig(
            group_name="cpu-workers",
            replicas=8,
            requests=Resources(cpu="4", mem="16Gi"),
        ),
    ],
    shutdown_after_job_finishes=True,
)
```
Autoscaling
```python
ray_config = RayJobConfig(
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=2,      # initial replicas
            min_replicas=1,  # scale down to 1
            max_replicas=8,  # scale up to 8
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
        ),
    ],
    enable_autoscaling=True,
    shutdown_after_job_finishes=True,
)
```
PodTemplate for Advanced Configuration
Use PodTemplate for node selectors, tolerations, volumes, and other pod-level settings:
```python
from flytekit import PodTemplate
from kubernetes.client import (
    V1PodSpec, V1Container, V1Volume, V1VolumeMount,
    V1EmptyDirVolumeSource, V1PersistentVolumeClaimVolumeSource, V1Toleration,
)

gpu_pod_template = PodTemplate(
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="ray-worker",
                volume_mounts=[
                    V1VolumeMount(name="shm", mount_path="/dev/shm"),
                    V1VolumeMount(name="data", mount_path="/data"),
                ],
            ),
        ],
        volumes=[
            V1Volume(name="shm", empty_dir=V1EmptyDirVolumeSource(
                medium="Memory", size_limit="16Gi"
            )),
            V1Volume(name="data", persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                claim_name="training-data"
            )),
        ],
        tolerations=[
            V1Toleration(key="nvidia.com/gpu", operator="Exists", effect="NoSchedule"),
        ],
        node_selector={"nvidia.com/gpu.product": "NVIDIA-A100-SXM4-80GB"},
    ),
)

ray_config = RayJobConfig(
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=4,
            requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            pod_template=gpu_pod_template,
        ),
    ],
    shutdown_after_job_finishes=True,
)
```
Dynamic Cluster Sizing
Use @dynamic to determine cluster size at runtime:
```python
from flytekit import dynamic

@dynamic(container_image=training_image)
def adaptive_training(model_name: str, dataset_size: int) -> float:
    # Determine GPU count based on dataset size
    num_gpus = 4 if dataset_size > 1_000_000 else 2
    ray_config = RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(
                group_name="gpu-workers",
                replicas=num_gpus,
                requests=Resources(cpu="8", mem="32Gi", gpu="1"),
            ),
        ],
        shutdown_after_job_finishes=True,
    )
    # train_task: a Ray task defined elsewhere; its config is overridden at runtime
    return train_task(model_name=model_name).with_overrides(
        task_config=ray_config,
        requests=Resources(cpu="4", mem="8Gi"),
    )
```
Common Patterns
Ray Data Processing Pipeline
```python
@task(
    task_config=RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(group_name="data-workers", replicas=16,
                             requests=Resources(cpu="4", mem="16Gi")),
        ],
        shutdown_after_job_finishes=True,
    ),
    container_image=data_image,
)
def preprocess_dataset(input_path: str) -> str:
    import ray

    ds = ray.data.read_parquet(input_path)
    ds = ds.map(tokenize_fn).repartition(100)
    output_path = "/data/processed"
    ds.write_parquet(output_path)
    return output_path
```
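`tokenize_fn` above is left undefined. Ray Data's `map` expects a row-wise function from dict to dict; a minimal hypothetical stand-in (whitespace split in place of a real tokenizer):

```python
def tokenize_fn(row: dict) -> dict:
    # Placeholder: whitespace "tokenization". Swap in a real tokenizer
    # (e.g. a HuggingFace tokenizer) for actual pipelines.
    row["tokens"] = row["text"].split()
    return row
```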
Ray Serve Deployment Validation
```python
@task(
    task_config=RayJobConfig(
        worker_node_config=[
            WorkerNodeConfig(group_name="serve-workers", replicas=2,
                             requests=Resources(cpu="4", mem="16Gi", gpu="1")),
        ],
        shutdown_after_job_finishes=True,
    ),
    container_image=serve_image,
)
def validate_serving(model_path: str) -> dict:
    import ray.serve
    # Deploy and test model serving, return latency/throughput metrics
    ...
```
Full Training Pipeline
```python
@workflow
def training_pipeline(
    raw_data: str, model_name: str, epochs: int
) -> float:
    processed = preprocess_dataset(input_path=raw_data)  # Ray Data task
    metrics = distributed_training(  # Ray Train task
        model_name=model_name, epochs=epochs
    )
    return metrics
```
Runtime Environment
The runtime_env field installs dependencies at Ray cluster startup:
```python
RayJobConfig(
    runtime_env={
        "pip": ["numpy==1.26.0", "pandas"],  # installed at cluster startup
        "env_vars": {"TOKENIZERS_PARALLELISM": "false"},
        "working_dir": "./src",  # uploaded to cluster
    },
    ...
)
```
Prefer ImageSpec over runtime_env for heavy dependencies (PyTorch, transformers). runtime_env is best for lightweight, frequently-changing deps. Heavy pip installs at cluster startup add significant latency.
Flyte Backend Plugin Configuration
The Flyte admin must enable the Ray plugin. In Flyte's Helm values:
```yaml
# flyte-binary or flyte-core values
configmap:
  enabled_plugins:
    tasks:
      task-plugins:
        enabled-plugins:
          - container
          - ray
        default-for-task-types:
          container: container
          ray: ray
```
Troubleshooting
Cluster Not Creating
| Symptom | Cause | Fix |
|---|---|---|
| Task stuck in QUEUED | KubeRay operator not installed | Install KubeRay operator in cluster |
| RayJob CRD not found | API version mismatch | Ensure flyte >= 1.11.1 with kuberay >= 1.1.0 |
| Pods pending | Insufficient cluster resources | Check node resources, adjust requests |
| GPU pods pending | No GPU nodes / wrong resource key | Verify `nvidia.com/gpu` resource, check node labels |
Task Failures
| Symptom | Cause | Fix |
|---|---|---|
| `ray.init()` connection refused | Head node not ready | Increase `ttl_seconds_after_finished`, check head pod logs |
| Worker OOM killed | Memory limits too low | Increase worker `requests.mem` / `limits.mem` |
| ModuleNotFoundError | Missing from image or runtime_env | Add to ImageSpec packages or `runtime_env.pip` |
| Ingress webhook errors | Missing ingress controller | Install nginx-ingress or disable Ray dashboard ingress |
| Task timeout | Cluster creation too slow | Pre-pull images, increase Flyte task timeout |
Debugging
```shell
# Check RayJob CRDs created by Flyte
kubectl get rayjobs -n <flyte-execution-ns>

# Check Ray cluster status
kubectl get rayclusters -n <flyte-execution-ns>

# Check head/worker pod status
kubectl get pods -n <flyte-execution-ns> -l ray.io/cluster=<cluster-name>

# View Ray head logs (driver output)
kubectl logs -n <flyte-execution-ns> <head-pod-name>

# Check KubeRay operator logs
kubectl logs -n ray-system deploy/kuberay-operator
```
Production Patterns
For production deployment patterns including image pre-pulling, shared memory for NCCL, Kueue integration, spot instance workers, S3/Longhorn checkpointing, and task cleanup, see references/production-patterns.md.
Debug Script
Run scripts/flyte-ray-debug.sh <execution-name> [namespace] to inspect the full RayJob → RayCluster → pod pipeline for a Flyte execution, including status, events, and logs.
References
- `production-patterns.md` — Image pre-pulling, resource tuning, and production Ray-on-Flyte patterns
Cross-References
- flyte-sdk — Flytekit task/workflow authoring, ImageSpec, type system
- flyte-deployment — Flyte cluster setup and plugin configuration
- kuberay — KubeRay operator CRDs, RayJob spec, troubleshooting
- ray-train — Ray Train distributed training config
- ray-core — Ray remote functions, actors, object store
- gpu-operator — GPU driver and device plugin for Ray GPU workers
- kueue — Queue Flyte-submitted Ray jobs
- wandb — Experiment tracking from Flyte+Ray training tasks
