Agent Skills

ray-core (@tylertitsworth/ray-core)
Author: tylertitsworth · 0 forks · Updated 4/1/2026

Ray Core — @ray.remote tasks, actors, object store, Compiled Graphs (high-performance multi-GPU DAGs with NCCL), distributed patterns, fault tolerance, placement groups, and debugging. Use when writing distributed Python with Ray. NOT for Ray Serve/Train/Data (see those skills).

Installation

$ npx agent-skills-cli install @tylertitsworth/ray-core

Supported assistants: Claude Code, Cursor, Copilot, Codex, Antigravity.

Details

Path: ray-core/SKILL.md
Branch: main
Scoped Name: @tylertitsworth/ray-core

Usage

After installing, this skill will be available to your AI coding assistant.

Verify installation:

npx agent-skills-cli list

Skill Instructions


name: ray-core
description: "Ray Core — @ray.remote tasks, actors, object store, Compiled Graphs (high-performance multi-GPU DAGs with NCCL), distributed patterns, fault tolerance, placement groups, and debugging. Use when writing distributed Python with Ray. NOT for Ray Serve/Train/Data (see those skills)."

Ray Core

Distributed computing primitives for Python. Three building blocks: tasks, actors, and objects.

Docs: https://docs.ray.io/en/latest/ray-core/walkthrough.html
Version: Ray 2.54.0 — Resource isolation (cgroup support in ray.init()), CUDA IPC transport for RDT, tensor-level deduplication for NIXL, TPU v7x support.

Initialization

import ray

# Local mode
ray.init()

# Connect to existing cluster
ray.init(address="ray://head-svc:10001")  # via Ray client
ray.init(address="auto")                   # auto-detect (inside Ray pod)

# Configuration
ray.init(
    num_cpus=8,                    # override detected CPUs (local only)
    num_gpus=2,                    # override detected GPUs
    object_store_memory=10**10,    # 10GB object store
    runtime_env={                  # per-job dependencies
        "pip": ["torch==2.5.0"],
        "env_vars": {"KEY": "val"},
        "working_dir": "./src",
    },
    logging_level="INFO",
    dashboard_host="0.0.0.0",
)

Runtime Environments

Install dependencies dynamically per-job, per-task, or per-actor:

# Per-job (in ray.init)
ray.init(runtime_env={"pip": ["transformers"], "env_vars": {"HF_HOME": "/cache"}})

# Per-task override
@ray.remote(runtime_env={"pip": ["scipy"]})
def special_task(): ...

# Per-actor override
actor = MyActor.options(runtime_env={"pip": ["xgboost"]}).remote()

Fields: pip, conda, env_vars, working_dir, py_modules, excludes, container.
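These fields compose as a plain dict. A minimal sketch with illustrative values (no Ray call involved), including a sanity check against the field list above:

```python
# Illustrative runtime_env; the field names are the ones Ray accepts,
# the values here are made up for the example.
runtime_env = {
    "pip": ["scipy", "xgboost"],
    "env_vars": {"OMP_NUM_THREADS": "1"},
    "working_dir": "./src",           # uploaded to each worker
    "excludes": ["*.ckpt", "data/"],  # skipped during the upload
}

allowed = {"pip", "conda", "env_vars", "working_dir",
           "py_modules", "excludes", "container"}
assert set(runtime_env) <= allowed, "unknown runtime_env field"
```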

Tasks

Stateless remote functions. Decorate with @ray.remote, call with .remote():

import ray

@ray.remote
def train_model(config):
    # runs on a remote worker
    return {"loss": 0.1, "config": config}

# Launch 4 tasks in parallel
futures = [train_model.remote({"lr": lr}) for lr in [0.1, 0.01, 0.001, 0.0001]]
results = ray.get(futures)  # blocks until all complete

Resource Requirements

# CPU only (default: 1 CPU)
@ray.remote(num_cpus=4)
def cpu_task(): ...

# GPU task
@ray.remote(num_gpus=1)
def gpu_task(): ...

# Multiple GPUs + memory
@ray.remote(num_gpus=2, memory=16 * 1024**3)  # memory in bytes
def large_gpu_task(): ...

# Custom resources
@ray.remote(resources={"TPU": 1})
def tpu_task(): ...

# Override at call time
gpu_task.options(num_gpus=2).remote()
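Since memory is specified in bytes, a tiny helper (hypothetical name, plain Python) keeps the magic numbers out of the decorators:

```python
def gib(n: float) -> int:
    """Convert GiB to bytes for memory= / object_store_memory= options."""
    return int(n * 1024**3)

# memory=16 * 1024**3 above becomes memory=gib(16)
assert gib(16) == 16 * 1024**3
```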

Task Options

@ray.remote(
    num_cpus=1,
    num_gpus=0,
    memory=0,                    # bytes, 0 = no reservation
    max_retries=3,               # retries on system failure (-1 = infinite)
    retry_exceptions=[ValueError],  # also retry on these exceptions
    num_returns=2,               # multiple return values
    scheduling_strategy="SPREAD",
)
def my_task(): ...

Actors

Stateful distributed objects. Each actor runs in its own process, and methods execute serially by default:

@ray.remote(num_gpus=1)
class ModelServer:
    def __init__(self, model_path):
        self.model = load_model(model_path)
        self.request_count = 0

    def predict(self, batch):
        self.request_count += 1
        return self.model(batch)

    def get_stats(self):
        return {"requests": self.request_count}

# Create actor (starts a new process)
server = ModelServer.remote("/models/bert")

# Call methods (async, returns ObjectRef)
result = server.predict.remote(data)
print(ray.get(result))

Actor Options

@ray.remote(
    num_cpus=1,
    num_gpus=1,
    max_restarts=3,          # auto-restart on crash (-1 = infinite)
    max_task_retries=-1,     # retry methods on actor crash (-1 = infinite)
    max_concurrency=10,      # concurrent method execution (async actors)
    lifetime="detached",     # survives creator death, needs ray.kill()
    name="my_server",        # named actor, retrievable via ray.get_actor()
    namespace="prod",        # actor namespace
)
class MyActor: ...

Async Actors

For I/O-bound workloads, use async methods with max_concurrency:

import aiohttp

@ray.remote(max_concurrency=10)
class AsyncWorker:
    async def fetch(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()

# Up to 10 fetch() calls overlap inside the single actor process
worker = AsyncWorker.remote()
refs = [worker.fetch.remote(f"https://example.com/{i}") for i in range(10)]

Named and Detached Actors

# Create a named, detached actor
counter = Counter.options(name="global_counter", lifetime="detached").remote()

# Retrieve from anywhere in the cluster
counter = ray.get_actor("global_counter")
ray.get(counter.get.remote())

# Cleanup
ray.kill(counter)

Object Store

Ray's distributed shared-memory object store (Plasma) passes data between tasks/actors.

# Explicitly put data into object store
large_data = ray.put(np.zeros((10000, 10000)))  # returns ObjectRef

# Pass refs to tasks (zero-copy on same node)
result = process.remote(large_data)  # no serialization if same node

# Get values back
value = ray.get(result)

# Wait for first N ready (non-blocking)
ready, not_ready = ray.wait(futures, num_returns=1, timeout=5.0)

Key Rules

  • Objects are immutable once in the store — numpy arrays come back read-only (copy before writing)
  • Small returns (≤ 100 KB) are inlined directly to the caller — no store overhead
  • Large objects live in shared memory — zero-copy reads on the same node
  • Object refs can be passed as arguments (lazy execution, no intermediate fetch)
  • Objects are garbage-collected when no refs remain
  • Serialization: pickle protocol 5 + cloudpickle for lambdas/nested functions
  • NumPy arrays use out-of-band data (pickle 5) — stored directly in shared memory
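The immutability rule is the one that most often bites with NumPy. A minimal illustration, using setflags to stand in for an object-store array so no Ray cluster is needed:

```python
import numpy as np

arr = np.zeros(4)
arr.setflags(write=False)   # ray.get() on a stored array behaves like this

try:
    arr[0] = 1.0            # raises: "assignment destination is read-only"
except ValueError:
    pass

writable = arr.copy()       # copy before mutating
writable[0] = 1.0
```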

Fault Tolerance

| Mechanism | Config | Behavior |
| --- | --- | --- |
| Task retries | max_retries=3 | Retry on system failure (node death, OOM) |
| Exception retries | retry_exceptions=[Exc] | Also retry on specified app exceptions |
| Actor restart | max_restarts=3 | Reconstruct actor (reruns __init__) |
| Actor task retry | max_task_retries=-1 | Retry pending methods after actor restart |
| Object reconstruction | Automatic | Re-execute the task that created a lost object |

At-least-once semantics: Retried tasks/methods may execute more than once. Use for idempotent or read-only workloads, or implement checkpointing for stateful actors.
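At-least-once retries are harmless when replays are no-ops. A pure-Python sketch of the request-id deduplication pattern (no Ray involved; names are illustrative):

```python
class DedupCounter:
    """State that tolerates at-least-once delivery: each update carries a
    request id, and a replayed id is ignored instead of double-applied."""
    def __init__(self):
        self.value = 0
        self._seen = set()

    def add(self, request_id: str, amount: int) -> int:
        if request_id in self._seen:   # retried call: already applied
            return self.value
        self._seen.add(request_id)
        self.value += amount
        return self.value

c = DedupCounter()
c.add("req-1", 5)
c.add("req-1", 5)    # retry of the same request
assert c.value == 5  # applied exactly once
```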

Scheduling Strategies

from ray.util.scheduling_strategies import (
    PlacementGroupSchedulingStrategy,
    NodeAffinitySchedulingStrategy,
)

# DEFAULT β€” hybrid locality + load balancing (recommended for most cases)
task.options(scheduling_strategy="DEFAULT").remote()

# SPREAD β€” distribute across nodes evenly
task.options(scheduling_strategy="SPREAD").remote()

# Node affinity β€” pin to specific node
task.options(scheduling_strategy=NodeAffinitySchedulingStrategy(
    node_id=ray.get_runtime_context().get_node_id(),
    soft=False,
)).remote()

Placement Groups

Reserve resources atomically across nodes (gang scheduling):

from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Reserve 4 GPU bundles, pack on fewest nodes
pg = placement_group([{"GPU": 1, "CPU": 4}] * 4, strategy="PACK")
ray.get(pg.ready())  # wait for resources

# Schedule on the placement group
@ray.remote(num_gpus=1)
def train_worker(rank): ...

futures = [
    train_worker.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=i
        )
    ).remote(i)
    for i in range(4)
]

| Strategy | Behavior |
| --- | --- |
| PACK | Pack bundles on fewest nodes (locality) |
| SPREAD | Spread bundles across nodes (fault tolerance) |
| STRICT_PACK | All bundles on one node (fails if impossible) |
| STRICT_SPREAD | Each bundle on a different node (fails if impossible) |
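Bundles are plain dicts, so the group's aggregate demand (which the cluster must satisfy all at once under gang scheduling) can be tallied without Ray:

```python
from collections import Counter

bundles = [{"GPU": 1, "CPU": 4}] * 4   # same spec as the placement group above

total = Counter()
for bundle in bundles:
    total.update(bundle)

# The whole reservation succeeds or fails atomically, so all of this
# capacity must be free in the cluster at the same time.
print(dict(total))  # {'GPU': 4, 'CPU': 16}
```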

Cross-Language Support

Ray supports Java and C++ tasks/actors that interoperate with Python:

# Call a Java actor from Python
java_actor = ray.cross_language.java_actor_class("com.example.MyActor").remote()
result = java_actor.method.remote(arg)

Generator Tasks

Stream results incrementally from a task using num_returns="dynamic". The task returns an ObjectRef that resolves (via ray.get) to a DynamicObjectRefGenerator, which yields one ObjectRef per yielded value:

@ray.remote(num_returns="dynamic")
def generate_data(n):
    for i in range(n):
        yield i  # yield the value directly β€” Ray stores it and returns an ObjectRef

gen_ref = generate_data.remote(100)  # ObjectRef[DynamicObjectRefGenerator]
for ref in ray.get(gen_ref):          # resolve the outer ref, then iterate
    print(ray.get(ref))

Note: Do not yield ray.put(value) — returning ray.put() refs from tasks is an anti-pattern that harms fault tolerance (see references/patterns.md).


Cross-References

  • ray-data — Distributed data processing with Ray Data
  • ray-serve — Model serving with Ray Serve
  • ray-train — Distributed training with Ray Train
  • kuberay — Deploy Ray on Kubernetes
  • gpu-operator — GPU driver and device plugin for Ray GPU tasks
  • prometheus-grafana — Scrape Ray cluster Prometheus metrics
  • nccl — NCCL for Ray collective communication operations

Compiled Graphs (Beta, Ray 2.44+)

Static execution model for high-performance multi-GPU workloads. Reduces task orchestration overhead from ~1 ms (classic Ray) to under 50 μs by pre-allocating resources and using shared-memory / NCCL channels.

Install: pip install "ray[cgraph]"

import ray
import ray.dag
import torch

@ray.remote(num_gpus=1)
class Worker:
    def forward(self, tensor):
        return tensor * 2

a, b = Worker.remote(), Worker.remote()

# Define static DAG with InputNode
with ray.dag.InputNode() as inp:
    dag = a.forward.bind(inp)
    dag = b.forward.bind(dag)

# Compile: pre-allocates channels, prepares NCCL communicators
cdag = dag.experimental_compile()

# Execute: <50 μs overhead per invocation
ref = cdag.execute(torch.zeros(1024, device="cuda"))
result = ray.get(ref)

cdag.teardown()  # release resources, actors remain alive

GPU-to-GPU NCCL transport — annotate DAG edges to use NCCL instead of shared memory:

with ray.dag.InputNode() as inp:
    dag = sender.send.bind(inp)
    dag = dag.with_tensor_transport("nccl")  # NCCL P2P transfer
    dag = receiver.recv.bind(dag)
cdag = dag.experimental_compile()

When to use: LLM inference pipelines, disaggregated prefill/decode, tensor-parallel model serving, any workload that repeatedly executes the same actor graph with sub-millisecond latency requirements. Not for: dynamic task graphs, tasks that change structure per-invocation, or CPU-only workloads where 1ms overhead is acceptable.

See references/compiled-graphs.md for asyncio support, CPU-to-GPU transport hints, execution timeouts, failure semantics, and multi-output patterns.

Debugging

# Cluster resource status
ray status

# Dashboard (default port 8265)
# Expose dashboard via Ingress/Service on port 8265

# List tasks/actors
ray list tasks
ray list actors
ray summary actors

# Timeline profiling
ray timeline  # generates chrome://tracing compatible JSON

# Inside a task/actor (Python API)
ctx = ray.get_runtime_context()
ctx.get_node_id()
ctx.get_task_id()
ctx.get_actor_id()
ctx.get_job_id()
ctx.get_runtime_env()

Common Errors

| Error | Cause | Fix |
| --- | --- | --- |
| RayOutOfMemoryError | Object store full | Reduce object lifetimes, increase object_store_memory |
| "assignment destination is read-only" | Writing to an object-store numpy array | arr = arr.copy() before mutation |
| ObjectLostError | Node holding the object died | Enable task retries / object reconstruction |
| ActorDiedError | Actor process crashed | Set max_restarts, check for OOM / exceptions |
| ModuleNotFoundError | Missing deps on worker | Use runtime_env with pip packages |

Patterns and Anti-Patterns

For distributed computing patterns and common anti-patterns, see references/patterns.md.

References

  • compiled-graphs.md — Compiled Graphs: asyncio, NCCL transport, CPU-GPU hints, timeouts, failure semantics, multi-output DAGs
  • patterns.md — Distributed computing patterns and anti-patterns
  • troubleshooting.md — Common errors and debugging