Working with Data#
kinetic.Data(...) is the API for getting bytes into your remote function.
It accepts a local file or directory path, or a gs:// URI, and resolves
to a plain filesystem path inside the pod. Your function code only sees
paths — never URIs, never Data objects.
That uniformity is the whole point: you write the same training code whether the data started on your laptop, in a GCS bucket, or as a FUSE-mounted dataset too large to fit on disk.
A first example#
import kinetic
from kinetic import Data
@kinetic.run(accelerator="cpu")
def process_data(data_path):
import os
print(f"Reading from: {data_path}")
return sorted(os.listdir(data_path))
# Local directory
process_data(Data("./my_dataset/"))
# GCS directory — trailing slash signals it's a directory
process_data(Data("gs://my-bucket/training-set/"))
Data works as a function argument, as a value inside a list/dict, and as
a value in the volumes={...} decorator argument:
@kinetic.run(
accelerator="tpu-v5e-4",
volumes={"/data": Data("./dataset/")},
)
def train():
import pandas as pd
df = pd.read_csv("/data/train.csv")
return len(df)
Use volumes={...} when your training script has hardcoded absolute
paths it expects to read from. Pass Data(...) as a function argument
when you’d rather receive the path explicitly.
Choosing a data access pattern#
Three patterns cover almost everything:
Downloaded
Data(default) —Data("..."). Kinetic copies the bytes onto the pod’s local disk before your function runs. Reads are fast (local disk), but the pod has to wait for the download to finish.FUSE-mounted
Data—Data("gs://...", fuse=True). The bucket is mounted lazily; only files you actuallyopen()are fetched from GCS. Pod startup is near-instant; per-file reads pay GCS latency.Raw
gs://streaming — your code usestf.io.gfile,gcsfs, or a similar library to talk to GCS directly withoutData(...). This bypasses theDataabstraction entirely; reach for it only when you have a specific reason to.
Decision table:
Dataset size |
Access pattern |
Use |
|---|---|---|
Small (<10 GB) |
Read most/all files |
|
Small (<10 GB) |
Random access |
|
Medium (10–100 GB) |
Streaming once-through |
|
Medium (10–100 GB) |
Random access many epochs |
|
Large (>100 GB) |
Streaming, sparse subset |
|
Large (>100 GB) |
Need indexed shards |
|
Already in GCS |
Any size |
|
Tip
Recommended defaults:
For small or medium datasets you read every epoch, use plain
Data(...). The download cost is paid once at pod startup; subsequent reads are local-disk fast.For datasets that are too large to fit on the pod’s disk, or where you only touch a fraction of the files, use
Data("gs://...", fuse=True).Wrap GCS data in
Data(...)even when it is already in GCS so your function uses the same path-based API regardless of source. Note that Kinetic’s content-hash-based upload caching applies only to local data; GCS-hostedDatais passed through by URI without rehashing or re-uploading.
FUSE mounting#
fuse=True mounts the data through the
GCS FUSE CSI driver
instead of downloading it. Your function still receives a filesystem
path; reads stream on demand from GCS.
@kinetic.run(
accelerator="tpu-v5e-4",
volumes={"/data": Data("gs://my-bucket/imagenet/", fuse=True)},
)
def train():
# Only files you open() are fetched from GCS
...
FUSE works with both volumes={...} and function arguments, with both
local paths and GCS URIs. Single files work transparently — the pod sees
a file path, not a directory:
@kinetic.run(accelerator="cpu")
def read_config(config_path):
with open(config_path) as f:
return json.load(f)
read_config(Data("./config.json", fuse=True))
You can mix FUSE-mounted and downloaded data in the same job:
@kinetic.run(
accelerator="tpu-v5e-4",
volumes={
"/data": Data("gs://my-bucket/large-dataset/", fuse=True),
"/config": Data("./small-config/"),
},
)
def train(extra_data):
...
train(Data("./labels.csv")) # downloaded function-argument data
Prerequisites: FUSE mounting needs the GCS FUSE CSI driver addon on
the GKE cluster. kinetic up enables it by default.
For a runnable end-to-end walkthrough covering volume mounts, single
files, multiple FUSE volumes, and mixed FUSE/downloaded data in the same
job, see
examples/example_fuse.py.
How it caches#
Local data is content-addressed: identical bytes upload only once, regardless of how many jobs reference them. SHA-256 of the contents becomes the cache key, and re-runs with unchanged data skip the upload entirely.
This also means files inside your project root that you wrap in
Data(...) are automatically excluded from the per-job context.zip
payload — no redundant upload of the same bytes.
Appendix: implementation internals#
The rest of this page is for contributors and people debugging data-related issues. End users do not need to read it.
Data reference serialization#
Data objects can’t be sent directly to the remote pod. During
_prepare_artifacts(), each Data is uploaded to GCS and replaced with
a serializable __data_ref__ dict:
{
"__data_ref__": True,
"gcs_uri": "gs://bucket/namespace/data-cache/abc123",
"is_dir": True,
"mount_path": "/data", # None for function-argument Data
"fuse": False, # True when fuse=True was passed
}
On the remote pod, resolve_data_refs() in remote_runner.py walks the
deserialized args/kwargs recursively and replaces these dicts with local
filesystem paths.
Upload and caching pipeline#
Local data is uploaded to gs://{bucket}/{namespace}/data-cache/{hash}/,
where {hash} is a SHA-256 computed over sorted file contents. The flow:
Compute content hash (deterministic: sorted DFS order, per-file SHA-256, then combined).
Check for a sentinel blob at
{namespace}/data-markers/{hash}— if present, skip upload.Upload files preserving directory structure under the hash prefix.
Write the sentinel blob last to signal upload-complete.
For single files, the blob is stored at {hash}/{filename}. For
directories, the full tree is preserved under {hash}/. The returned
GCS URI always points to the hash prefix directory, not individual files.
FUSE mount implementation#
GCS FUSE can only mount directories, not individual files. The system handles this through several layers:
Volume spec construction (execution.py): for fuse=True Data, a
FUSE volume spec is built with gcs_uri, mount_path, is_dir, and
read_only. Specs live on ctx.fuse_volume_specs and pass through to
the backend.
URI adjustment for uploaded single files: upload_data() returns a
directory-level URI (gs://bucket/ns/data-cache/{hash}) since the hash
prefix is a directory. For FUSE single-file mounts, _fuse_gcs_uri()
appends the original filename (gs://bucket/ns/data-cache/{hash}/config.json)
so the only-dir mount option scopes to the hash directory rather than
the entire data-cache/ tree. The data ref retains the directory-level
URI for download compatibility.
K8s volume generation: each spec becomes an inline ephemeral CSI
volume. The only-dir mount option scopes the mount to a specific GCS
prefix. For single files (is_dir=False), the parent directory is
mounted. The pod receives a gke-gcsfuse/volumes: "true" annotation to
trigger the GCS FUSE sidecar injection.