Ray Core - @ray.remote tasks, actors, object store, Compiled Graphs (high-performance multi-GPU DAGs with NCCL), distributed patterns, fault tolerance, placement groups, and debugging. Use when writing distributed Python with Ray. NOT for Ray Serve/Train/Data (see those skills).
Installation
Details
Usage
After installing, this skill will be available to your AI coding assistant.
Verify installation:
npx agent-skills-cli list
Skill Instructions
name: ray-core
description: "Ray Core - @ray.remote tasks, actors, object store, Compiled Graphs (high-performance multi-GPU DAGs with NCCL), distributed patterns, fault tolerance, placement groups, and debugging. Use when writing distributed Python with Ray. NOT for Ray Serve/Train/Data (see those skills)."
Ray Core
Distributed computing primitives for Python. Three building blocks: tasks, actors, and objects.
Docs: https://docs.ray.io/en/latest/ray-core/walkthrough.html
Version: Ray 2.54.0 - Resource isolation (cgroup support in ray.init()), CUDA IPC transport for RDT, tensor-level deduplication for NIXL, TPU v7x support.
Initialization
import ray
# Local mode
ray.init()
# Connect to existing cluster
ray.init(address="ray://head-svc:10001") # via Ray client
ray.init(address="auto") # auto-detect (inside Ray pod)
# Configuration
ray.init(
    num_cpus=8,                  # override detected CPUs (local only)
    num_gpus=2,                  # override detected GPUs
    object_store_memory=10**10,  # 10GB object store
    runtime_env={                # per-job dependencies
        "pip": ["torch==2.5.0"],
        "env_vars": {"KEY": "val"},
        "working_dir": "./src",
    },
    logging_level="INFO",
    dashboard_host="0.0.0.0",
)
Runtime Environments
Install dependencies dynamically per-job, per-task, or per-actor:
# Per-job (in ray.init)
ray.init(runtime_env={"pip": ["transformers"], "env_vars": {"HF_HOME": "/cache"}})
# Per-task override
@ray.remote(runtime_env={"pip": ["scipy"]})
def special_task(): ...
# Per-actor override
actor = MyActor.options(runtime_env={"pip": ["xgboost"]}).remote()
Fields: pip, conda, env_vars, working_dir, py_modules, excludes, container.
Tasks
Stateless remote functions. Decorate with @ray.remote, call with .remote():
import ray
@ray.remote
def train_model(config):
    # runs on a remote worker
    return {"loss": 0.1, "config": config}
# Launch 4 tasks in parallel
futures = [train_model.remote({"lr": lr}) for lr in [0.1, 0.01, 0.001, 0.0001]]
results = ray.get(futures) # blocks until all complete
Resource Requirements
# CPU only (default: 1 CPU)
@ray.remote(num_cpus=4)
def cpu_task(): ...
# GPU task
@ray.remote(num_gpus=1)
def gpu_task(): ...
# Multiple GPUs + memory
@ray.remote(num_gpus=2, memory=16 * 1024**3) # memory in bytes
def large_gpu_task(): ...
# Custom resources
@ray.remote(resources={"TPU": 1})
def tpu_task(): ...
# Override at call time
gpu_task.options(num_gpus=2).remote()
Task Options
@ray.remote(
    num_cpus=1,
    num_gpus=0,
    memory=0,                       # bytes, 0 = no reservation
    max_retries=3,                  # retries on system failure (-1 = infinite)
    retry_exceptions=[ValueError],  # also retry on these exceptions
    num_returns=2,                  # multiple return values
    scheduling_strategy="SPREAD",
def my_task(): ...
Actors
Stateful distributed objects. Each actor runs in its own process, methods execute serially:
@ray.remote(num_gpus=1)
class ModelServer:
    def __init__(self, model_path):
        self.model = load_model(model_path)
        self.request_count = 0

    def predict(self, batch):
        self.request_count += 1
        return self.model(batch)

    def get_stats(self):
        return {"requests": self.request_count}
# Create actor (starts a new process)
server = ModelServer.remote("/models/bert")
# Call methods (async, returns ObjectRef)
result = server.predict.remote(data)
print(ray.get(result))
Actor Options
@ray.remote(
    num_cpus=1,
    num_gpus=1,
    max_restarts=3,       # auto-restart on crash (-1 = infinite)
    max_task_retries=-1,  # retry methods on actor crash (-1 = infinite)
    max_concurrency=10,   # concurrent method execution (async actors)
    lifetime="detached",  # survives creator death, needs ray.kill()
    name="my_server",     # named actor, retrievable via ray.get_actor()
    namespace="prod",     # actor namespace
class MyActor: ...
Async Actors
For I/O-bound workloads, use async methods with max_concurrency:
import aiohttp

@ray.remote(max_concurrency=10)
class AsyncWorker:
    async def fetch(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()
Named and Detached Actors
# Create a named, detached actor
counter = Counter.options(name="global_counter", lifetime="detached").remote()
# Retrieve from anywhere in the cluster
counter = ray.get_actor("global_counter")
ray.get(counter.get.remote())
# Cleanup
ray.kill(counter)
Object Store
Ray's distributed shared-memory object store (Plasma) passes data between tasks/actors.
import numpy as np
# Explicitly put data into object store
large_data = ray.put(np.zeros((10000, 10000)))  # returns ObjectRef
# Pass refs to tasks (zero-copy on same node)
result = process.remote(large_data) # no serialization if same node
# Get values back
value = ray.get(result)
# Wait for first N ready (non-blocking)
ready, not_ready = ray.wait(futures, num_returns=1, timeout=5.0)
Key Rules
- Objects are immutable once in the store: numpy arrays become read-only (copy before writing)
- Small returns (≤100KB) are inlined directly to the caller: no object store overhead
- Large objects live in shared memory: zero-copy reads on the same node
- Object refs can be passed as arguments (lazy execution, no intermediate fetch)
- Garbage collected when no refs remain
- Serialization: Pickle protocol 5 + cloudpickle for lambdas/nested functions
- NumPy arrays use out-of-band data (Pickle 5): stored directly in shared memory
Fault Tolerance
| Mechanism | Config | Behavior |
|---|---|---|
| Task retries | max_retries=3 | Retry on system failure (node death, OOM) |
| Exception retries | retry_exceptions=[Exc] | Also retry on specified app exceptions |
| Actor restart | max_restarts=3 | Reconstruct actor (reruns __init__) |
| Actor task retry | max_task_retries=-1 | Retry pending methods after actor restart |
| Object reconstruction | Automatic | Re-execute task that created a lost object |
At-least-once semantics: Retried tasks/methods may execute more than once. Use for idempotent or read-only workloads, or implement checkpointing for stateful actors.
Scheduling Strategies
from ray.util.scheduling_strategies import (
    PlacementGroupSchedulingStrategy,
    NodeAffinitySchedulingStrategy,
)
# DEFAULT: hybrid locality + load balancing (recommended for most cases)
task.options(scheduling_strategy="DEFAULT").remote()
# SPREAD: distribute across nodes evenly
task.options(scheduling_strategy="SPREAD").remote()
# Node affinity: pin to a specific node
task.options(scheduling_strategy=NodeAffinitySchedulingStrategy(
    node_id=ray.get_runtime_context().get_node_id(),
    soft=False,
)).remote()
Placement Groups
Reserve resources atomically across nodes (gang scheduling):
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
# Reserve 4 GPU bundles, pack on fewest nodes
pg = placement_group([{"GPU": 1, "CPU": 4}] * 4, strategy="PACK")
ray.get(pg.ready()) # wait for resources
# Schedule on the placement group
@ray.remote(num_gpus=1)
def train_worker(rank): ...
futures = [
    train_worker.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=i
        )
    ).remote(i)
    for i in range(4)
]
| Strategy | Behavior |
|---|---|
| PACK | Pack bundles on fewest nodes (locality) |
| SPREAD | Spread bundles across nodes (fault tolerance) |
| STRICT_PACK | All bundles on one node (fails if impossible) |
| STRICT_SPREAD | Each bundle on a different node (fails if impossible) |
Cross-Language Support
Ray supports Java and C++ tasks/actors that interoperate with Python:
# Call a Java actor from Python
java_actor = ray.cross_language.java_actor_class("com.example.MyActor").remote()
result = java_actor.method.remote(arg)
Generator Tasks
Stream results incrementally from a task using num_returns="dynamic". The call returns an ObjectRef wrapping a DynamicObjectRefGenerator; ray.get that ref to obtain the generator, which yields one ObjectRef per value:
@ray.remote(num_returns="dynamic")
def generate_data(n):
    for i in range(n):
        yield i  # yield the value directly; Ray stores it and returns an ObjectRef
gen_ref = generate_data.remote(100)  # ObjectRef[DynamicObjectRefGenerator]
for ref in ray.get(gen_ref):  # fetch the generator, then iterate its refs
    print(ray.get(ref))
Note: Do not yield ray.put(value); returning ray.put() refs from tasks is an anti-pattern that harms fault tolerance (see references/patterns.md).
References
patterns.md - Ray Core patterns and anti-patterns for tasks, actors, and object store
troubleshooting.md - Cluster startup failures, task and actor issues, memory problems, and networking
Cross-References
- ray-data - Distributed data processing with Ray Data
- ray-serve - Model serving with Ray Serve
- ray-train - Distributed training with Ray Train
- kuberay - Deploy Ray on Kubernetes
- gpu-operator - GPU driver and device plugin for Ray GPU tasks
- prometheus-grafana - Scrape Ray cluster Prometheus metrics
- nccl - NCCL for Ray collective communication operations
Compiled Graphs (Beta, Ray 2.44+)
Static execution model for high-performance multi-GPU workloads. Reduces task orchestration overhead from ~1ms (classic Ray) to <50μs by pre-allocating resources and using shared memory / NCCL channels.
Install: pip install "ray[cgraph]"
import ray
import ray.dag
import torch

@ray.remote(num_gpus=1)
class Worker:
    def forward(self, tensor):
        return tensor * 2

a, b = Worker.remote(), Worker.remote()
# Define static DAG with InputNode
with ray.dag.InputNode() as inp:
    dag = a.forward.bind(inp)
    dag = b.forward.bind(dag)
# Compile: pre-allocates channels, prepares NCCL communicators
cdag = dag.experimental_compile()
# Execute: <50μs overhead per invocation
ref = cdag.execute(torch.zeros(1024, device="cuda"))
result = ray.get(ref)
cdag.teardown()  # release resources, actors remain alive
GPU-to-GPU NCCL transport: annotate DAG edges to use NCCL instead of shared memory:
with ray.dag.InputNode() as inp:
    dag = sender.send.bind(inp)
    dag = dag.with_tensor_transport("nccl")  # NCCL P2P transfer
    dag = receiver.recv.bind(dag)
cdag = dag.experimental_compile()
When to use: LLM inference pipelines, disaggregated prefill/decode, tensor-parallel model serving, any workload that repeatedly executes the same actor graph with sub-millisecond latency requirements. Not for: dynamic task graphs, tasks that change structure per-invocation, or CPU-only workloads where 1ms overhead is acceptable.
See references/compiled-graphs.md for asyncio support, CPU-to-GPU transport hints, execution timeouts, failure semantics, and multi-output patterns.
Debugging
# Cluster resource status
ray status
# Dashboard (default port 8265)
# Expose dashboard via Ingress/Service on port 8265
# List tasks/actors
ray list tasks
ray list actors
ray summary actors
# Timeline profiling
ray timeline # generates chrome://tracing compatible JSON
# Inside a task/actor
ctx = ray.get_runtime_context()
ctx.get_node_id()
ctx.get_task_id()
ctx.get_actor_id()
ctx.get_job_id()
ctx.get_runtime_env()
Common Errors
| Error | Cause | Fix |
|---|---|---|
| RayOutOfMemoryError | Object store full | Reduce object lifetimes, increase object_store_memory |
| assignment destination is read-only | Writing to object-store numpy array | arr = arr.copy() before mutation |
| ObjectLostError | Node holding object died | Enable task retries / object reconstruction |
| ActorDiedError | Actor process crashed | Set max_restarts, check OOM / exceptions |
| ModuleNotFoundError | Missing deps on worker | Use runtime_env with pip packages |
Patterns and Anti-Patterns
For distributed computing patterns and common anti-patterns, see references/patterns.md.
References
compiled-graphs.md - Compiled Graphs: asyncio, NCCL transport, CPU-GPU hints, timeouts, failure semantics, multi-output DAGs
patterns.md - Distributed computing patterns and anti-patterns
troubleshooting.md - Common errors and debugging
