Batched Jobs

run_async() is the tool for a single long-running job. Sometimes you need to run the same function over many inputs: a hyperparameter sweep, one job per dataset shard, or an evaluation grid. Wiring that up by hand means writing a loop that calls run_async(), tracking which handles are still live, aggregating errors, and cleaning up afterwards. run_async_map() is that loop, done for you.

You call run_async_map() on a @kinetic.run()-decorated function with a list of inputs. It returns a single BatchHandle that represents the whole collection. The handle is the one place to observe progress, collect results in input order, handle failures, cancel siblings, and tear everything down. The underlying jobs are independent Kinetic jobs: each one gets a real JobHandle, runs on its own pod, and writes its own artifacts to GCS.

This page builds on the single-job workflow covered in Detached Jobs. Familiarity with JobHandle and the PENDING/RUNNING/SUCCEEDED/FAILED/NOT_FOUND lifecycle is assumed.

A first fan-out

Pass a @kinetic.run()-decorated function and a list of inputs to run_async_map(). It returns a BatchHandle immediately while jobs are submitted in the background.

import kinetic

@kinetic.run(accelerator="tpu-v5e-1")
def train(lr):
    import keras
    import numpy as np

    # Synthetic regression data keeps the example self-contained;
    # load your real dataset here instead.
    x_train = np.random.rand(1024, 8).astype("float32")
    y_train = np.random.rand(1024, 1).astype("float32")

    model = keras.Sequential([keras.layers.Dense(64, activation="relu"),
                              keras.layers.Dense(1)])
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=lr), loss="mse")
    history = model.fit(x_train, y_train, epochs=10, verbose=0)
    return history.history["loss"][-1]

batch = train.run_async_map([0.001, 0.01, 0.1])
losses = batch.results()
print(losses)  # e.g. [0.32, 0.28, 0.41]: one result per input, in input order

Note

You must use run_async_map() to fan out. Calling the decorated function directly blocks until that single job finishes and then returns its result. As a result, a direct call cannot run multiple inputs concurrently.

Input modes

The input_mode parameter controls how each item in inputs is passed to the function.

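| input_mode       | Item type                       | How it's called | Example item                               |
|------------------|---------------------------------|-----------------|--------------------------------------------|
| "auto" (default) | dict with valid identifier keys | fn(**item)      | {"lr": 0.01, "wd": 1e-4}                   |
| "auto" (default) | list or tuple                   | fn(*item)       | [0.01, 32]                                 |
| "auto" (default) | anything else                   | fn(item)        | 0.01                                       |
| "single"         | any                             | fn(item)        | any value, always passed as a single arg   |
| "args"           | list or tuple (required)        | fn(*item)       | [0.01, 32]                                 |
| "kwargs"         | dict (required)                 | fn(**item)      | {"lr": 0.01}                               |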

Dict inputs (kwargs unpacking)

When using "auto" mode, dicts with valid Python identifier keys are unpacked as keyword arguments:

@kinetic.run(accelerator="tpu-v5e-1")
def train(lr, batch_size):
    ...

configs = [
    {"lr": 0.001, "batch_size": 32},
    {"lr": 0.01,  "batch_size": 64},
]
batch = train.run_async_map(configs)

Preventing unpacking

If your function takes a list or dict as a single argument, use input_mode="single" to prevent automatic unpacking:

@kinetic.run(accelerator="cpu")
def process(items):
    return sum(items)

batch = process.run_async_map([[1, 2, 3], [4, 5, 6]], input_mode="single")

Note

In "auto" mode, dicts with non-identifier keys (like {"not-an-id": 1}) or Python keywords (like {"class": 1}) are passed as a single positional argument rather than unpacked. Use input_mode="kwargs" or input_mode="single" if you need explicit control.
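The dispatch rules in the table and the note above can be modeled in a few lines of plain Python. The following is a hypothetical sketch, not Kinetic's implementation. call_item() is an illustrative name. The TypeError for a mismatched "args" or "kwargs" item and the ValueError for an unknown mode are assumptions about how such errors might be reported.

```python
import keyword


def _is_kwargs_dict(item):
    # Unpackable only if every key is a str that is a valid identifier
    # and not a reserved keyword such as "class".
    return isinstance(item, dict) and all(
        isinstance(k, str) and k.isidentifier() and not keyword.iskeyword(k)
        for k in item
    )


def call_item(fn, item, input_mode="auto"):
    """Call fn with one input item according to input_mode (hypothetical model)."""
    if input_mode == "single":
        return fn(item)
    if input_mode == "args":
        if not isinstance(item, (list, tuple)):
            raise TypeError("input_mode='args' requires a list or tuple")
        return fn(*item)
    if input_mode == "kwargs":
        if not isinstance(item, dict):
            raise TypeError("input_mode='kwargs' requires a dict")
        return fn(**item)
    if input_mode != "auto":
        raise ValueError(f"unknown input_mode: {input_mode!r}")
    # "auto": identifier-keyed dict -> kwargs, list/tuple -> args, else one arg.
    if _is_kwargs_dict(item):
        return fn(**item)
    if isinstance(item, (list, tuple)):
        return fn(*item)
    return fn(item)


def show(*args, **kwargs):
    # Echo how the function was called, to make the dispatch visible.
    return args, kwargs


print(call_item(show, {"lr": 0.01}))                    # ((), {'lr': 0.01})
print(call_item(show, [0.01, 32]))                      # ((0.01, 32), {})
print(call_item(show, {"class": 1}))                    # (({'class': 1},), {})
print(call_item(show, [1, 2, 3], input_mode="single"))  # (([1, 2, 3],), {})
```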

Monitoring a batch

You can inspect progress at any time through the BatchHandle.

# Per-job status
for idx, status in batch.statuses():
    print(f"Job {idx}: {status.value}")

# Aggregate counts
print(batch.status_counts())
# {'RUNNING': 2, 'SUCCEEDED': 1}

statuses() returns (index, JobStatus) pairs for each submitted job. Slots that haven't been submitted yet (when using bounded concurrency) are skipped. Job statuses follow the same lifecycle as single jobs; see Detached Jobs for details on PENDING, RUNNING, SUCCEEDED, FAILED, and NOT_FOUND.
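For a progress readout while you wait, a small polling loop over status_counts() is enough. wait_for_batch() below is a hypothetical helper, not part of Kinetic. It assumes that status_counts() returns a dict keyed by status name, as in the example output above. You pass total yourself, as the number of inputs. The loop counts only SUCCEEDED and FAILED as finished, because NOT_FOUND can also mean a job was cleaned up. Set a timeout so the loop cannot poll forever on a job left in that state.

```python
import time

# Counted as finished in this sketch. NOT_FOUND is left out on purpose:
# it can also mean a job's resources were cleaned up.
FINISHED = ("SUCCEEDED", "FAILED")


def wait_for_batch(batch, total, poll_interval=30.0, timeout=None):
    """Print status_counts() until `total` jobs are SUCCEEDED or FAILED.

    Returns the last counts dict; raises TimeoutError after `timeout` seconds.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        counts = batch.status_counts()
        done = sum(counts.get(status, 0) for status in FINISHED)
        print(f"{done}/{total} finished: {counts}")
        if done >= total:
            return counts
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"only {done}/{total} jobs finished in time")
        time.sleep(poll_interval)


# Usage with a real batch (three inputs, as in the first example):
# wait_for_batch(batch, total=3, poll_interval=60, timeout=3600)
```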

Collecting results

results()

results() is the simplest way to collect all results. By default it blocks until every job finishes and returns results in input order.

# Input order (default)
losses = batch.results()
# losses[0] corresponds to inputs[0], losses[1] to inputs[1], etc.

For faster access to early finishers, use ordered=False to collect in completion order:

losses = batch.results(ordered=False)
# Results appear in the order jobs finish, not input order

Parameters:

  • timeout (float | None, default None): Maximum seconds to wait. Raises TimeoutError if exceeded.

  • ordered (bool, default True): True returns results aligned with inputs. False returns results in the order jobs complete.

  • cleanup (bool, default True): Delete each child’s Kubernetes resources and GCS artifacts after downloading its result. The group manifest is preserved so attach_batch() still works.

  • return_exceptions (bool, default False): When True, failed positions contain the exception object instead of raising BatchError. When False, any failure raises BatchError.

Important

A TimeoutError does not cancel running jobs. They continue executing on the cluster. Call batch.cancel() explicitly if you want to stop them after a timeout.
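A timeout leaves jobs running, so a common pattern is a thin wrapper that cancels the batch on TimeoutError. results_or_cancel() below is a hypothetical helper. It uses only results(timeout=...) and cancel(), as documented on this page.

```python
def results_or_cancel(batch, timeout, **kwargs):
    """Return batch.results(), cancelling the batch if the timeout expires.

    results() raising TimeoutError leaves jobs running on the cluster,
    so cancel them explicitly before re-raising.
    """
    try:
        return batch.results(timeout=timeout, **kwargs)
    except TimeoutError:
        batch.cancel()  # stop every non-terminal job in the batch
        raise
```

For example, losses = results_or_cancel(batch, timeout=3600) either returns all results within an hour or stops the remaining jobs and re-raises the TimeoutError.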

as_completed()

For processing results incrementally as jobs finish, use the as_completed() iterator. It yields JobHandle objects in completion order.

for job in batch.as_completed():
    result = job.result()
    print(f"{job.job_id} finished: {result}")

as_completed() streams results even while submission is still in progress. With bounded concurrency, you can start processing the first results before the last inputs have been submitted.

Parameters:

  • poll_interval (float, default 5.0): Seconds between status polls.

  • timeout (float | None, default None): Maximum seconds to wait. Raises TimeoutError if exceeded.
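Completion order loses the link back to the inputs. However, each child JobHandle carries a group_index (see Group ID under How it works), which gives its position in the input list. collect_by_input() below is a hypothetical sketch that uses group_index to key streamed results by input position. It assumes that a failed job raises from job.result(). When that happens, the sketch records the exception and keeps iterating instead of stopping.

```python
def collect_by_input(batch, inputs, timeout=None):
    """Stream results in completion order, keyed by input position.

    Returns {group_index: result or exception}.
    """
    out = {}
    for job in batch.as_completed(timeout=timeout):
        idx = job.group_index  # this job's position in `inputs`
        try:
            out[idx] = job.result()
        except Exception as exc:  # assumed: a failed job raises here
            out[idx] = exc
        print(f"input {inputs[idx]!r} -> {out[idx]!r}")
    return out
```

Once every job has finished, [out[i] for i in sorted(out)] restores input order.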

Handling failures

When any job fails and return_exceptions=False (the default), results() raises a BatchError.

try:
    results = batch.results()
except kinetic.BatchError as e:
    print(f"Batch {e.group_id}: {len(e.failures)} of {len(e.partial_results)} jobs failed")
    for job in e.failures:
        print(f"  {job.job_id}: {job.status().value}")
    # e.partial_results has results at successful positions, None at failed ones

BatchError provides three attributes:

  • group_id: The batch identifier.

  • failures: List of JobHandle objects for the failed jobs.

  • partial_results: A list aligned with inputs where successful positions contain the result and failed positions contain None.

Tolerating failures

Use return_exceptions=True to collect results without raising. Failed positions contain the exception object.

results = batch.results(return_exceptions=True)
for i, r in enumerate(results):
    if isinstance(r, Exception):
        print(f"Job {i} failed: {r}")
    else:
        print(f"Job {i}: {r}")
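With return_exceptions=True and the default ordered=True, results[i] belongs to inputs[i], so a zip is enough to separate successes from failures. split_results() is a hypothetical helper for illustration.

```python
def split_results(inputs, results):
    """Split return_exceptions=True output into (successes, failures).

    Each list holds (input, value) pairs; `results` must be in input order.
    """
    if len(inputs) != len(results):
        raise ValueError("inputs and results must have the same length")
    succeeded, failed = [], []
    for item, value in zip(inputs, results):
        bucket = failed if isinstance(value, Exception) else succeeded
        bucket.append((item, value))
    return succeeded, failed
```

You can then resubmit the failed inputs as a new batch, for example with train.run_async_map([item for item, _ in failed]).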

Inspecting failures

failures() returns handles for jobs with status FAILED. It intentionally excludes NOT_FOUND because that status is ambiguous: a job may be NOT_FOUND because its Kubernetes resources were cleaned up, not because it failed. Use statuses() for finer-grained inspection.

for job in batch.failures():
    print(f"{job.job_id}: {job.tail(n=20)}")

Retries

The retries parameter specifies how many additional attempts a job gets after failure. The total number of attempts per input is 1 + retries.

batch = train.run_async_map(configs, retries=2)
# Each job gets up to 3 attempts (1 initial + 2 retries)

  • Retries are triggered when a job reaches FAILED or NOT_FOUND status.

  • Before each retry, Kinetic cleans up the previous attempt’s Kubernetes resources (GCS artifacts are preserved for debugging).

  • The group manifest tracks the attempt count per job, so attach_batch() can distinguish retries from initial submissions.

  • Submission errors (when the call to the function itself raises) are not retried. These are typically packaging or configuration errors that would fail again.

Note

When retries > 0, job submission runs in a background thread so Kinetic can poll for failures and resubmit.
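The counting rule above (up to 1 + retries attempts, with a retry triggered only by FAILED or NOT_FOUND) can be written as a toy loop. attempt_with_retries() is hypothetical and runs locally. It models only the attempt counting. It does not model Kinetic's resubmission, the cleanup between attempts, or the fact that submission errors are never retried.

```python
RETRYABLE = {"FAILED", "NOT_FOUND"}


def attempt_with_retries(run_once, retries):
    """Toy model: call run_once() up to 1 + retries times.

    run_once() returns a final status string. Returns (status, attempts).
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")  # guard for this sketch
    for attempt in range(1, retries + 2):  # 1 initial attempt + `retries` more
        status = run_once()
        if status not in RETRYABLE:
            break  # e.g. SUCCEEDED: no further attempts
    return status, attempt
```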

Concurrency control

By default, run_async_map() limits the number of concurrently active jobs to 64. Use max_concurrent to tune this.

# At most 8 jobs running at once
batch = train.run_async_map(configs, max_concurrent=8)

# Submit all jobs immediately (no concurrency limit)
batch = train.run_async_map(configs, max_concurrent=None)

  • Default: 64. New jobs are launched as running ones finish.

  • None: All inputs are submitted immediately with no concurrency limit. When combined with retries=0 (the default), submission happens synchronously in the calling thread before run_async_map() returns.

  • Must be a positive integer when set. Passing 0 or a negative value raises ValueError.

Note

Kinetic logs a warning when submitting more than 100 jobs with max_concurrent=None, suggesting you set a limit to control resource usage.
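The sliding-window behavior of max_concurrent, where a new job starts as soon as a running one finishes, is the same idea as a bounded worker pool. The sketch below models it locally with concurrent.futures.ThreadPoolExecutor. run_bounded() is a hypothetical name, and the local function calls stand in for remote jobs. The sketch also reports the peak number of calls in flight, which shows the limit holding.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_bounded(fn, inputs, max_concurrent=64):
    """Apply fn to inputs with at most max_concurrent calls in flight.

    max_concurrent=None means no limit. Returns (results_in_input_order, peak).
    """
    if max_concurrent is not None and max_concurrent < 1:
        raise ValueError("max_concurrent must be a positive integer or None")
    workers = max_concurrent if max_concurrent is not None else max(len(inputs), 1)
    lock = threading.Lock()
    active = peak = 0

    def tracked(item):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        try:
            return fn(item)
        finally:
            with lock:
                active -= 1

    # The pool starts a queued call as soon as a worker frees up,
    # which is the sliding window described above.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(tracked, inputs))
    return results, peak
```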

Cancellation and fail-fast

Fail-fast behavior

The fail_fast and cancel_running_on_fail parameters control what happens when a job fails.

| fail_fast       | cancel_running_on_fail | On first failure…                                                             |
|-----------------|------------------------|-------------------------------------------------------------------------------|
| False (default) | False (default)        | All remaining jobs continue. Failures are collected at the end.              |
| True            | False                  | No new jobs are launched. Already-running jobs continue to completion.       |
| True            | True                   | No new jobs are launched. All running siblings are cancelled immediately.    |
| False           | True                   | No effect. cancel_running_on_fail only takes effect when fail_fast=True.     |

# Stop the batch as soon as any job fails, cancel all running siblings
batch = train.run_async_map(
    configs,
    fail_fast=True,
    cancel_running_on_fail=True,
)

A “failure” here means either a submission error (the call raised) or a runtime failure (the remote job reached FAILED or NOT_FOUND status after exhausting retries).
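The four rows of the table reduce to two decisions made at the first failure: whether to keep launching new jobs, and whether to cancel running ones. on_first_failure() is a hypothetical helper that restates the table in code. It is not part of Kinetic's API.

```python
def on_first_failure(fail_fast=False, cancel_running_on_fail=False):
    """Restate the fail-fast table: (launch_new_jobs, cancel_running_jobs)."""
    if not fail_fast:
        # Default behavior; cancel_running_on_fail is ignored here.
        return True, False
    return False, cancel_running_on_fail


print(on_first_failure(True, True))   # (False, True)
print(on_first_failure(False, True))  # (True, False): no effect without fail_fast
```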

Manual cancellation

You can cancel all non-terminal jobs at any time, independent of the fail_fast setting:

batch.cancel()

Cancellation deletes each job’s Kubernetes resources but preserves GCS artifacts for debugging.

Reattaching to a batch

If your local process exits or you want to check on a batch from a different machine, save the group_id and reattach later.

# Original session
batch = train.run_async_map(configs)
print(f"Batch ID: {batch.group_id}")  # e.g., "grp-a1b2c3d4"

# Later, from any machine with access to the same GCP project
batch = kinetic.attach_batch("grp-a1b2c3d4")
results = batch.results()

attach_batch() downloads the group manifest from GCS and reconstructs a JobHandle for each child. Index alignment is preserved: if the original batch had 10 inputs and only 7 were submitted before a crash, the returned batch.jobs list has 10 entries with None in the 3 unsubmitted slots.

Note

Kinetic logs a warning when a reattached batch has fewer children than expected, indicating partial submission.

Parameters:

  • group_id (str): The batch identifier (e.g., "grp-a1b2c3d4").

  • project (str | None, default None): GCP project. Uses the default when None.

  • cluster (str | None, default None): GKE cluster name. Uses the default when None.
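Unsubmitted slots come back as None, so you can find the missing inputs and resubmit them. This works only if you still have the original input list, because the manifest fields listed under How it works do not include the inputs. unsubmitted_inputs() below is a hypothetical helper. A slot is also None when that input's submission raised (see Submission errors), so the returned list covers both cases.

```python
def unsubmitted_inputs(batch, inputs):
    """Return (index, input) pairs whose slot in batch.jobs is None.

    `inputs` must be the same list originally passed to run_async_map().
    """
    if len(batch.jobs) != len(inputs):
        raise ValueError("inputs do not match the batch size")
    return [
        (i, item)
        for i, (job, item) in enumerate(zip(batch.jobs, inputs))
        if job is None
    ]
```

For example, missing = unsubmitted_inputs(batch, configs) followed by train.run_async_map([item for _, item in missing]) submits the missing inputs as a new batch with its own group_id.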

Cleanup

There are two ways to clean up resources after a batch completes.

Automatic cleanup via results()

By default, results(cleanup=True) deletes each child’s Kubernetes resources and GCS artifacts after downloading its result. The group manifest is preserved, so attach_batch() still works.

# Each child is cleaned up as its result is downloaded
results = batch.results()  # cleanup=True is the default

Full teardown

To delete everything, including all children’s resources and the group manifest itself, call cleanup() on the handle:

batch.cleanup(k8s=True, gcs=True)

Parameters:

  • k8s (bool, default True): Delete Kubernetes resources (Jobs/pods) for each child.

  • gcs (bool, default True): Delete GCS artifacts for each child and the group manifest.

Important

After calling cleanup(gcs=True), the batch can no longer be reattached via attach_batch() because the manifest has been deleted.
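To run a full teardown automatically once the results are safely in hand, you can keep it in a context manager. torn_down() below is a hypothetical helper built on cleanup(). By default it skips teardown when the block raises, so a failed batch stays inspectable and can still be reattached. The sketch assumes that cleanup() is safe to call after results() has already removed each child's resources.

```python
from contextlib import contextmanager


@contextmanager
def torn_down(batch, keep_on_error=True):
    """Yield `batch`, then delete all child resources and the group manifest.

    If the block raises and keep_on_error is True, nothing is deleted,
    so failures() and attach_batch() remain usable for debugging.
    """
    try:
        yield batch
    except BaseException:
        if not keep_on_error:
            batch.cleanup(k8s=True, gcs=True)
        raise
    else:
        batch.cleanup(k8s=True, gcs=True)  # manifest gone: no reattaching after this


# Usage with a real batch:
# with torn_down(train.run_async_map(configs)) as batch:
#     losses = batch.results()
```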

How it works

Threading model

When max_concurrent is set (the default is 64) or retries > 0, run_async_map() launches a non-daemon background thread to manage submissions. The thread polls active jobs for terminal states and launches new ones as concurrency slots free up. The BatchHandle is returned immediately.

When max_concurrent=None and retries=0, all jobs are submitted synchronously in the calling thread before run_async_map() returns. No background thread is created.

Manifest

A JSON manifest is written to GCS before the first job is submitted. It records the batch metadata (group ID, expected total, function name, tags) and is updated after each successful submission with the child’s job ID and attempt count. This enables crash recovery: attach_batch() reads the manifest to determine which jobs were submitted and reconstructs the handle.

Group ID

Each batch gets a unique identifier in the format grp-{8-hex-chars} (e.g., grp-a1b2c3d4). This ID is set on each child JobHandle as group_id, along with group_kind="map" and the child’s group_index.

Submission errors

If a call to the function itself raises (e.g., a packaging or validation error), the exception is captured internally and the corresponding slot in batch.jobs remains None. These errors surface when you call results(). With the default return_exceptions=False, they cause a BatchError, and the affected position in BatchError.partial_results is None. With return_exceptions=True, they appear as exception objects at the affected position.