<!-- Source: https://docs.geopera.com/api-reference/sdks/python/recipes · Markdown for LLMs -->

# Python recipes

Copy-paste workflows that chain real operations together using the fluent `Geopera` client, with the low-level free-function form shown once as an escape hatch.

These recipes assume you have already installed and configured the client. If you have not, start with the [Python SDK overview](/api-reference/sdks/python). The recommended way to call any operation is the fluent client: `client.<resource>.<action>(body)`, where each operation id maps to a resource path — `catalog.search` becomes `client.catalog.search(...)` and `orders.archive.place` becomes `client.orders.archive.place(...)`. The `body` is a plain dict or the typed input model from `geopera.models`; the result is the parsed, typed output. The full operation catalogue is in the [operation reference](/api-reference/sdks/python/reference). Every Geopera operation is a `POST` that carries its arguments in the JSON body — there are no path-or-query "GET routes" to learn, so once you know the operation id you know the call.
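
To make the id-to-attribute mapping concrete, here is a minimal sketch that resolves a dotted operation id by walking attributes. `resolve_operation` is a hypothetical helper, not part of the SDK, and the stand-in client below only imitates the shape of the fluent surface so the snippet runs without credentials.

```python
from functools import reduce
from types import SimpleNamespace


def resolve_operation(client, operation_id):
    """Walk a dotted operation id as successive attribute lookups on the client."""
    return reduce(getattr, operation_id.split("."), client)


# Stand-in object shaped like the fluent client, used only for illustration.
fake_client = SimpleNamespace(
    orders=SimpleNamespace(
        archive=SimpleNamespace(place=lambda body: {"echo": body}),
    ),
)

place = resolve_operation(fake_client, "orders.archive.place")
result = place({"collection": "sentinel-2-l2a"})
print(result)
```

With a real client, `resolve_operation(client, "catalog.search")` would be expected to return the same bound method as `client.catalog.search`, assuming the mapping is plain attribute access as described above.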

## Setup

Install the package and construct one client you can reuse across every recipe on this page:

```bash
pip install geopera
```

```python
from geopera import Geopera

# Reuse a single client across all the recipes below.
client = Geopera(token="gpra_...")  # a Geopera API key, or a session token
```

The `token` is a [Bearer credential](/api-reference/authentication): either a minted API key (prefix `gpra_`) or a session token obtained from a browser sign-in. Both are sent verbatim as `Authorization: Bearer <token>` on every call — the API key _is_ the bearer token, so there is no token-exchange, OIDC, or refresh step to perform. The default `base_url` is `https://api.geopera.com`; pass `base_url=` if you target a staging environment. Avoid hard-coding the token: read it from the environment instead.

```python
import os

from geopera import Geopera

client = Geopera(token=os.environ["GEOPERA_API_TOKEN"])
```

Every fluent call returns the parsed, typed output model and raises a typed error on failure — see [error handling](#error-handling) below and the [errors guide](/api-reference/errors) for the full problem schema. If you need the underlying generated client (status codes, raw content, or the `sync`/`asyncio` free functions), it is available as `client.client`; the [low-level escape hatch](#low-level-escape-hatch) recipe shows that form.

## Recipe: catalog search

Discover archive imagery over an area and time window. `catalog.search` returns a typed `CatalogSearchOutput`. `host_name` is required — it names the STAC host you are querying; everything else narrows the search.

```python
from geopera import Geopera

client = Geopera(token="gpra_...")

result = client.catalog.search({
    "host_name": "earthsearch-aws",
    "collections": ["sentinel-2-l2a"],
    "bbox": [151.18, -33.92, 151.30, -33.84],   # west, south, east, north
    "datetime_": "2024-01-01T00:00:00Z/2024-03-31T23:59:59Z",
    "query": {"eo:cloud_cover": {"lt": 20}},     # STAC query extension
    "limit": 25,
})

for item in result.features:
    print(item.id, item.properties)
```

Key fields:

- `host_name` (required) — the catalog host to query. Discover available hosts with `catalog.hosts.list` and their collections with `catalog.collections.list`.
- `collections` — a list of collection ids; results are drawn from every collection you list.
- `bbox` — a `[west, south, east, north]` bounding box in WGS84 degrees. Use `intersects` (a GeoJSON geometry) instead when you need a non-rectangular AOI; do not send both.
- `datetime_` — a single RFC 3339 instant or a `start/end` interval. Note the trailing underscore: `datetime` is not a reserved keyword in Python, but the name clashes with the standard-library `datetime` module, so the model field is `datetime_` and it serializes to `datetime` on the wire.
- `query` — the STAC query extension, used here to keep only scenes under 20% cloud cover.
- `limit` — page size; defaults to `100` if omitted.
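
If you assemble search bodies in several places, a small builder can enforce the rules in this list before the request leaves your process. This is a sketch only: `build_search_body` is a hypothetical helper, and the coordinate range checks are an assumption about what a WGS84 bounding box should satisfy, not documented server behaviour.

```python
def build_search_body(host_name, *, bbox=None, intersects=None, **filters):
    """Assemble a catalog.search body, enforcing the field rules listed above."""
    if not host_name:
        raise ValueError("host_name is required")
    if bbox is not None and intersects is not None:
        raise ValueError("send either bbox or intersects, not both")

    body = {"host_name": host_name}
    if bbox is not None:
        west, south, east, north = bbox
        # west may exceed east when a box crosses the antimeridian, so only
        # the ranges are checked, not the ordering of the longitudes.
        if not (-180 <= west <= 180 and -180 <= east <= 180):
            raise ValueError("longitudes must be within [-180, 180]")
        if not (-90 <= south <= north <= 90):
            raise ValueError("latitudes must satisfy -90 <= south <= north <= 90")
        body["bbox"] = [west, south, east, north]
    if intersects is not None:
        body["intersects"] = intersects

    # Leave unset optional filters out so the server applies its defaults.
    body.update({key: value for key, value in filters.items() if value is not None})
    return body


body = build_search_body(
    "earthsearch-aws",
    bbox=[151.18, -33.92, 151.30, -33.84],
    collections=["sentinel-2-l2a"],
    limit=25,
)
print(body)
```

The result is a plain dict, so it can be passed straight to `client.catalog.search(body)`.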

### Paging through every result

A single response is one page. To walk the full result set, carry the `next_` token from each response into the next request and stop when it is absent:

```python
from geopera import Geopera

client = Geopera(token="gpra_...")

body = {
    "host_name": "earthsearch-aws",
    "collections": ["sentinel-2-l2a"],
    "bbox": [151.18, -33.92, 151.30, -33.84],
    "limit": 100,
}

all_items = []
while True:
    page = client.catalog.search(body)
    all_items.extend(page.features)
    if not page.next_:        # no more pages
        break
    body = {**body, "next_": page.next_}

print(f"collected {len(all_items)} items")
```

For very large result sets prefer the streaming variant, `catalog.search_stream`, which yields features without buffering whole pages in memory. The full discovery workflow is in the [catalog discovery guide](/api-reference/guides/catalog-discovery), and paging rules are in [pagination](/api-reference/pagination).
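
The paging loop in this section can also be wrapped in a generator so callers iterate over features without handling tokens themselves. The sketch below assumes only what the loop above relies on: a callable that accepts a body dict and returns an object with `features` and `next_`. `iter_features` and the fake search function are hypothetical names used for illustration.

```python
from types import SimpleNamespace


def iter_features(search, body):
    """Yield features page by page, following the `next_` token until it is absent."""
    request = dict(body)  # copy so the caller's dict is never mutated
    while True:
        page = search(request)
        yield from page.features
        if not page.next_:
            return
        request = {**request, "next_": page.next_}


# Two fake pages keyed by the token that requests them.
pages = {
    None: SimpleNamespace(features=["a", "b"], next_="t1"),
    "t1": SimpleNamespace(features=["c"], next_=None),
}


def fake_search(request):
    return pages[request.get("next_")]


items = list(iter_features(fake_search, {"host_name": "earthsearch-aws", "limit": 2}))
print(items)
```

With a real client you would call `iter_features(client.catalog.search, body)` and stop iterating early if you only need the first few matches.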

## Recipe: a tasking estimate

Before you commit to a satellite tasking order, preview the server-authoritative price. `orders.tasking.estimate` takes a body whose `groups` describe the AOIs and constraints you want priced, and returns a typed `TaskingEstimateOutput`.

```python
from geopera import Geopera

client = Geopera(token="gpra_...")

# Each group is an open object: set the fields your tasking request needs.
group = {
    "sensor": "optical-vhr",
    "geometry": {
        "type": "Polygon",
        "coordinates": [[
            [151.18, -33.92], [151.30, -33.92],
            [151.30, -33.84], [151.18, -33.84],
            [151.18, -33.92],
        ]],
    },
    "window": {
        "start": "2024-06-01T00:00:00Z",
        "end": "2024-06-30T23:59:59Z",
    },
    "max_cloud_cover": 15,
    "off_nadir_max": 30,
}

estimate = client.orders.tasking.estimate({"groups": [group]})
print("estimated price:", estimate)  # TaskingEstimateOutput (typed)
```

Each group is an open object: the exact fields depend on the sensor and constraints, so pass them as a plain dict. You can price several AOIs in one call by adding more entries to `groups`. The estimate is computed by the server, so it holds as long as you place the order with the _same_ groups; if you alter the constraints, the order is re-priced at placement time and the figure can shift. For the end-to-end tasking lifecycle (feasibility, quotations, placement), see the [tasking guide](/api-reference/guides/tasking).
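
If your AOI starts life as a bounding box, you can derive the closed GeoJSON ring for a group's `geometry` instead of typing it by hand. `bbox_to_polygon` is a hypothetical helper, sketched here for rectangular AOIs only.

```python
def bbox_to_polygon(west, south, east, north):
    """Return a GeoJSON Polygon whose single ring traces the bounding box."""
    ring = [
        [west, south], [east, south],
        [east, north], [west, north],
        [west, south],  # GeoJSON rings repeat the first position to close
    ]
    return {"type": "Polygon", "coordinates": [ring]}


geometry = bbox_to_polygon(151.18, -33.92, 151.30, -33.84)
print(geometry["coordinates"][0])
```

The ring runs counterclockwise and matches the polygon written out in the estimate example above.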

## Recipe: place an archive order with an Idempotency-Key

Placing an order spends credits, so make it retry-safe. `orders.archive.place` accepts an open body and returns `202 Accepted` — the order is queued for fulfilment. Send an `Idempotency-Key` so a network retry never double-charges you.

```python
import uuid

from geopera import Geopera

client = Geopera(token="gpra_...")

# One stable key per logical order — reuse it verbatim on every retry.
idem_key = str(uuid.uuid4())

order = client.orders.archive.place(
    {
        "collection": "sentinel-2-l2a",
        "item_ids": ["S2A_MSIL2A_20240115T000241_R030_T56HKH_20240115T012345"],
        "delivery": {"format": "geotiff"},
    },
    idempotency_key=idem_key,
)
print("order accepted:", order)  # queued for fulfilment
```

The `idempotency_key` argument sets the `Idempotency-Key` header. The rules:

- **Generate the key before the first attempt and persist it**, so a crash-and-retry reuses the same key rather than minting a fresh one.
- Replaying the exact same request with the same key returns the _original_ outcome instead of creating a second order. A _different_ key is always treated as a new order.
- Sending the same key with a _different_ body is a conflict, not a silent overwrite — the server rejects it. Keep the key bound to one immutable request.
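
One way to follow the first rule is to store the key on disk before the first attempt. The sketch below uses exclusive file creation so that a retry reads the existing key rather than minting a new one. `load_or_create_key` and the file name are hypothetical; a database row keyed by your own order reference would serve the same purpose.

```python
import tempfile
import uuid
from pathlib import Path


def load_or_create_key(path):
    """Return the idempotency key stored at `path`, creating it on first use."""
    path = Path(path)
    try:
        # Mode "x" fails if the file exists, so an earlier key is never overwritten.
        with path.open("x", encoding="utf-8") as handle:
            key = str(uuid.uuid4())
            handle.write(key)
            return key
    except FileExistsError:
        return path.read_text(encoding="utf-8").strip()


key_file = Path(tempfile.mkdtemp()) / "archive-order.idem"
first = load_or_create_key(key_file)
second = load_or_create_key(key_file)  # simulates a retry after a crash
print(first == second)
```

Pass the returned value as `idempotency_key=` on every attempt for that logical order. A production version would also need to handle a file left empty by a crash between creation and write.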

Because placement is asynchronous, the `202` response gives you an order handle, not a finished order. Track it to completion with `orders.get` (pass the returned order id) and list recent orders with `orders.list`. The full idempotency contract is in the [idempotency guide](/api-reference/idempotency), and delivery options and order tracking are covered in the [ordering guide](/api-reference/guides/ordering).

### Place and poll the order to fulfilment

```python
import time
import uuid

from geopera import Geopera

client = Geopera(token="gpra_...")

TERMINAL = {"fulfilled", "failed", "cancelled"}

placed = client.orders.archive.place(
    {
        "collection": "sentinel-2-l2a",
        "item_ids": ["S2A_MSIL2A_20240115T000241_R030_T56HKH_20240115T012345"],
        "delivery": {"format": "geotiff"},
    },
    idempotency_key=str(uuid.uuid4()),
)
order_id = placed["id"]
print("placed:", order_id)

while True:
    order = client.orders.get({"order_id": order_id})
    status = order["status"]
    print("status:", status)
    if status in TERMINAL:
        break
    time.sleep(10)

print("final:", order)
```

If you would rather be notified than poll, attach an [event subscription](#recipe-create-an-event-subscription) to `order.fulfilled` and let Geopera call your endpoint.

## Recipe: run a processing job and poll for completion

Kick off a server-side processing job, then poll until it finishes. `processing.create` returns the new job; `processing.job.get` takes a `job_id` and returns the current job state, which you poll.

```python
import time

from geopera import Geopera

client = Geopera(token="gpra_...")

TERMINAL = {"succeeded", "failed", "cancelled"}

created = client.processing.create({
    "job_type": "ndvi",
    "inputs": {
        "collection": "sentinel-2-l2a",
        "item_ids": ["S2A_MSIL2A_20240115T000241_R030_T56HKH_20240115T012345"],
    },
})
job_id = created["id"]
print("job created:", job_id)

while True:
    job = client.processing.job.get({"job_id": job_id})
    status = job["status"]
    print("status:", status)
    if status in TERMINAL:
        break
    time.sleep(5)

if job["status"] != "succeeded":
    raise RuntimeError(f"job {job_id} ended in {job['status']}: {job.get('error')}")

print("done:", job)
```

`processing.create` returns a JSON object, so read fields by key (`created["id"]`). Poll on a fixed interval and stop once `status` reaches a terminal value, then branch on whether it succeeded. Discover available `job_type` values with `processing.job_types.list`, and browse ready-made pipelines with `processing.catalog.list`. The full pipeline — including registering outputs and listing a project's jobs — is in the [processing guide](/api-reference/guides/processing).

### Polling with a bounded backoff

For long jobs, cap the total wait and back off between polls so you are not hammering the API. Sending fewer requests also helps you stay within [rate limits](/api-reference/rate-limits):

```python
import itertools
import time

from geopera import Geopera

client = Geopera(token="gpra_...")

TERMINAL = {"succeeded", "failed", "cancelled"}
DEADLINE = time.monotonic() + 1800  # give up after 30 minutes

# `{...}` is a placeholder: pass the same `inputs` object as in the previous example.
job_id = client.processing.create({"job_type": "ndvi", "inputs": {...}})["id"]

# 2s, 4s, 8s, 16s, then steady at 30s.
delays = itertools.chain([2, 4, 8, 16], itertools.repeat(30))
for delay in delays:
    job = client.processing.job.get({"job_id": job_id})
    if job["status"] in TERMINAL:
        break
    if time.monotonic() > DEADLINE:
        raise TimeoutError(f"job {job_id} still {job['status']} after 30 min")
    time.sleep(delay)

print("final status:", job["status"])
```
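
The same backoff pattern works for orders and jobs alike, so it can be factored into one function. This is a sketch under stated assumptions: `poll_until_terminal` is a hypothetical helper, `fetch` is any zero-argument callable returning a dict with a `status` key, and the `sleep` and `clock` parameters exist only so the loop can be exercised without waiting.

```python
import itertools
import time


def poll_until_terminal(fetch, terminal, *, delays, timeout,
                        sleep=time.sleep, clock=time.monotonic):
    """Call `fetch()` until its status is terminal or `timeout` seconds elapse."""
    deadline = clock() + timeout
    for delay in delays:
        state = fetch()
        if state["status"] in terminal:
            return state
        # Give up before sleeping if the next wait would pass the deadline.
        if clock() + delay > deadline:
            raise TimeoutError(f"still {state['status']} after {timeout}s")
        sleep(delay)
    raise TimeoutError("delay schedule exhausted before a terminal status")


# Fake job that needs three polls; delays are recorded instead of slept.
states = iter([{"status": "queued"}, {"status": "running"}, {"status": "succeeded"}])
slept = []
final = poll_until_terminal(
    lambda: next(states),
    {"succeeded", "failed", "cancelled"},
    delays=itertools.chain([2, 4, 8, 16], itertools.repeat(30)),
    timeout=1800,
    sleep=slept.append,
)
print(final["status"], slept)
```

With a real client, `fetch` could be `lambda: client.processing.job.get({"job_id": job_id})` or the equivalent `orders.get` call.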

## Recipe: top up billing credits

Add credits to your balance with an off-session card charge. `billing.topup` accepts an open body, returns the created top-up, and — like order placement — accepts an `Idempotency-Key` so a retry never charges the card twice.

```python
import uuid

from geopera import Geopera

client = Geopera(token="gpra_...")

topup = client.billing.topup(
    {
        "amount": 5000,        # smallest currency unit (e.g. cents)
        "currency": "usd",
    },
    idempotency_key=str(uuid.uuid4()),
)
print("topped up:", topup)
```

`amount` is in the smallest currency unit, so `5000` with `currency: "usd"` is 50.00 USD — never send a floating-point dollar figure. As with order placement, persist the key _before_ the first attempt so a retry reuses it; replaying the same key returns the original top-up rather than charging again.
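
If your application holds prices as decimal strings, convert them to minor units with `decimal.Decimal` rather than multiplying floats. `to_minor_units` is a hypothetical helper; the default of two decimal places is an assumption that suits USD and would need adjusting for currencies with a different minor unit.

```python
from decimal import Decimal


def to_minor_units(amount, exponent=2):
    """Convert a decimal currency amount to an integer count of minor units."""
    if isinstance(amount, float):
        # Floats cannot represent most decimal fractions exactly.
        raise TypeError("pass a str, int, or Decimal, not a float")
    scaled = Decimal(amount) * (10 ** exponent)
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{amount} has more than {exponent} decimal places")
    return int(scaled)


amount = to_minor_units("50.00")
print(amount)
```

The returned integer is what you would pass as `"amount"` in the `billing.topup` body.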

Check the resulting balance with `billing.credits.balance`, review movements with `billing.credits.transactions`, and configure automatic top-ups (so you never hit a zero balance mid-order) with `billing.set_auto_topup`:

```python
from geopera import Geopera

client = Geopera(token="gpra_...")

# Top up automatically whenever the balance drops below the threshold.
client.billing.set_auto_topup({
    "enabled": True,
    "threshold": 1000,   # trigger at 10.00 USD remaining
    "amount": 5000,      # add 50.00 USD
    "currency": "usd",
})

balance = client.billing.credits.balance({})
print("balance:", balance)
```

The [billing guide](/api-reference/guides/billing) covers credits, invoices, and approvals end to end.

## Recipe: create an event subscription

Get notified when something happens instead of polling. `event_subscriptions.create` takes an `event_type` to listen for and an `endpoint_url` to receive deliveries, and returns a typed `SubscriptionOutput`.

```python
from geopera import Geopera

client = Geopera(token="gpra_...")

sub = client.event_subscriptions.create({
    "event_type": "order.fulfilled",
    "endpoint_url": "https://example.com/hooks/geopera",
    "description": "Notify our pipeline when archive orders complete",
    "filter_config": {"collection": "sentinel-2-l2a"},
    "headers": {"X-Pipeline-Token": "shared-secret"},
})
print("subscribed:", sub)  # SubscriptionOutput
```

The body fields:

- `event_type` (required) — the event to listen for, e.g. `order.fulfilled` or `processing.job.succeeded`.
- `endpoint_url` (required) — the HTTPS URL Geopera POSTs each delivery to.
- `description` — a human-readable label that shows up when you list subscriptions.
- `filter_config` — narrow which events fire, so you only receive the deliveries you care about.
- `headers` — custom headers Geopera includes on every delivery (handy for routing or a shared secret your endpoint checks).
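
On the receiving side, your endpoint can check the custom header before doing any work. The sketch below covers only the shared-secret header from the example above, not delivery signature verification. `has_valid_pipeline_token` is a hypothetical helper, and it assumes your web framework hands you the request headers as a mapping of strings.

```python
import hmac


def has_valid_pipeline_token(headers, expected_token, header_name="X-Pipeline-Token"):
    """Return True when a delivery carries the shared secret set on the subscription."""
    # HTTP header names are case-insensitive, so normalise before the lookup.
    normalised = {name.lower(): value for name, value in headers.items()}
    received = normalised.get(header_name.lower(), "")
    # compare_digest takes constant time, so it does not leak how much matched.
    return hmac.compare_digest(received.encode(), expected_token.encode())


ok = has_valid_pipeline_token({"x-pipeline-token": "shared-secret"}, "shared-secret")
rejected = has_valid_pipeline_token({"X-Pipeline-Token": "wrong"}, "shared-secret")
print(ok, rejected)
```

Reject the request with a `401` or `403` from your own endpoint when the check fails.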

After creating a subscription, fire a sample delivery to confirm your endpoint is reachable, and list your subscriptions to confirm it registered:

```python
from geopera import Geopera

client = Geopera(token="gpra_...")

sub = client.event_subscriptions.create({
    "event_type": "order.fulfilled",
    "endpoint_url": "https://example.com/hooks/geopera",
})
sub_id = sub.id

# Send a sample delivery so you can verify signature handling end to end.
client.event_subscriptions.test({"subscription_id": sub_id})

for s in client.event_subscriptions.list({}).subscriptions:
    print(s.id, s.event_type, s.endpoint_url)
```

The [webhooks guide](/api-reference/guides/webhooks) covers delivery signing, retries, and verification; the [alerts and notifications guide](/api-reference/guides/alerts-and-notifications) covers the wider notification surface.

## Putting it together: an end-to-end pipeline

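The recipes above compose. This single script searches the catalog, tops up credits, places an archive order, polls until the order reaches a terminal state, then runs a processing job over the scene. Each call that spends money carries an idempotency key; the keys are generated inline here for brevity, so persist them as described above if the script must survive a crash and retry.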

```python
import time
import uuid

from geopera import Geopera

client = Geopera(token="gpra_...")

# 1. Discover a low-cloud scene over the AOI.
search = client.catalog.search({
    "host_name": "earthsearch-aws",
    "collections": ["sentinel-2-l2a"],
    "bbox": [151.18, -33.92, 151.30, -33.84],
    "query": {"eo:cloud_cover": {"lt": 10}},
    "limit": 1,
})
if not search.features:
    raise SystemExit("no scenes matched the AOI")
item_id = search.features[0].id

# 2. Make sure we have credits to spend.
client.billing.topup(
    {"amount": 5000, "currency": "usd"},
    idempotency_key=str(uuid.uuid4()),
)

# 3. Place the order, retry-safe.
order = client.orders.archive.place(
    {
        "collection": "sentinel-2-l2a",
        "item_ids": [item_id],
        "delivery": {"format": "geotiff"},
    },
    idempotency_key=str(uuid.uuid4()),
)
order_id = order["id"]

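# 4. Wait for a terminal status, and stop if the order was not fulfilled.
while True:
    status = client.orders.get({"order_id": order_id})["status"]
    if status in {"fulfilled", "failed", "cancelled"}:
        break
    time.sleep(10)
if status != "fulfilled":
    raise SystemExit(f"order {order_id} ended in {status}")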

# 5. Process the delivered scene.
job = client.processing.create({
    "job_type": "ndvi",
    "inputs": {"collection": "sentinel-2-l2a", "item_ids": [item_id]},
})
print("pipeline kicked off processing job:", job["id"])
```

## Low-level escape hatch

The fluent client covers every operation in the snapshot. When you need to call an operation by free function — for scripting, full control over the four entry points (`sync`, `sync_detailed`, `asyncio`, `asyncio_detailed`), or an operation not yet in the fluent surface — drop down to the generated `AuthenticatedClient` and the per-operation modules under `geopera.api.operations`. Each operation id maps to a module (dots become underscores — `catalog.search` becomes `catalog_search`, `orders.tasking.estimate` becomes `orders_tasking_estimate`), and its input/output models live in `geopera.models`.

```python
from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput

client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")

result = catalog_search.sync(
    client=client,
    body=CatalogSearchInput(
        host_name="earthsearch-aws",
        collections=["sentinel-2-l2a"],
        limit=10,
    ),
)
print(result)  # CatalogSearchOutput (typed)
```
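
If you need to go from an operation id to its module at runtime, the naming rule above is a one-line transformation. Both helpers below are hypothetical. `load_operation` assumes the `geopera.api.operations` layout described in this section and needs the package installed when called, so the snippet only defines it.

```python
import importlib


def operation_module_name(operation_id):
    """Map an operation id to its generated module name (dots become underscores)."""
    return operation_id.replace(".", "_")


def load_operation(operation_id):
    """Import the generated module for an operation id (requires geopera installed)."""
    name = operation_module_name(operation_id)
    return importlib.import_module(f"geopera.api.operations.{name}")


module_name = operation_module_name("orders.tasking.estimate")
print(module_name)
```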

The four entry points let you choose blocking versus async and parsed versus raw:

- `sync(client=, body=)` — blocking; returns the parsed model (or `None` on an unexpected status).
- `sync_detailed(client=, body=)` — blocking; returns a response object exposing `status_code`, `parsed`, `headers`, and raw `content`.
- `asyncio(client=, body=)` — `await`able; parsed model.
- `asyncio_detailed(client=, body=)` — `await`able; response object.

```python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput

async def main():
    async with AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...") as client:
        resp = await catalog_search.asyncio_detailed(
            client=client,
            body=CatalogSearchInput(host_name="earthsearch-aws", limit=5),
        )
        print(resp.status_code, resp.parsed)

asyncio.run(main())
```
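
The `asyncio` entry points make it possible to run several calls concurrently. The sketch below caps concurrency with a semaphore so a large batch does not open every request at once. `gather_limited` is a hypothetical helper, `fake_search` stands in for an awaited `catalog_search.asyncio(...)` call, and the collection ids other than `sentinel-2-l2a` are illustrative.

```python
import asyncio


async def gather_limited(calls, limit=4):
    """Run zero-argument coroutine factories concurrently, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(call):
        async with semaphore:
            return await call()

    # gather returns results in the order the calls were given.
    return await asyncio.gather(*(run(call) for call in calls))


async def fake_search(collection):
    # Stand-in for `await catalog_search.asyncio(client=..., body=...)`.
    await asyncio.sleep(0)
    return {"collection": collection, "features": []}


collections = ["sentinel-2-l2a", "landsat-c2-l2", "cop-dem-glo-30"]
results = asyncio.run(
    gather_limited([lambda c=c: fake_search(c) for c in collections], limit=2)
)
print([r["collection"] for r in results])
```

The `c=c` default binds each collection id at lambda creation time, which avoids every call seeing the last value of the loop variable.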

The fluent `Geopera` client wraps exactly this machinery; its underlying `AuthenticatedClient` is reachable as `client.client` if you want to mix both forms in one program — for example, using the fluent surface everywhere but dropping to `sync_detailed` on one call where you need the raw status code.

## Error handling

Every recipe above can fail with a typed error. The fluent client raises a typed exception carrying the parsed error — a `Problem` (RFC 9457 `problem+json`, returned for business and permission errors) or an `HTTPValidationError` (a `422` input-validation failure):

```python
from geopera import Geopera
from geopera.errors import GeoperaError
from geopera.models import Problem, HTTPValidationError

client = Geopera(token="gpra_...")

try:
    data = client.catalog.search({"host_name": "earthsearch-aws", "limit": 5})
except GeoperaError as exc:
    if isinstance(exc.error, Problem):
        print("geopera error:", exc.error.title, exc.error.detail)
    elif isinstance(exc.error, HTTPValidationError):
        print("invalid input:", exc.error.detail)
    else:
        raise
```

Common cases to branch on:

- **`401 Unauthorized`** — the token is missing, malformed, or expired. Check that you passed a `gpra_` API key or a current session token.
- **`403 Forbidden`** — the token is valid but lacks the [scope](/api-reference/scopes) the operation requires. Mint a key with the right scope in the portal.
- **`409 Conflict`** — usually an `Idempotency-Key` reused with a different body; reuse the original body or pick a new key.
- **`422 Unprocessable Entity`** — input validation; `HTTPValidationError.detail` lists which fields failed.
- **`429 Too Many Requests`** — back off and retry; see [rate limits](/api-reference/rate-limits).
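
The cases above can be collapsed into a small lookup that tells the calling code what to do next. This is a sketch: `next_step` and `is_retryable` are hypothetical helpers, the action labels are made up for illustration, and the function takes a plain integer status code because how you obtain it depends on which call style you use.

```python
ACTIONS = {
    401: "check-token",
    403: "check-scope",
    409: "resolve-idempotency-conflict",
    422: "fix-input",
    429: "retry-after-backoff",
}


def next_step(status_code):
    """Return a label for the remediation that matches a failed status code."""
    return ACTIONS.get(status_code, "unhandled")


def is_retryable(status_code):
    """Of the cases listed above, only 429 is worth retrying unchanged."""
    return status_code == 429


step = next_step(403)
print(step, is_retryable(403), is_retryable(429))
```

Anything that maps to `"unhandled"` is best re-raised or logged rather than retried blindly.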

If you need the raw HTTP status code instead of catching an exception, use the `*_detailed` free functions shown in the [low-level escape hatch](#low-level-escape-hatch) — they return a response object exposing `status_code`, `parsed`, and raw `content`, and never raise on a typed error status:

```python
from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput, Problem

client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")

resp = catalog_search.sync_detailed(
    client=client, body=CatalogSearchInput(host_name="earthsearch-aws", limit=5)
)
if resp.status_code == 200:
    data = resp.parsed                 # CatalogSearchOutput
elif isinstance(resp.parsed, Problem):
    print("geopera error:", resp.parsed.title, resp.parsed.detail)
```

For the complete `Problem` field reference and status-code semantics, see the [errors guide](/api-reference/errors).

## Where to go next

<CardGrid columns={2}>
  <InfoCard title="Operation reference" description="Every operation mapped to its resource action and input model." href="/api-reference/sdks/python/reference" />
  <InfoCard title="Async patterns" description="Concurrency, connection reuse, and polling with the async client." href="/api-reference/sdks/python/async" />
  <InfoCard title="Idempotency" description="Make order placement and top-ups safe to retry." href="/api-reference/idempotency" />
  <InfoCard title="Errors" description="The RFC 9457 problem+json schema and status-code semantics." href="/api-reference/errors" />
</CardGrid>
