Batched Jobs#
run_async() is the tool for a single long-running job. When you
need to run the same function over many inputs (a hyperparameter
sweep, one job per dataset shard, an evaluation grid), wiring that up
by hand means a loop that calls run_async(), plus your own bookkeeping
for which handles are still live, your own error aggregation, and your
own cleanup. run_async_map() is that loop, done for you.
You call run_async_map() on a @kinetic.run()-decorated function with a list of
inputs. It returns a single BatchHandle that represents the whole
collection: one place to observe progress, collect results in input
order, handle failures, cancel siblings, and tear everything down. The
underlying jobs are independent Kinetic jobs — each one gets a real
JobHandle, runs on its own pod, and writes its own artifacts to GCS.
This page builds on the single-job workflow covered in
Detached Jobs. Familiarity with JobHandle and the
PENDING/RUNNING/SUCCEEDED/FAILED/NOT_FOUND lifecycle is
assumed.
A first fan-out#
Call run_async_map() on a @kinetic.run()-decorated function with a list
of inputs. It returns a BatchHandle immediately while jobs are
submitted in the background.
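import kinetic
@kinetic.run(accelerator="tpu-v5e-1")
def train(lr):
    import keras
    import numpy as np
    # Placeholder data so the example is self-contained; load your real
    # training set here instead.
    x_train = np.random.rand(1024, 8).astype("float32")
    y_train = np.random.rand(1024, 1).astype("float32")
    model = keras.Sequential([
        keras.layers.Dense(64, activation="relu"),
        keras.layers.Dense(1),
    ])
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=lr), loss="mse")
    history = model.fit(x_train, y_train, epochs=10, verbose=0)
    return history.history["loss"][-1]
# One job per learning rate; the BatchHandle comes back immediately.
batch = train.run_async_map([0.001, 0.01, 0.1])
losses = batch.results()  # blocks until every job finishes
print(losses)  # e.g. [0.32, 0.28, 0.41]: one result per input, in input order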
Note
You must use run_async_map() to fan out. Calling the decorated function
directly blocks until its job finishes and returns the result, so it
cannot run multiple inputs concurrently.
Input modes#
The input_mode parameter controls how each item in inputs is passed
to the function.
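| Mode | Item type | How it’s called | Example item |
|---|---|---|---|
| "auto" | dict with valid identifier keys | unpacked as keyword arguments: fn(**item) | {"lr": 0.01, "batch_size": 64} |
| "auto" | list | unpacked as positional arguments: fn(*item) | [1, 2, 3] |
| "auto" | anything else | passed as a single argument: fn(item) | 0.01 |
| "kwargs" | dict | unpacked as keyword arguments: fn(**item) | {"lr": 0.01} |
| "single" | any | always passed as a single argument: fn(item) | [1, 2, 3] |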
Dict inputs (kwargs unpacking)#
When using "auto" mode, dicts with valid Python identifier keys are
unpacked as keyword arguments:
@kinetic.run(accelerator="tpu-v5e-1")
def train(lr, batch_size):
    ...
configs = [
{"lr": 0.001, "batch_size": 32},
{"lr": 0.01, "batch_size": 64},
]
batch = train.run_async_map(configs)
Preventing unpacking#
If your function takes a list or dict as a single argument, use
input_mode="single" to prevent automatic unpacking:
@kinetic.run(accelerator="cpu")
def process(items):
    return sum(items)
batch = process.run_async_map([[1, 2, 3], [4, 5, 6]], input_mode="single")
Note
In "auto" mode, dicts with non-identifier keys (like
{"not-an-id": 1}) or Python keywords (like {"class": 1}) are passed
as a single positional argument rather than unpacked. Use
input_mode="kwargs" or input_mode="single" if you need explicit
control.
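To make the "auto" rules concrete, here is a minimal sketch of how an item could be dispatched under each mode. It is not Kinetic's implementation: `call_with_mode` is a hypothetical helper, and treating tuples the same as lists in "auto" mode is an assumption.

```python
import keyword


def _is_kwargs_dict(item):
    """True if item is a dict whose keys can all be used as keyword arguments."""
    return isinstance(item, dict) and all(
        isinstance(k, str) and k.isidentifier() and not keyword.iskeyword(k)
        for k in item
    )


def call_with_mode(fn, item, input_mode="auto"):
    """Hypothetical dispatcher mirroring the documented input modes."""
    if input_mode == "single":
        # Always pass the item through unchanged.
        return fn(item)
    if input_mode == "kwargs":
        # Explicit keyword unpacking; a non-dict item raises TypeError here.
        return fn(**item)
    if input_mode == "auto":
        if _is_kwargs_dict(item):
            return fn(**item)
        if isinstance(item, (list, tuple)):  # assumption: tuples act like lists
            return fn(*item)
        # Anything else, including dicts with non-identifier or keyword keys.
        return fn(item)
    raise ValueError(f"unknown input_mode: {input_mode!r}")


def train(lr, batch_size):
    return lr * batch_size


print(call_with_mode(train, {"lr": 0.5, "batch_size": 64}))  # 32.0
print(call_with_mode(sum, [1, 2, 3], input_mode="single"))   # 6
print(call_with_mode(len, {"not-an-id": 1}))                 # 1 (dict passed whole)
```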
Monitoring a batch#
You can inspect progress at any time through the BatchHandle.
# Per-job status
for idx, status in batch.statuses():
print(f"Job {idx}: {status.value}")
# Aggregate counts
print(batch.status_counts())
# {'RUNNING': 2, 'SUCCEEDED': 1}
statuses() returns (index, JobStatus) pairs for each submitted job.
Slots that haven’t been submitted yet (when using bounded concurrency)
are skipped. Job statuses follow the same lifecycle as single jobs —
see Detached Jobs for details on PENDING, RUNNING,
SUCCEEDED, FAILED, and NOT_FOUND.
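The aggregate view is essentially a tally over the per-job view. This sketch shows that relationship using a stand-in JobStatus enum and representing unsubmitted slots as None; both are assumptions for illustration, not Kinetic's internals.

```python
from collections import Counter
from enum import Enum


class JobStatus(Enum):  # stand-in for Kinetic's JobStatus
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    NOT_FOUND = "NOT_FOUND"


def statuses(slots):
    """Yield (index, status) for submitted slots, skipping unsubmitted ones (None)."""
    for idx, status in enumerate(slots):
        if status is not None:
            yield idx, status


def status_counts(slots):
    """Count submitted jobs by status name."""
    return dict(Counter(status.value for _, status in statuses(slots)))


# Three submitted jobs and one slot still waiting for a concurrency slot.
slots = [JobStatus.RUNNING, JobStatus.SUCCEEDED, JobStatus.RUNNING, None]
print(status_counts(slots))  # {'RUNNING': 2, 'SUCCEEDED': 1}
```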
Collecting results#
results()#
The simplest way to collect all results. By default it blocks until every job finishes and returns results in input order.
# Input order (default)
losses = batch.results()
# losses[0] corresponds to inputs[0], losses[1] to inputs[1], etc.
For faster access to early finishers, use ordered=False to collect in
completion order:
losses = batch.results(ordered=False)
# Results appear in the order jobs finish, not input order
Parameters:
- timeout (float | None, default None): Maximum seconds to wait. Raises TimeoutError if exceeded.
- ordered (bool, default True): True returns results aligned with inputs. False returns results in the order jobs complete.
- cleanup (bool, default True): Delete each child’s Kubernetes resources and GCS artifacts after downloading its result. The group manifest is preserved so attach_batch() still works.
- return_exceptions (bool, default False): When True, failed positions contain the exception object instead of raising BatchError. When False, any failure raises BatchError.
Important
A TimeoutError does not cancel running jobs. They continue executing
on the cluster. Call batch.cancel() explicitly if you want to stop
them after a timeout.
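The only difference between ordered=True and ordered=False is how finished results are arranged. This toy model replaces real jobs with recorded finish times; the `collect` helper and the timing data are hypothetical.

```python
def collect(results, finish_times, ordered=True):
    """Arrange per-input results by input index or by completion time.

    results[i] and finish_times[i] both describe the job for inputs[i].
    """
    if ordered:
        # Input order: position i always holds the result for inputs[i].
        return list(results)
    # Completion order: the earliest finisher comes first.
    by_finish = sorted(range(len(results)), key=lambda i: finish_times[i])
    return [results[i] for i in by_finish]


losses = [0.32, 0.28, 0.41]
finish_times = [30.0, 12.5, 20.0]  # made-up seconds since submission
print(collect(losses, finish_times))                 # [0.32, 0.28, 0.41]
print(collect(losses, finish_times, ordered=False))  # [0.28, 0.41, 0.32]
```

With ordered=False you lose the link between a result and its input position, so prefer it only when the result itself identifies what it belongs to.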
as_completed()#
For processing results incrementally as jobs finish, use the
as_completed() iterator. It yields JobHandle objects in completion
order.
for job in batch.as_completed():
    result = job.result()
    print(f"{job.job_id} finished: {result}")
as_completed() streams results even while submission is still in
progress. With bounded concurrency, you can start processing the first
results before the last inputs have been submitted.
Parameters:
- poll_interval (float, default 5.0): Seconds between status polls.
- timeout (float | None, default None): Maximum seconds to wait. Raises TimeoutError if exceeded.
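The poll_interval and timeout parameters suggest a polling generator. Here is a minimal sketch of one, assuming each handle has a status() method returning a status string; `FakeJob` and the `TERMINAL` set are stand-ins, and unlike the real iterator this sketch does not model jobs that are still being submitted.

```python
import time

TERMINAL = {"SUCCEEDED", "FAILED", "NOT_FOUND"}


def as_completed(handles, poll_interval=5.0, timeout=None):
    """Yield handles as they reach a terminal state, polling at a fixed interval."""
    deadline = None if timeout is None else time.monotonic() + timeout
    pending = list(handles)
    while pending:
        still_pending = []
        for h in pending:
            if h.status() in TERMINAL:
                yield h
            else:
                still_pending.append(h)
        pending = still_pending
        if not pending:
            break
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"{len(pending)} jobs still running")
        time.sleep(poll_interval)


class FakeJob:
    """Stand-in handle that turns SUCCEEDED after a fixed number of polls."""

    def __init__(self, job_id, polls_until_done):
        self.job_id = job_id
        self._remaining = polls_until_done

    def status(self):
        self._remaining -= 1
        return "SUCCEEDED" if self._remaining < 0 else "RUNNING"


jobs = [FakeJob("a", 2), FakeJob("b", 0), FakeJob("c", 1)]
print([j.job_id for j in as_completed(jobs, poll_interval=0.01)])  # ['b', 'c', 'a']
```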
Handling failures#
When any job fails and return_exceptions=False (the default),
results() raises a BatchError.
try:
    results = batch.results()
except kinetic.BatchError as e:
    print(f"Batch {e.group_id}: {len(e.failures)} of {len(e.partial_results)} jobs failed")
    for job in e.failures:
        print(f"  {job.job_id}: {job.status().value}")
    # e.partial_results has results at successful positions, None at failed ones
BatchError provides three attributes:
- group_id: The batch identifier.
- failures: List of JobHandle objects for the failed jobs.
- partial_results: A list aligned with inputs where successful positions contain the result and failed positions contain None.
Tolerating failures#
Use return_exceptions=True to collect results without raising. Failed
positions contain the exception object.
results = batch.results(return_exceptions=True)
for i, r in enumerate(results):
    if isinstance(r, Exception):
        print(f"Job {i} failed: {r}")
    else:
        print(f"Job {i}: {r}")
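The two failure-handling modes differ only in what happens once every outcome is known. The sketch below assumes each slot's outcome is either a result or the exception its job produced. BatchError here is a local stand-in that reuses the documented attribute names (its failures list holds indices rather than JobHandle objects); it is not Kinetic's class, and `gather` is a hypothetical helper.

```python
class BatchError(Exception):
    """Local stand-in mirroring group_id / failures / partial_results."""

    def __init__(self, group_id, failures, partial_results):
        super().__init__(f"{len(failures)} of {len(partial_results)} jobs failed")
        self.group_id = group_id
        self.failures = failures
        self.partial_results = partial_results


def gather(group_id, outcomes, return_exceptions=False):
    """outcomes[i] is the result for inputs[i], or the exception it raised."""
    if return_exceptions:
        # Failed positions keep the exception object; nothing is raised.
        return list(outcomes)
    failures = [i for i, o in enumerate(outcomes) if isinstance(o, Exception)]
    if failures:
        # Successful positions keep their result; failed ones become None.
        partial = [None if isinstance(o, Exception) else o for o in outcomes]
        raise BatchError(group_id, failures, partial)
    return list(outcomes)


outcomes = [0.32, RuntimeError("OOM"), 0.41]
try:
    gather("grp-a1b2c3d4", outcomes)
except BatchError as e:
    print(e, e.partial_results)  # 1 of 3 jobs failed [0.32, None, 0.41]
```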
Inspecting failures#
failures() returns handles for jobs with status FAILED. It
intentionally excludes NOT_FOUND because that status is ambiguous —
a job may be NOT_FOUND because its Kubernetes resources were cleaned
up, not because it failed. Use statuses() for finer-grained
inspection.
for job in batch.failures():
    print(f"{job.job_id}: {job.tail(n=20)}")
Retries#
The retries parameter specifies how many additional attempts a job
gets after failure. The total number of attempts per input is
1 + retries.
batch = train.run_async_map(configs, retries=2)
# Each job gets up to 3 attempts (1 initial + 2 retries)
- Retries are triggered when a job reaches FAILED or NOT_FOUND status.
- Before each retry, Kinetic cleans up the previous attempt’s Kubernetes resources (GCS artifacts are preserved for debugging).
- The group manifest tracks the attempt count per job, so attach_batch() can distinguish retries from initial submissions.
- Submission errors (when the call to the function itself raises) are not retried. These are typically packaging or configuration errors that would fail again.
Note
When retries > 0, job submission runs in a background thread so
Kinetic can poll for failures and resubmit.
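The retry rules amount to a bounded loop per input. This sketch assumes a hypothetical `submit(item)` callable that returns a handle and a hypothetical `wait(handle)` callable that returns the final status string; neither is Kinetic API. Submission errors are modeled by letting exceptions from submit() propagate without a retry.

```python
RETRYABLE = {"FAILED", "NOT_FOUND"}


def run_with_retries(submit, wait, item, retries=0):
    """Run one input with up to 1 + retries attempts.

    Returns (final_status, attempts). An exception from submit() is treated
    as a submission error and propagates at once, without a retry.
    """
    attempts = 0
    while True:
        attempts += 1
        handle = submit(item)  # submission errors escape here, unretried
        status = wait(handle)
        if status not in RETRYABLE or attempts == 1 + retries:
            return status, attempts
        # Kinetic cleans up the previous attempt's Kubernetes resources
        # before resubmitting; this sketch has nothing to clean up.


# A flaky job: fails twice, then succeeds on the third attempt.
outcomes = iter(["FAILED", "NOT_FOUND", "SUCCEEDED"])
status, attempts = run_with_retries(
    submit=lambda item: item,  # stand-in: the "handle" is the item itself
    wait=lambda handle: next(outcomes),
    item={"lr": 0.01},
    retries=2,
)
print(status, attempts)  # SUCCEEDED 3
```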
Concurrency control#
By default, run_async_map() limits the number of concurrently active
jobs to 64. Use max_concurrent to tune this.
# At most 8 jobs running at once
batch = train.run_async_map(configs, max_concurrent=8)
# Submit all jobs immediately (no concurrency limit)
batch = train.run_async_map(configs, max_concurrent=None)
- Default (64): At most 64 jobs are active at once; new jobs are launched as running ones finish.
- None: All inputs are submitted immediately with no concurrency limit. When combined with retries=0 (the default), submission happens synchronously in the calling thread before run_async_map() returns.
- Any explicit value must be a positive integer. Passing 0 or a negative value raises ValueError.
Note
Kinetic logs a warning when submitting more than 100 jobs with
max_concurrent=None, suggesting you set a limit to control resource
usage.
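Bounded concurrency behaves like a sliding window over the inputs: keep up to max_concurrent jobs active and launch the next input whenever one finishes. The sketch below uses ThreadPoolExecutor worker threads as stand-ins for remote jobs and applies the documented validation rule. `run_bounded` and `fake_job` are hypothetical; Kinetic's scheduler instead polls remote job status from a background thread.

```python
import concurrent.futures as cf
import threading
import time


def run_bounded(fn, inputs, max_concurrent=64):
    """Call fn on each input with at most max_concurrent calls in flight.

    Results are returned in input order; max_concurrent=None removes the limit.
    """
    if max_concurrent is not None and (
        not isinstance(max_concurrent, int) or max_concurrent <= 0
    ):
        raise ValueError("max_concurrent must be a positive integer or None")
    limit = len(inputs) if max_concurrent is None else max_concurrent
    results = [None] * len(inputs)
    with cf.ThreadPoolExecutor(max_workers=max(limit, 1)) as pool:
        active = {}  # future -> input index
        next_idx = 0
        while next_idx < len(inputs) or active:
            # Launch new work only while a concurrency slot is free.
            while next_idx < len(inputs) and len(active) < limit:
                active[pool.submit(fn, inputs[next_idx])] = next_idx
                next_idx += 1
            # Wait for at least one in-flight call to finish, freeing a slot.
            done, _ = cf.wait(active, return_when=cf.FIRST_COMPLETED)
            for fut in done:
                results[active.pop(fut)] = fut.result()
    return results


# Instrumented fake job that records the peak number of concurrent calls.
_lock = threading.Lock()
_in_flight = 0
peak = 0


def fake_job(x):
    global _in_flight, peak
    with _lock:
        _in_flight += 1
        peak = max(peak, _in_flight)
    time.sleep(0.01)  # pretend to do remote work
    with _lock:
        _in_flight -= 1
    return x * x


print(run_bounded(fake_job, list(range(10)), max_concurrent=3))
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(peak <= 3)  # True
```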
Cancellation and fail-fast#
Fail-fast behavior#
The fail_fast and cancel_running_on_fail parameters control what
happens when a job fails.
| fail_fast | cancel_running_on_fail | On first failure… |
|---|---|---|
| False | False | All remaining jobs continue. Failures are collected at the end. |
| True | False | No new jobs are launched. Already-running jobs continue to completion. |
| True | True | No new jobs are launched. All running siblings are cancelled immediately. |
| False | True | No effect. |
# Stop the batch as soon as any job fails, cancel all running siblings
batch = train.run_async_map(
    configs,
    fail_fast=True,
    cancel_running_on_fail=True,
)
A “failure” here means either a submission error (the call to the
function raised) or a runtime failure (the remote job reached
FAILED or NOT_FOUND status after exhausting retries).
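The four combinations in the table can be expressed as a small state transition applied at the moment of the first failure. This is a sketch of the described semantics only; `apply_failure_policy` and the slot labels ("queued", "skipped", "cancelled") are hypothetical, not Kinetic API.

```python
def apply_failure_policy(slots, fail_fast=False, cancel_running_on_fail=False):
    """Return slot states after the first failure under the documented policy.

    Slot labels are hypothetical: "queued" (not yet submitted), "running",
    or a terminal label such as "failed" or "succeeded".
    """
    if not fail_fast:
        # Everything keeps going; cancel_running_on_fail has no effect here.
        return list(slots)
    updated = []
    for state in slots:
        if state == "queued":
            updated.append("skipped")  # no new jobs are launched
        elif state == "running" and cancel_running_on_fail:
            updated.append("cancelled")  # running siblings are cancelled
        else:
            updated.append(state)  # running jobs finish; terminal ones stay put
    return updated


slots = ["failed", "running", "queued", "succeeded"]
for ff, cancel in [(False, False), (True, False), (True, True), (False, True)]:
    print(ff, cancel, apply_failure_policy(slots, ff, cancel))
```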
Manual cancellation#
You can cancel all non-terminal jobs at any time, independent of the
fail_fast setting:
batch.cancel()
Cancellation deletes each job’s Kubernetes resources but preserves GCS artifacts for debugging.
Reattaching to a batch#
If your local process exits or you want to check on a batch from a
different machine, save the group_id and reattach later.
# Original session
batch = train.run_async_map(configs)
print(f"Batch ID: {batch.group_id}") # e.g., "grp-a1b2c3d4"
# Later, from any machine with access to the same GCP project
batch = kinetic.attach_batch("grp-a1b2c3d4")
results = batch.results()
attach_batch() downloads the group manifest from GCS and reconstructs
a JobHandle for each child. Index alignment is preserved: if the
original batch had 10 inputs and only 7 were submitted before a crash,
the returned batch.jobs list has 10 entries with None in the 3
unsubmitted slots.
Note
Kinetic logs a warning when a reattached batch has fewer children than expected, indicating partial submission.
Parameters:
- group_id (str): The batch identifier (e.g., "grp-a1b2c3d4").
- project (str | None, default None): GCP project. Uses the default when None.
- cluster (str | None, default None): GKE cluster name. Uses the default when None.
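Index alignment on reattach can be pictured as filling a fixed-size list from the manifest. This sketch assumes a manifest dict with hypothetical field names (`expected_total`, and `children` mapping a string index to a job record); the real manifest schema is not documented on this page. It uses warnings.warn where Kinetic logs a warning.

```python
import warnings


def rebuild_jobs(manifest):
    """Return one slot per input: the child's job ID, or None if never submitted."""
    total = manifest["expected_total"]
    jobs = [None] * total
    for index, child in manifest["children"].items():
        jobs[int(index)] = child["job_id"]  # JSON object keys are strings
    submitted = sum(job is not None for job in jobs)
    if submitted < total:
        # Analogous to the warning Kinetic logs for partial submission.
        warnings.warn(f"only {submitted} of {total} jobs were submitted")
    return jobs


# 10 inputs, but the process crashed after submitting the first 7.
manifest = {
    "group_id": "grp-a1b2c3d4",
    "expected_total": 10,
    "children": {str(i): {"job_id": f"job-{i}", "attempt": 1} for i in range(7)},
}
jobs = rebuild_jobs(manifest)
print(jobs[6:])  # ['job-6', None, None, None]
```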
Cleanup#
There are two ways to clean up resources after a batch completes.
Automatic cleanup via results()#
By default, results() runs with cleanup=True, which deletes each
child’s Kubernetes resources and GCS artifacts after downloading its
result. The group manifest is preserved, so attach_batch() still works.
# Each child is cleaned up as its result is downloaded
results = batch.results() # cleanup=True is the default
Full teardown#
To delete everything — all children’s resources and the group manifest
itself — call cleanup() on the handle:
batch.cleanup(k8s=True, gcs=True)
Parameters:
- k8s (bool, default True): Delete Kubernetes resources (Jobs/pods) for each child.
- gcs (bool, default True): Delete GCS artifacts for each child and the group manifest.
Important
After calling cleanup(gcs=True), the batch can no longer be reattached
via attach_batch() because the manifest has been deleted.
How it works#
Threading model#
When max_concurrent is set (the default is 64) or retries > 0,
run_async_map() launches a non-daemon background thread to manage
submissions. The thread polls active jobs for terminal states and
launches new ones as concurrency slots free up. The BatchHandle is
returned immediately.
When max_concurrent=None and retries=0, all jobs are submitted
synchronously in the calling thread before run_async_map() returns. No
background thread is created.
Manifest#
A JSON manifest is written to GCS before the first job is submitted. It
records the batch metadata (group ID, expected total, function name,
tags) and is updated after each successful submission with the child’s
job ID and attempt count. This enables crash recovery: attach_batch()
reads the manifest to determine which jobs were submitted and
reconstructs the handle.
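The manifest's life cycle (written once before the first submission, then updated after each successful submission) can be sketched with plain JSON. The field names are assumptions chosen to match the metadata listed above, a local file stands in for the GCS object, and overwriting a child's entry on retry is this sketch's choice rather than documented behavior.

```python
import json
import os
import tempfile


def write_manifest(path, group_id, expected_total, function_name, tags=None):
    """Create the manifest before any child job is submitted."""
    manifest = {
        "group_id": group_id,
        "expected_total": expected_total,
        "function_name": function_name,
        "tags": tags or {},
        "children": {},  # filled in as submissions succeed
    }
    with open(path, "w") as f:
        json.dump(manifest, f)


def record_submission(path, index, job_id, attempt):
    """Record a child's job ID and attempt count after a successful submission."""
    with open(path) as f:
        manifest = json.load(f)
    manifest["children"][str(index)] = {"job_id": job_id, "attempt": attempt}
    with open(path, "w") as f:
        json.dump(manifest, f)


path = os.path.join(tempfile.mkdtemp(), "manifest.json")
write_manifest(path, "grp-a1b2c3d4", expected_total=3, function_name="train")
record_submission(path, 0, "job-0", attempt=1)
record_submission(path, 0, "job-0-retry", attempt=2)  # in this sketch, a retry overwrites
with open(path) as f:
    print(json.load(f)["children"])  # {'0': {'job_id': 'job-0-retry', 'attempt': 2}}
```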
Group ID#
Each batch gets a unique identifier in the format grp-{8-hex-chars}
(e.g., grp-a1b2c3d4). This ID is set on each child JobHandle as
group_id, along with group_kind="map" and the child’s group_index.
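An identifier in the documented grp-{8-hex-chars} shape can be produced with the standard library. This is one way to generate such an ID, not necessarily how Kinetic does it, and the per-child dicts are a hypothetical representation of the group fields named above.

```python
import re
import secrets

GROUP_ID_RE = re.compile(r"grp-[0-9a-f]{8}")


def new_group_id():
    """Return an ID like 'grp-a1b2c3d4': 4 random bytes as 8 lowercase hex chars."""
    return f"grp-{secrets.token_hex(4)}"


gid = new_group_id()
# Hypothetical per-child metadata carrying the documented group fields.
children = [{"group_id": gid, "group_kind": "map", "group_index": i} for i in range(3)]
print(gid, bool(GROUP_ID_RE.fullmatch(gid)))  # e.g. grp-3f9c0a1e True
```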
Submission errors#
If a call to the function itself raises (e.g., a packaging or validation
error), the exception is captured internally and the corresponding slot
in batch.jobs remains None. These errors are surfaced when you call
results() — either as entries in the BatchError.partial_results
list or as exception objects when return_exceptions=True.
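Capturing submission errors per slot is what keeps batch.jobs aligned with inputs. A minimal sketch, assuming a hypothetical `submit(item)` that either returns a job ID or raises; `submit_all` is likewise illustrative.

```python
def submit_all(submit, inputs):
    """Return (jobs, errors): jobs[i] is a job ID or None; errors maps index -> exception."""
    jobs = [None] * len(inputs)
    errors = {}
    for i, item in enumerate(inputs):
        try:
            jobs[i] = submit(item)
        except Exception as exc:  # e.g. packaging/validation errors: captured, not raised
            errors[i] = exc
    return jobs, errors


def submit(item):
    """Hypothetical submitter that rejects negative inputs."""
    if item < 0:
        raise ValueError(f"invalid input {item}")
    return f"job-{item}"


jobs, errors = submit_all(submit, [1, -2, 3])
print(jobs)          # ['job-1', None, 'job-3']
print(list(errors))  # [1]
```

The captured exceptions can later be placed at their slot positions when results are gathered, which is how they end up in partial_results or in the return_exceptions=True output.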