Python recipes
Copy-paste workflows that chain real operations together using the fluent Geopera client, with the low-level free-function form shown once as an escape hatch.
These recipes assume you have already installed and configured the client. If you have not, start with the Python SDK overview. The recommended way to call any operation is the fluent client: client.<resource>.<action>(body), where each operation id maps to a resource path — catalog.search becomes client.catalog.search(...) and orders.archive.place becomes client.orders.archive.place(...). The body is a plain dict or the typed input model from geopera.models; the result is the parsed, typed output. The full operation catalogue is in the operation reference. Every Geopera operation is a POST that carries its arguments in the JSON body — there are no path-or-query “GET routes” to learn, so once you know the operation id you know the call.
Setup
Install the package and construct one client you can reuse across every recipe on this page:
pip install geoperafrom geopera import Geopera
# Reuse a single client across all the recipes below.
client = Geopera(token="gpra_...") # a Geopera API key, or a session tokenThe token is a Bearer credential: either a minted API key (prefix gpra_) or a session token obtained from a browser sign-in. Both are sent verbatim as Authorization: Bearer <token> on every call — the API key is the bearer token, so there is no token-exchange, OIDC, or refresh step to perform. The default base_url is https://api.geopera.com; pass base_url= if you target a staging environment. Avoid hard-coding the token: read it from the environment instead.
import os
from geopera import Geopera
client = Geopera(token=os.environ["GEOPERA_API_TOKEN"])Every fluent call returns the parsed, typed output model and raises a typed error on failure — see error handling below and the errors guide for the full problem schema. If you need the underlying generated client (status codes, raw content, or the sync/asyncio free functions), it is available as client.client; the low-level escape hatch recipe shows that form.
Recipe: catalog search
Discover archive imagery over an area and time window. catalog.search returns a typed CatalogSearchOutput. host_name is required — it names the STAC host you are querying; everything else narrows the search.
from geopera import Geopera
client = Geopera(token="gpra_...")
result = client.catalog.search({
"host_name": "earthsearch-aws",
"collections": ["sentinel-2-l2a"],
"bbox": [151.18, -33.92, 151.30, -33.84], # west, south, east, north
"datetime_": "2024-01-01T00:00:00Z/2024-03-31T23:59:59Z",
"query": {"eo:cloud_cover": {"lt": 20}}, # STAC query extension
"limit": 25,
})
for item in result.features:
print(item.id, item.properties)Key fields:
host_name(required) — the catalog host to query. Discover available hosts withcatalog.hosts.listand their collections withcatalog.collections.list.collections— a list of collection ids; repeatable and additive.bbox— a[west, south, east, north]bounding box in WGS84 degrees. Useintersects(a GeoJSON geometry) instead when you need a non-rectangular AOI; do not send both.datetime_— a single RFC 3339 instant or astart/endinterval. Note the trailing underscore:datetimeis reserved in Python, so the model field isdatetime_and it serializes todatetimeon the wire.query— the STAC query extension, used here to keep only scenes under 20% cloud cover.limit— page size; defaults to100if omitted.
Paging through every result
A single response is one page. To walk the full result set, carry the next_ token from each response into the next request and stop when it is absent:
from geopera import Geopera
client = Geopera(token="gpra_...")
body = {
"host_name": "earthsearch-aws",
"collections": ["sentinel-2-l2a"],
"bbox": [151.18, -33.92, 151.30, -33.84],
"limit": 100,
}
all_items = []
while True:
page = client.catalog.search(body)
all_items.extend(page.features)
if not page.next_: # no more pages
break
body = {**body, "next_": page.next_}
print(f"collected {len(all_items)} items")For very large result sets prefer the streaming variant, catalog.search_stream, which yields features without buffering whole pages in memory. The full discovery workflow is in the catalog discovery guide, and paging rules are in pagination.
Recipe: a tasking estimate
Before you commit to a satellite tasking order, preview the server-authoritative price. orders.tasking.estimate takes a body whose groups describe the AOIs and constraints you want priced, and returns a typed TaskingEstimateOutput.
from geopera import Geopera
client = Geopera(token="gpra_...")
# Each group is an open object: set the fields your tasking request needs.
group = {
"sensor": "optical-vhr",
"geometry": {
"type": "Polygon",
"coordinates": [[
[151.18, -33.92], [151.30, -33.92],
[151.30, -33.84], [151.18, -33.84],
[151.18, -33.92],
]],
},
"window": {
"start": "2024-06-01T00:00:00Z",
"end": "2024-06-30T23:59:59Z",
},
"max_cloud_cover": 15,
"off_nadir_max": 30,
}
estimate = client.orders.tasking.estimate({"groups": [group]})
print("estimated price:", estimate) # TaskingEstimateOutput (typed)Each group is an open object — the exact fields depend on the sensor and constraints, so pass them as a plain dict. You can price several AOIs in one call by adding more entries to groups. The returned price is authoritative: place the order with the same groups to lock it in, because re-pricing at placement time can shift if you alter the constraints. For the end-to-end tasking lifecycle (feasibility, quotations, placement), see the tasking guide.
Recipe: place an archive order with an Idempotency-Key
Placing an order spends credits, so make it retry-safe. orders.archive.place accepts an open body and returns 202 Accepted — the order is queued for fulfilment. Send an Idempotency-Key so a network retry never double-charges you.
import uuid
from geopera import Geopera
client = Geopera(token="gpra_...")
# One stable key per logical order — reuse it verbatim on every retry.
idem_key = str(uuid.uuid4())
order = client.orders.archive.place(
{
"collection": "sentinel-2-l2a",
"item_ids": ["S2A_MSIL2A_20240115T000241_R030_T56HKH_20240115T012345"],
"delivery": {"format": "geotiff"},
},
idempotency_key=idem_key,
)
print("order accepted:", order) # queued for fulfilmentThe idempotency_key argument sets the Idempotency-Key header. The rules:
- Generate the key before the first attempt and persist it, so a crash-and-retry reuses the same key rather than minting a fresh one.
- Replaying the exact same request with the same key returns the original outcome instead of creating a second order. A different key is always treated as a new order.
- Sending the same key with a different body is a conflict, not a silent overwrite — the server rejects it. Keep the key bound to one immutable request.
Because placement is asynchronous, the 202 response gives you an order handle, not a finished order. Track it to completion with orders.get (pass the returned order id) and list recent orders with orders.list. The full idempotency contract is in the idempotency guide, and delivery options and order tracking are covered in the ordering guide.
Place and poll the order to fulfilment
import time
import uuid
from geopera import Geopera
client = Geopera(token="gpra_...")
TERMINAL = {"fulfilled", "failed", "cancelled"}
placed = client.orders.archive.place(
{
"collection": "sentinel-2-l2a",
"item_ids": ["S2A_MSIL2A_20240115T000241_R030_T56HKH_20240115T012345"],
"delivery": {"format": "geotiff"},
},
idempotency_key=str(uuid.uuid4()),
)
order_id = placed["id"]
print("placed:", order_id)
while True:
order = client.orders.get({"order_id": order_id})
status = order["status"]
print("status:", status)
if status in TERMINAL:
break
time.sleep(10)
print("final:", order)If you would rather be notified than poll, attach an event subscription to order.fulfilled and let Geopera call your endpoint.
Recipe: run a processing job and poll for completion
Kick off a server-side processing job, then poll until it finishes. processing.create returns the new job; processing.job.get takes a job_id and returns the current job state, which you poll.
import time
from geopera import Geopera
client = Geopera(token="gpra_...")
TERMINAL = {"succeeded", "failed", "cancelled"}
created = client.processing.create({
"job_type": "ndvi",
"inputs": {
"collection": "sentinel-2-l2a",
"item_ids": ["S2A_MSIL2A_20240115T000241_R030_T56HKH_20240115T012345"],
},
})
job_id = created["id"]
print("job created:", job_id)
while True:
job = client.processing.job.get({"job_id": job_id})
status = job["status"]
print("status:", status)
if status in TERMINAL:
break
time.sleep(5)
if job["status"] != "succeeded":
raise RuntimeError(f"job {job_id} ended in {job['status']}: {job.get('error')}")
print("done:", job)processing.create returns a JSON object, so read fields by key (created["id"]). Poll on a fixed interval and stop once status reaches a terminal value, then branch on whether it succeeded. Discover available job_type values with processing.job_types.list, and browse ready-made pipelines with processing.catalog.list. The full pipeline — including registering outputs and listing a project’s jobs — is in the processing guide.
Polling with a bounded backoff
For long jobs, cap the wait and back off so you are not hammering the API. This also respects rate limits:
import itertools
import time
from geopera import Geopera
client = Geopera(token="gpra_...")
TERMINAL = {"succeeded", "failed", "cancelled"}
DEADLINE = time.monotonic() + 1800 # give up after 30 minutes
job_id = client.processing.create({"job_type": "ndvi", "inputs": {...}})["id"]
# 2s, 4s, 8s, 16s, then steady at 30s.
delays = itertools.chain([2, 4, 8, 16], itertools.repeat(30))
for delay in delays:
job = client.processing.job.get({"job_id": job_id})
if job["status"] in TERMINAL:
break
if time.monotonic() > DEADLINE:
raise TimeoutError(f"job {job_id} still {job['status']} after 30 min")
time.sleep(delay)
print("final status:", job["status"])Recipe: top up billing credits
Add credits to your balance with an off-session card charge. billing.topup accepts an open body, returns the created top-up, and — like order placement — accepts an Idempotency-Key so a retry never charges the card twice.
import uuid
from geopera import Geopera
client = Geopera(token="gpra_...")
topup = client.billing.topup(
{
"amount": 5000, # smallest currency unit (e.g. cents)
"currency": "usd",
},
idempotency_key=str(uuid.uuid4()),
)
print("topped up:", topup)amount is in the smallest currency unit, so 5000 with currency: "usd" is 50.00 USD — never send a floating-point dollar figure. As with order placement, persist the key before the first attempt so a retry reuses it; replaying the same key returns the original top-up rather than charging again.
Check the resulting balance with billing.credits.balance, review movements with billing.credits.transactions, and configure automatic top-ups (so you never hit a zero balance mid-order) with billing.set_auto_topup:
from geopera import Geopera
client = Geopera(token="gpra_...")
# Top up automatically whenever the balance drops below the threshold.
client.billing.set_auto_topup({
"enabled": True,
"threshold": 1000, # trigger at 10.00 USD remaining
"amount": 5000, # add 50.00 USD
"currency": "usd",
})
balance = client.billing.credits.balance({})
print("balance:", balance)The billing guide covers credits, invoices, and approvals end to end.
Recipe: create an event subscription
Get notified when something happens instead of polling. event_subscriptions.create takes an event_type to listen for and an endpoint_url to receive deliveries, and returns a typed SubscriptionOutput.
from geopera import Geopera
client = Geopera(token="gpra_...")
sub = client.event_subscriptions.create({
"event_type": "order.fulfilled",
"endpoint_url": "https://example.com/hooks/geopera",
"description": "Notify our pipeline when archive orders complete",
"filter_config": {"collection": "sentinel-2-l2a"},
"headers": {"X-Pipeline-Token": "shared-secret"},
})
print("subscribed:", sub) # SubscriptionOutputThe body fields:
event_type(required) — the event to listen for, e.g.order.fulfilledorprocessing.job.succeeded.endpoint_url(required) — the HTTPS URL Geopera POSTs each delivery to.description— a human-readable label that shows up when you list subscriptions.filter_config— narrow which events fire, so you only receive the deliveries you care about.headers— custom headers Geopera includes on every delivery (handy for routing or a shared secret your endpoint checks).
After creating a subscription, fire a sample delivery to confirm your endpoint is reachable, and list your subscriptions to confirm it registered:
from geopera import Geopera
client = Geopera(token="gpra_...")
sub = client.event_subscriptions.create({
"event_type": "order.fulfilled",
"endpoint_url": "https://example.com/hooks/geopera",
})
sub_id = sub.id
# Send a sample delivery so you can verify signature handling end to end.
client.event_subscriptions.test({"subscription_id": sub_id})
for s in client.event_subscriptions.list({}).subscriptions:
print(s.id, s.event_type, s.endpoint_url)The webhooks guide covers delivery signing, retries, and verification; the alerts and notifications guide covers the wider notification surface.
Putting it together: an end-to-end pipeline
The recipes above compose. This single script searches the catalog, prices and places an order, waits for fulfilment via a subscription instead of polling, then runs a processing job over the delivered scene — each step retry-safe where it spends money.
import time
import uuid
from geopera import Geopera
client = Geopera(token="gpra_...")
# 1. Discover a low-cloud scene over the AOI.
search = client.catalog.search({
"host_name": "earthsearch-aws",
"collections": ["sentinel-2-l2a"],
"bbox": [151.18, -33.92, 151.30, -33.84],
"query": {"eo:cloud_cover": {"lt": 10}},
"limit": 1,
})
if not search.features:
raise SystemExit("no scenes matched the AOI")
item_id = search.features[0].id
# 2. Make sure we have credits to spend.
client.billing.topup(
{"amount": 5000, "currency": "usd"},
idempotency_key=str(uuid.uuid4()),
)
# 3. Place the order, retry-safe.
order = client.orders.archive.place(
{
"collection": "sentinel-2-l2a",
"item_ids": [item_id],
"delivery": {"format": "geotiff"},
},
idempotency_key=str(uuid.uuid4()),
)
order_id = order["id"]
# 4. Wait for fulfilment.
while client.orders.get({"order_id": order_id})["status"] not in {
"fulfilled", "failed", "cancelled",
}:
time.sleep(10)
# 5. Process the delivered scene.
job = client.processing.create({
"job_type": "ndvi",
"inputs": {"collection": "sentinel-2-l2a", "item_ids": [item_id]},
})
print("pipeline kicked off processing job:", job["id"])Low-level escape hatch
The fluent client covers every operation in the snapshot. When you need to call an operation by free function — for scripting, full control over the four entry points (sync, sync_detailed, asyncio, asyncio_detailed), or an operation not yet in the fluent surface — drop down to the generated AuthenticatedClient and the per-operation modules under geopera.api.operations. Each operation id maps to a module (dots become underscores — catalog.search becomes catalog_search, orders.tasking.estimate becomes orders_tasking_estimate), and its input/output models live in geopera.models.
from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput
client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")
result = catalog_search.sync(
client=client,
body=CatalogSearchInput(
host_name="earthsearch-aws",
collections=["sentinel-2-l2a"],
limit=10,
),
)
print(result) # CatalogSearchOutput (typed)The four entry points let you choose blocking versus async and parsed versus raw:
sync(client=, body=)— blocking; returns the parsed model (orNoneon an unexpected status).sync_detailed(client=, body=)— blocking; returns a response object exposingstatus_code,parsed,headers, and rawcontent.asyncio(client=, body=)—awaitable; parsed model.asyncio_detailed(client=, body=)—awaitable; response object.
import asyncio
from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput
async def main():
async with AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...") as client:
resp = await catalog_search.asyncio_detailed(
client=client,
body=CatalogSearchInput(host_name="earthsearch-aws", limit=5),
)
print(resp.status_code, resp.parsed)
asyncio.run(main())The fluent Geopera client wraps exactly this machinery; its underlying AuthenticatedClient is reachable as client.client if you want to mix both forms in one program — for example, using the fluent surface everywhere but dropping to sync_detailed on one call where you need the raw status code.
Error handling
Every recipe above can fail with a typed error. The fluent client raises a typed exception carrying the parsed error — a Problem (RFC 9457 problem+json, returned for business and permission errors) or an HTTPValidationError (a 422 input-validation failure):
from geopera import Geopera
from geopera.errors import GeoperaError
from geopera.models import Problem, HTTPValidationError
client = Geopera(token="gpra_...")
try:
data = client.catalog.search({"host_name": "earthsearch-aws", "limit": 5})
except GeoperaError as exc:
if isinstance(exc.error, Problem):
print("geopera error:", exc.error.title, exc.error.detail)
elif isinstance(exc.error, HTTPValidationError):
print("invalid input:", exc.error.detail)
else:
raiseCommon cases to branch on:
401 Unauthorized— the token is missing, malformed, or expired. Check that you passed agpra_API key or a current session token.403 Forbidden— the token is valid but lacks the scope the operation requires. Mint a key with the right scope in the portal.409 Conflict— usually anIdempotency-Keyreused with a different body; reuse the original body or pick a new key.422 Unprocessable Entity— input validation;HTTPValidationError.detaillists which fields failed.429 Too Many Requests— back off and retry; see rate limits.
If you need the raw HTTP status code instead of catching an exception, use the *_detailed free functions shown in the low-level escape hatch — they return a response object exposing status_code, parsed, and raw content, and never raise on a typed error status:
from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput, Problem
client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")
resp = catalog_search.sync_detailed(
client=client, body=CatalogSearchInput(host_name="earthsearch-aws", limit=5)
)
if resp.status_code == 200:
data = resp.parsed # CatalogSearchOutput
elif isinstance(resp.parsed, Problem):
print("geopera error:", resp.parsed.title, resp.parsed.detail)For the complete Problem field reference and status-code semantics, see the errors guide.