Python API Reference

Decorators

kinetic.run(accelerator: str = 'tpu-v5e-1', container_image: str | None = None, base_image_repo: str | None = None, zone: str | None = None, project: str | None = None, capture_env_vars: list[str] | None = None, cluster: str | None = None, backend: str | None = None, namespace: str | None = None, volumes: dict[str, Data] | None = None, spot: bool = False, output_dir: str | None = None, debug: bool = False) -> Callable[[Callable[[...], Any]], Callable[[...], Any]]

Execute function on remote TPU/GPU.

Parameters:
  • accelerator – TPU/GPU type (e.g., ‘tpu-v3-8’, ‘tpu-v5litepod-4’, ‘gpu-l4’, ‘gpu-a100’)

  • container_image – Controls the container image used for execution. None or “bundled” (default) builds a custom image with all dependencies baked in via Cloud Build. “prebuilt” uses a prebuilt base image and installs user requirements at pod startup via uv pip install. Any other string is treated as a custom container image URI.

  • base_image_repo – Docker Hub repository for prebuilt base images (e.g., “mycompany/kinetic”). Defaults to KINETIC_BASE_IMAGE_REPO env var, then “kinetic”. Only used when container_image is “prebuilt”.

  • zone – GCP zone. Falls back to KINETIC_ZONE, then the active profile’s zone (from ~/.kinetic/profiles.json), then ‘us-central1-a’.

  • project – GCP project. Falls back to KINETIC_PROJECT, then the active profile’s project, then GOOGLE_CLOUD_PROJECT.

  • capture_env_vars – List of environment variable names or patterns (ending in *) to propagate to the remote environment. Defaults to None.

  • cluster – GKE cluster name. Falls back to KINETIC_CLUSTER, then the active profile’s cluster, then the built-in default.

  • backend – Backend to use (‘gke’ or ‘pathways’)

  • namespace – Kubernetes namespace. Falls back to KINETIC_NAMESPACE, then the active profile’s namespace, then ‘default’.

  • volumes – Dict mapping absolute mount paths to Data objects, e.g. {“/data”: Data(“./dataset/”)}. Data is downloaded to these paths on the pod before function execution.

  • spot – If True, use preemptible/spot VMs for the job.

  • output_dir – GCS directory where job outputs should be saved. Propagated to the remote worker as the KINETIC_OUTPUT_DIR environment variable. Defaults to gs://{bucket_name}/outputs/{job_id}.

  • debug – If True, enable debugpy remote debugging. The pod will start a debugpy server and wait for a VS Code debugger to attach before executing the function. Port-forwarding is set up automatically.
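
The capture_env_vars matching rule can be illustrated with a small sketch. It assumes that a trailing * means "any variable whose name starts with this prefix" and that other entries must match exactly; the helper name select_env_vars is hypothetical and not part of kinetic.

```python
import os


def select_env_vars(patterns, environ=None):
    """Pick variables by exact name or by prefix pattern ending in '*'.

    Assumption: 'WANDB_*' is read as a plain prefix match on 'WANDB_'.
    """
    environ = os.environ if environ is None else environ
    selected = {}
    for name, value in environ.items():
        for pattern in patterns:
            if pattern.endswith("*"):
                matched = name.startswith(pattern[:-1])
            else:
                matched = name == pattern
            if matched:
                selected[name] = value
                break  # one match is enough for this variable
    return selected
```

With this reading, capture_env_vars=["WANDB_*", "HF_TOKEN"] would propagate WANDB_API_KEY and HF_TOKEN but not HOME.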

Returns:

A decorator that returns a wrapper function. When called, the wrapper executes the function remotely and blocks until completion (sync mode). The wrapper also has the following methods:

  • run_async(*args, **kwargs): Submits the job for remote execution and returns a JobHandle immediately (async mode).

  • run_async_map(inputs, **kwargs): Fans out across accelerators for a collection of inputs, returning a BatchHandle.
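
A minimal usage sketch of the decorator and its two extra methods, based only on the signature above. The train function, its arguments, and the chosen accelerator are illustrative assumptions, and how run_async_map passes each input to the function is not specified here. submit_examples needs the kinetic package and a configured cluster, so nothing is submitted unless it is called explicitly.

```python
def train(learning_rate: float, steps: int = 100) -> dict:
    """Hypothetical workload standing in for real training code."""
    loss = 1.0
    for _ in range(steps):
        loss *= 1.0 - learning_rate  # toy "training" update
    return {"learning_rate": learning_rate, "final_loss": loss}


def submit_examples():
    """Requires kinetic and cluster access; call it explicitly."""
    import kinetic

    remote_train = kinetic.run(accelerator="gpu-l4", spot=True)(train)

    # Sync mode: blocks until the remote job completes.
    result = remote_train(0.1, steps=50)

    # Async mode: returns a JobHandle immediately.
    handle = remote_train.run_async(0.1, steps=50)

    # Fan-out: returns a BatchHandle covering one job per input.
    batch = remote_train.run_async_map([0.01, 0.1, 0.3])

    return result, handle.result(), batch.results()
```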

Data API

class kinetic.Data(path: str, fuse: bool = False, hf_trust_remote_code: bool = False)

Bases: object

A reference to data that should be available on the remote pod.

Wraps a local file/directory path or a GCS URI. When passed as a function argument or used in the volumes decorator parameter, Data is resolved to a plain filesystem path on the remote side. The user’s function code never needs to know about Data — it just receives paths.

By default, data is downloaded into the container before execution. Pass fuse=True to lazily mount data from GCS via the GCS FUSE CSI driver instead, which is useful for large datasets where only a subset of files is read at runtime.

Parameters:
  • path – Local file/directory path (absolute or relative) or GCS URI (gs://bucket/prefix).

  • fuse – If True, mount the data via GCS FUSE instead of downloading it. The data is read on demand — only files that are actually opened are fetched from cloud storage. Requires the GCS FUSE CSI driver addon on the GKE cluster (kinetic up enables it by default).

Note

For GCS URIs, a trailing slash indicates a directory (prefix). Data(“gs://my-bucket/dataset/”) is treated as a directory, while Data(“gs://my-bucket/dataset”) is treated as a single object. If you intend to reference a GCS directory, always include the trailing slash.
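
The trailing-slash rule in the note can be written as a one-line check. This is only a sketch of the rule as stated; gcs_uri_is_dir is a hypothetical helper, not the implementation behind Data.is_dir.

```python
def gcs_uri_is_dir(uri: str) -> bool:
    """Return True when a gs:// URI names a directory (prefix)."""
    if not uri.startswith("gs://"):
        raise ValueError(f"not a GCS URI: {uri!r}")
    # A trailing slash marks a prefix; anything else is a single object.
    return uri.endswith("/")
```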

Examples:

# Local directory
Data("./my_dataset/")

# Local file
Data("./config.json")

# GCS directory — trailing slash required
Data("gs://my-bucket/datasets/imagenet/")

# GCS single object
Data("gs://my-bucket/datasets/weights.h5")

# FUSE-mounted directory (lazy loading)
Data("./large_dataset/", fuse=True)

# FUSE-mounted GCS data
Data("gs://my-bucket/datasets/imagenet/", fuse=True)

# Hugging Face Dataset (downloads on pod)
Data("hf://imdb?split=train")

# Hugging Face Dataset with remote code execution allowed
Data("hf://custom/repo", hf_trust_remote_code=True)

__init__(path: str, fuse: bool = False, hf_trust_remote_code: bool = False)
property path: str
property fuse: bool
property hf_trust_remote_code: bool
property is_gcs: bool
property is_hf: bool
property is_dir: bool
content_hash() -> str

SHA-256 hash of all file contents in deterministic order.

Uses two-level hashing for parallelism: each file is hashed independently (SHA-256 of relpath + contents), then per-file digests are combined in sorted-walk (DFS) order into a final hash.

Includes a type prefix (“dir:” or “file:”) to prevent collisions between a single file and a directory containing only that file.

Symlinked directories are not recursed into (followlinks=False) to prevent infinite recursion from circular symlinks. Symlinked files are read and their resolved contents are hashed, so the hash reflects the actual data visible at runtime.
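
The hashing scheme described above can be approximated as follows. This is a sketch under stated assumptions, not the library's code: the exact byte layout (how relpath and contents are joined, how digests are combined) is not documented here, so the output will not match Data.content_hash(). The per-file step is written sequentially for brevity even though it is the part that can be parallelized.

```python
import hashlib
import os


def _hash_file(path: str, relpath: str) -> bytes:
    """First level: SHA-256 over the relative path plus file contents."""
    digest = hashlib.sha256()
    digest.update(relpath.encode("utf-8"))
    with open(path, "rb") as f:  # follows file symlinks
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.digest()


def content_hash_sketch(path: str) -> str:
    """Second level: combine per-file digests in sorted walk order."""
    final = hashlib.sha256()
    if os.path.isdir(path):
        final.update(b"dir:")  # type prefix
        for root, dirs, files in os.walk(path, followlinks=False):
            dirs.sort()  # in-place sort makes the traversal deterministic
            for name in sorted(files):
                full = os.path.join(root, name)
                final.update(_hash_file(full, os.path.relpath(full, path)))
    else:
        final.update(b"file:")  # type prefix
        final.update(_hash_file(path, os.path.basename(path)))
    return final.hexdigest()
```

Because of the type prefix, a directory containing only a.txt and the file a.txt itself produce different hashes in this sketch.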

Detached Jobs

class kinetic.JobHandle(job_id: str, backend: str, project: str, cluster_name: str, zone: str, namespace: str, bucket_name: str, k8s_name: str, image_uri: str, accelerator: str, func_name: str, display_name: str, created_at: str, group_id: str | None = None, group_kind: str | None = None, group_index: int | None = None, debug: bool = False)

Bases: object

Durable description of a submitted remote job.

All fields are JSON-serializable: strings, plus the optional integer group_index and the boolean debug flag. No func object or closure state is stored; only the metadata needed to observe, collect, and clean up the job from any machine is kept.

job_id: str
backend: str
project: str
cluster_name: str
zone: str
namespace: str
bucket_name: str
k8s_name: str
image_uri: str
accelerator: str
func_name: str
display_name: str
created_at: str
group_id: str | None = None
group_kind: str | None = None
group_index: int | None = None
debug: bool = False

classmethod from_job_context(ctx, backend_name: str, namespace: str, k8s_name: str) -> JobHandle

Build a JobHandle from a live JobContext.

classmethod from_dict(d: dict[str, Any]) -> JobHandle

Reconstruct a JobHandle from a plain dict.

Unknown keys are silently ignored so that handles persisted by a future version (with extra fields) can still be loaded.

to_dict() -> dict[str, str]

Serialize the handle to a JSON-safe payload.
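
The to_dict() / from_dict() round trip, including the "unknown keys are ignored" behavior, can be illustrated with a stand-in dataclass. HandleSketch is hypothetical and carries only three JobHandle-like fields; the real class may be implemented differently.

```python
import dataclasses
import json
from typing import Optional


@dataclasses.dataclass
class HandleSketch:
    """Stand-in with a few JobHandle-like fields; not the real class."""

    job_id: str
    backend: str
    group_index: Optional[int] = None

    @classmethod
    def from_dict(cls, d):
        # Keep only keys that are declared fields; drop everything else.
        known = {f.name for f in dataclasses.fields(cls)}
        return cls(**{k: v for k, v in d.items() if k in known})

    def to_dict(self):
        return dataclasses.asdict(self)


# Persist as JSON, then load a payload that gained an extra key.
payload = json.dumps(HandleSketch("job-a1b2c3d4", "gke").to_dict())
data = json.loads(payload)
data["added_in_future_version"] = "ignored"
restored = HandleSketch.from_dict(data)
```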

status() -> JobStatus

Return the current execution status of the job.

logs(follow: bool = False) -> str | None

Return logs or stream them to stdout until the job terminates.

tail(n: int = 100) -> str

Return the last n log lines from the active pod.

debug_attach(local_port: int = 5678, working_dir: str | None = None) -> Popen

Wait for debugpy, start port-forward, and print VS Code config.

Returns the port-forward subprocess so the caller can manage its lifecycle (e.g. terminate it after result() completes).

Parameters:
  • local_port – Local port to forward debugpy traffic to.

  • working_dir – Local working directory for VS Code path mappings. If None, a placeholder is used.

Returns:

The subprocess.Popen handle for the kubectl port-forward process. The caller should call kinetic.debug.cleanup_port_forward(proc) when done.
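
One way to manage the port-forward lifecycle is a try/finally around result(). The sketch below is duck-typed: handle is any object with the debug_attach() and result() methods documented here, and the cleanup function is passed in so the helper does not import kinetic itself. run_debug_session is a hypothetical name.

```python
def run_debug_session(handle, cleanup_port_forward, local_port=5678, working_dir=None):
    """Attach the debugger, wait for the result, always stop the port-forward."""
    proc = handle.debug_attach(local_port=local_port, working_dir=working_dir)
    try:
        return handle.result()
    finally:
        # Runs on success, on a re-raised user exception, and on Ctrl-C.
        cleanup_port_forward(proc)
```

With the real package this would be called as run_debug_session(handle, kinetic.debug.cleanup_port_forward).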

result(timeout: float | None = None, cleanup: bool | None = None, cleanup_timeout: float = 180, cleanup_poll_interval: float = 2, stream_logs: bool | None = None, on_status_change: Callable[[JobStatus], None] | None = None) -> Any

Wait for the job result and return it or re-raise the user exception.

Parameters:
  • timeout – Maximum seconds to wait. None means wait forever. If reached, TimeoutError is raised but the job keeps running and the handle remains valid.

  • cleanup – When True, delete the k8s resource and GCS artifacts after a result payload is successfully downloaded. Defaults to True for normal jobs and False for debug jobs.

  • cleanup_timeout – Maximum seconds to wait for the k8s resource deletion to be confirmed.

  • cleanup_poll_interval – Seconds between deletion-confirmation polls.

  • stream_logs – When True, stream live pod logs to the terminal while waiting for the job to complete. Defaults to False for debug jobs to avoid Rich panel conflicts.

  • on_status_change – Optional callback invoked with the new JobStatus each time the polled status differs from the previous one, including the first observation and the final terminal status. Exceptions raised by the callback are logged and swallowed so they never break result collection.

Returns:

The function’s return value.

Raises:
  • TimeoutError – If timeout is exceeded.

  • RuntimeError – If the job failed without uploading a result.

  • Exception – Re-raised from the remote function on user failure.
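
Because a timed-out result() leaves the job running and the handle valid, a caller can simply try again. The sketch below is duck-typed and uses only the timeout and on_status_change parameters documented above; wait_for_result and its retry policy are illustrative assumptions.

```python
import time


def wait_for_result(handle, attempts=3, timeout=60.0, pause=0.0):
    """Call handle.result() up to `attempts` times, recording status changes."""
    seen = []  # every JobStatus passed to the callback, in order
    for _ in range(attempts):
        try:
            value = handle.result(timeout=timeout, on_status_change=seen.append)
            return value, seen
        except TimeoutError:
            # The job keeps running and the handle remains valid; retry.
            time.sleep(pause)
    raise TimeoutError(f"no result after {attempts} attempts of {timeout}s")
```

Exceptions re-raised from the remote function are deliberately not caught here, so user failures still surface to the caller.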

cancel(cleanup_timeout: float = 180, cleanup_poll_interval: float = 2) -> None

Cancel the running job by deleting its Kubernetes resource.

cleanup(k8s: bool = True, gcs: bool = True, cleanup_timeout: float = 180, cleanup_poll_interval: float = 2) -> None

Clean up Kubernetes resources and/or uploaded GCS artifacts.

Parameters:
  • k8s – Delete the Kubernetes job/LWS resource.

  • gcs – Delete uploaded GCS artifacts.

  • cleanup_timeout – Maximum seconds to wait for the k8s resource deletion to be confirmed.

  • cleanup_poll_interval – Seconds between deletion-confirmation polls.

__init__(job_id: str, backend: str, project: str, cluster_name: str, zone: str, namespace: str, bucket_name: str, k8s_name: str, image_uri: str, accelerator: str, func_name: str, display_name: str, created_at: str, group_id: str | None = None, group_kind: str | None = None, group_index: int | None = None, debug: bool = False) -> None

kinetic.attach(job_id: str, project: str | None = None, cluster: str | None = None) -> JobHandle

Reconstruct a persisted handle from GCS.

Parameters:
  • job_id – The job identifier (e.g. “job-a1b2c3d4”).

  • project – GCP project. Falls back to KINETIC_PROJECT, then the active profile’s project, then GOOGLE_CLOUD_PROJECT.

  • cluster – GKE cluster name. Falls back to KINETIC_CLUSTER, then the active profile’s cluster, then the built-in default.

Returns:

A hydrated JobHandle ready for status(), result(), etc.

kinetic.list_jobs(project: str | None = None, zone: str | None = None, cluster: str | None = None, namespace: str | None = None) -> list[JobHandle]

List live jobs by hydrating durable handles from discovered k8s jobs.

Queries Kubernetes for GKE Jobs and Pathways LWS resources that carry the app=kinetic / app=kinetic-pathways labels, then downloads each job’s handle.json from GCS. Jobs whose handle.json is missing are skipped with a warning.

Each field falls back through KINETIC_* env vars, the active profile, and finally the built-in defaults — matching kinetic.run.
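
The fallback chain (explicit argument, then KINETIC_* environment variable, then active profile, then built-in default) can be sketched as a single resolver. resolve_setting is a hypothetical helper; treating empty strings as unset is an assumption, and the profile is passed in as a plain dict rather than read from ~/.kinetic/profiles.json.

```python
import os


def resolve_setting(explicit, env_var, profile, key, default, environ=None):
    """Return the first value found: argument, env var, profile, default."""
    environ = os.environ if environ is None else environ
    if explicit is not None:
        return explicit
    if environ.get(env_var):  # assumption: empty string counts as unset
        return environ[env_var]
    if profile.get(key):
        return profile[key]
    return default
```

For example, resolve_setting(None, "KINETIC_ZONE", {}, "zone", "us-central1-a", environ={}) falls all the way through to the documented default zone.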

Batched Jobs

class kinetic.BatchHandle(group_id: str, name: str | None, tags: dict[str, str], jobs: list[JobHandle | None], _bucket_name: str = '', _project: str = '', _submission_complete: threading.Event = <factory>, _submission_error: BaseException | None = None, _lock: _thread.lock = <factory>, _submission_errors: dict[int, Exception] = <factory>, _cached_failures: list[JobHandle] | None = None)

Bases: object

Handle for a collection of submitted jobs.

Created by run_async_map() or reconstructed by kinetic.attach_batch(). Provides collection-level observation, result gathering, and cleanup.

group_id: str
name: str | None
tags: dict[str, str]
jobs: list[JobHandle | None]

statuses() -> list[tuple[int, JobStatus]]

Return (index, status) for each submitted job.

status_counts() -> dict[str, int]

Return a count of jobs in each status.
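
The relationship between statuses() and status_counts() can be sketched as a simple tally. JobStatusSketch is a local stand-in that mirrors the documented enum values, and keying the result by the status string is an assumption about the shape of the real return value.

```python
from collections import Counter
from enum import Enum


class JobStatusSketch(Enum):
    """Stand-in mirroring the documented JobStatus values."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    NOT_FOUND = "NOT_FOUND"


def count_statuses(statuses):
    """Tally (index, status) pairs into {status string: count}."""
    return dict(Counter(status.value for _, status in statuses))
```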

wait(*, timeout: float | None = None) -> None

Block until all jobs reach a terminal state.

as_completed(*, poll_interval: float = 5.0, timeout: float | None = None) -> Iterator[JobHandle]

Yield jobs as they reach terminal states, in completion order.

Unlike the simple approach of waiting for all submissions first, this streams results as soon as each job reaches a terminal state — even while more inputs are still being submitted.

Parameters:
  • poll_interval – Seconds between status polls.

  • timeout – Maximum seconds to wait. Raises TimeoutError if exceeded.
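
A sketch of consuming as_completed() as a stream, written against any object that exposes the documented method. stream_results is a hypothetical helper; it assumes each yielded handle's result() either returns a value or raises, as described for JobHandle.result().

```python
def stream_results(batch, poll_interval=5.0, timeout=None):
    """Yield (job_id, value, error) tuples in completion order."""
    for job in batch.as_completed(poll_interval=poll_interval, timeout=timeout):
        try:
            yield job.job_id, job.result(), None
        except Exception as exc:  # failed job: report it and keep going
            yield job.job_id, None, exc
```

Each tuple becomes available as soon as its job reaches a terminal state, so early results can be processed while slower jobs are still running.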

results(*, timeout: float | None = None, ordered: bool = True, cleanup: bool = True, return_exceptions: bool = False) -> list[Any]

Collect results from all jobs.

Parameters:
  • timeout – Maximum seconds to wait for all jobs.

  • ordered – If True, return in input order. If False, return in completion order.

  • cleanup – If True, clean up each child’s K8s and GCS resources (the group manifest is preserved). Note that cleaning up causes failures() to return an empty list as job statuses become NOT_FOUND.

  • return_exceptions – If True, failed positions contain the exception object. If False, raise BatchError on any failure.

Returns:

List of results (input order when ordered=True, completion order otherwise).
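
With return_exceptions=True the returned list mixes values and exception objects, so callers usually need to separate them. The helper below is a duck-typed sketch (split_results is a hypothetical name); note that a function which legitimately returns an exception object would be misclassified as a failure.

```python
def split_results(batch, timeout=None):
    """Partition ordered results into successes and errors by input index."""
    outcomes = batch.results(timeout=timeout, ordered=True, return_exceptions=True)
    successes, errors = {}, {}
    for index, outcome in enumerate(outcomes):
        if isinstance(outcome, BaseException):
            errors[index] = outcome
        else:
            successes[index] = outcome
    return successes, errors
```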

failures() -> list[JobHandle]

Return handles for jobs that failed.

Only includes jobs whose status is FAILED. Jobs that are NOT_FOUND (e.g. after cleanup) are excluded because the status is ambiguous — use statuses() for finer control.

After results() has been called, this returns the cached failure list from that collection pass, so it remains accurate even if cleanup has deleted K8s resources.

See also

submission_failures: returns per-input errors for inputs that failed at submission time (jobs[idx] is None).

property submission_failures: dict[int, Exception]

Return a copy of per-input submission errors (index -> exception).

These are inputs where the submission itself failed (e.g. validation error, network error). The corresponding jobs[idx] slot is None. These errors are included in results() output but are not reflected by failures(), which only inspects live job statuses.

cancel() -> None

Cancel all non-terminal jobs in the collection.

cleanup(*, k8s: bool = True, gcs: bool = True) -> None

Clean up all jobs and optionally the group manifest.

Parameters:
  • k8s – Delete K8s resources for each child.

  • gcs – Delete GCS artifacts for each child and the group manifest.

__init__(group_id: str, name: str | None, tags: dict[str, str], jobs: list[JobHandle | None], _bucket_name: str = '', _project: str = '', _submission_complete: threading.Event = <factory>, _submission_error: BaseException | None = None, _lock: _thread.lock = <factory>, _submission_errors: dict[int, Exception] = <factory>, _cached_failures: list[JobHandle] | None = None) -> None

class kinetic.BatchError(group_id: str, failures: list[JobHandle], partial_results: list[Any])

Bases: Exception

Raised when a batch collection has failed children.

group_id

The collection’s group identifier.

failures

List of JobHandles for failed children.

partial_results

List where successful positions contain the result and failed positions contain None.
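
When results() is called with return_exceptions=False, the partial results remain reachable through the raised error. The sketch below takes the exception type as a parameter so it does not import kinetic itself; results_or_partial is a hypothetical helper that relies only on the failures and partial_results attributes documented above.

```python
def results_or_partial(batch, batch_error_type):
    """Return (results, failed_job_ids); failed positions hold None."""
    try:
        return batch.results(), []
    except batch_error_type as err:
        failed_ids = [job.job_id for job in err.failures]
        return err.partial_results, failed_ids
```

With the real package this would be called as results_or_partial(batch, kinetic.BatchError).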

__init__(group_id: str, failures: list[JobHandle], partial_results: list[Any])

kinetic.attach_batch(group_id: str, project: str | None = None, cluster: str | None = None, poll_interval: float = 10.0, poll_timeout: float | None = None) -> BatchHandle

Reattach to an existing batch collection by group_id.

Downloads the group manifest from GCS, reconstructs JobHandle objects for each child, and returns a fully usable BatchHandle.

If the manifest has fewer children than total_expected (i.e. the original run_async_map() call is still submitting), the returned handle polls the manifest in a background thread until all children appear or poll_timeout is reached.

Parameters:
  • group_id – The collection identifier (e.g. “grp-a1b2c3d4”).

  • project – GCP project. Falls back to KINETIC_PROJECT, then the active profile’s project, then GOOGLE_CLOUD_PROJECT.

  • cluster – GKE cluster name. Falls back to KINETIC_CLUSTER, then the active profile’s cluster, then the built-in default.

  • poll_interval – Seconds between manifest polls when the batch is partially submitted.

  • poll_timeout – Maximum seconds to poll for remaining children. None means poll indefinitely.

Returns:

A hydrated BatchHandle ready for results(), etc.

Job Status

class kinetic.job_status.JobStatus(*values)

Bases: Enum

Observable status of a remote job.

PENDING = 'PENDING'
RUNNING = 'RUNNING'
SUCCEEDED = 'SUCCEEDED'
FAILED = 'FAILED'
NOT_FOUND = 'NOT_FOUND'