Async usage

Async is first-class in the Python SDK: every operation that ships a sync callable also ships an awaitable asyncio one with an identical signature. The awaitable variants are backed by a pooled httpx.AsyncClient that lives on the same AuthenticatedClient as the blocking httpx.Client.

The fluent Geopera client — client.catalog.search({...}) — is synchronous: its nodes call the operation module’s sync/sync_detailed under the hood and block. There is no await client.catalog.search(...). To go async you drop one level down to the generated operation modules and call their asyncio / asyncio_detailed callables yourself, awaiting each one inside an event loop. This page documents that path end to end: the four callables, the async context manager, fanning out with asyncio.gather, and getting or setting the underlying httpx.AsyncClient. The fluent client is still useful in async code — its .client property hands you the same underlying AuthenticatedClient you pass to the operation modules.

The four callables

Each module under geopera.api.operations.<operation_id> exposes four fully typed entry points with matching parameters. All four take client and body as keyword-only arguments:

| Callable | Returns | Blocking? |
| --- | --- | --- |
| sync(client=..., body=...) | the typed success model, a typed error model, or None | yes |
| sync_detailed(client=..., body=...) | a Response wrapper (.parsed, .status_code, .headers, .content) | yes |
| asyncio(client=..., body=...) | the typed success model, a typed error model, or None | no (await it) |
| asyncio_detailed(client=..., body=...) | a Response wrapper | no (await it) |

Because the signatures match, porting code from sync to async means swapping result = op.sync(...) for result = await op.asyncio(...) and running inside an event loop. Operations are invoked by name (catalog_search, orders_tasking_estimate) — every operation is a POST, so there are no HTTP verbs to choose and no GET routes. See Operations for the full model.

The return type of the non-detailed callables (sync and asyncio) is a union. For catalog_search, asyncio returns CatalogSearchOutput | HTTPValidationError | Problem | None:

  • the typed success model on a documented success status,
  • a typed Problem (RFC 9457 problem+json) for business or permission errors,
  • a typed HTTPValidationError for a 422 request-validation failure,
  • None if the server returned an undocumented status code and raise_on_unexpected_status is off.

asyncio_detailed wraps that same value in .parsed and adds .status_code, .headers, and .content.
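Because the non-detailed callables return a union, calling code usually narrows it before use. The sketch below shows one way to do that with isinstance checks. The dataclasses are hypothetical stand-ins for the SDK models so the example runs without the SDK installed, and unwrap and OperationFailed are illustrative names, not part of the SDK.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the SDK models (assumption: the real models
# expose at least these attributes, as used elsewhere on this page).
@dataclass
class CatalogSearchOutput:
    items: list


@dataclass
class Problem:
    title: str
    detail: str


@dataclass
class HTTPValidationError:
    detail: list


class OperationFailed(Exception):
    """Illustrative exception that turns an error value into a raised error."""


def unwrap(result):
    """Return the success model, or raise if the union holds an error or None."""
    if result is None:
        raise OperationFailed("no parsed result (None)")
    if isinstance(result, HTTPValidationError):
        raise OperationFailed(f"validation failed: {result.detail}")
    if isinstance(result, Problem):
        raise OperationFailed(f"{result.title}: {result.detail}")
    return result


print(unwrap(CatalogSearchOutput(items=["scene-1"])))
```

With the real SDK the call site would look like unwrap(await catalog_search.asyncio(client=client, body=body)), assuming you prefer exceptions over branching on values.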

The async context manager

AuthenticatedClient (and the unauthenticated Client) implement both the sync (with) and async (async with) context-manager protocols. For async work, prefer async with: it enters the underlying httpx.AsyncClient’s context once, lets the connection pool be reused across every awaited call inside the block, and closes the pool cleanly on exit.

python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput


async def main():
    async with AuthenticatedClient(
        base_url="https://api.geopera.com",
        token="gpra_...",          # Geopera API key or browser session token
    ) as client:
        result = await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(
                host_name="api.geopera.com",
                collections=["sentinel-2-l2a"],
                limit=10,
            ),
        )
        print(result)              # -> CatalogSearchOutput (typed)


asyncio.run(main())

The token is the bearer credential, sent as Authorization: Bearer <token>. It is either a minted API key with the gpra_ prefix or a session token obtained by signing in to Geopera through the browser — both are passed as token and used the same way. There is no token-exchange, refresh, or grant step for an API key: the key is the bearer token. See Authentication for how the SDK builds the header.

Why use the context manager

You can call asyncio() against a client you never entered: the SDK lazily constructs an httpx.AsyncClient on first use via get_async_httpx_client(). But without async with, that client is never explicitly closed, so its connections stay open until the client is garbage-collected or the process exits, and Python may emit resource warnings about unclosed connections. Entering the context manager once and reusing the client for the lifetime of your work is both faster (pooled, keep-alive connections) and cleaner.

You cannot enter the same client’s async context twice. This mirrors httpx semantics, where entering an httpx.AsyncClient that has already been opened raises a RuntimeError. Construct one client per async with block, or reuse a single long-lived client and never re-enter it.
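One way to keep a single long-lived client is to enter it once at application start and hand the entered object to every worker. The sketch below does that with contextlib.AsyncExitStack. FakeClient is a hypothetical stand-in that only imitates the "enter once" rule described above; it is not the SDK's AuthenticatedClient.

```python
import asyncio
from contextlib import AsyncExitStack


class FakeClient:
    """Hypothetical stand-in for AuthenticatedClient; refuses re-entry."""

    def __init__(self):
        self.entered = False
        self.closed = False

    async def __aenter__(self):
        if self.entered:
            raise RuntimeError("client already entered")
        self.entered = True
        return self

    async def __aexit__(self, *exc_info):
        self.closed = True


async def worker(client, n):
    # Workers receive the already-entered client; they never use `async with`.
    await asyncio.sleep(0)
    return (n, client.entered and not client.closed)


async def main():
    async with AsyncExitStack() as stack:
        # Enter the client exactly once, for the lifetime of the stack.
        client = await stack.enter_async_context(FakeClient())
        results = await asyncio.gather(*(worker(client, n) for n in range(3)))
    # Leaving the stack has closed the client.
    return client, results


client, results = asyncio.run(main())
print(results, client.closed)
```

The stack owns the client's lifetime, so nothing else in the program needs to know how or when to close it.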

Without a context manager

For short-lived scripts where you do not want a nested async with, you can construct the client, run your work, and close the async transport explicitly with aclose():

python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput


async def main():
    client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")
    try:
        result = await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(host_name="api.geopera.com", limit=10),
        )
        print(result)
    finally:
        await client.get_async_httpx_client().aclose()   # release the pool


asyncio.run(main())

async with is the recommended form because it closes the transport for you, even when an exception is raised; the try/finally form above is the manual equivalent.

Detailed responses

Use asyncio_detailed when you need the HTTP status code, response headers, or raw bytes, or want to branch on a typed error instead of getting back only the parsed body. It returns a Response object with these fields:

  • .status_code — an http.HTTPStatus (compares equal to its int value, e.g. resp.status_code == 200).
  • .parsed — the same typed value asyncio would have returned (success model, Problem, HTTPValidationError, or None).
  • .headers — the response headers as a mutable mapping.
  • .content — the raw response body as bytes.

python
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput, Problem, HTTPValidationError


async def search(client):
    resp = await catalog_search.asyncio_detailed(
        client=client,
        body=CatalogSearchInput(
            host_name="api.geopera.com",
            collections=["sentinel-2-l2a"],
            limit=10,
        ),
    )
    if resp.status_code == 200:
        return resp.parsed                      # CatalogSearchOutput
    if isinstance(resp.parsed, HTTPValidationError):
        # 422 — your request body failed validation
        raise ValueError(resp.parsed.detail)
    if isinstance(resp.parsed, Problem):
        # RFC 9457 problem+json — business/permission errors
        raise RuntimeError(f"{resp.parsed.title}: {resp.parsed.detail}")
    return None

By default, errors are returned as typed Problem or HTTPValidationError models rather than raised. Set raise_on_unexpected_status=True on the client to make undocumented status codes raise geopera.errors.UnexpectedStatus instead of returning None; documented error models are still returned, not raised. A request that exceeds the configured timeout raises httpx.TimeoutException regardless of that flag. See Errors for the problem schema.

Running operations concurrently

The payoff of async is concurrency. Share one entered client across many awaited calls and gather them — the connection pool fans the requests out without you managing threads.

python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput

COLLECTIONS = ["sentinel-2-l2a", "landsat-c2-l2", "sentinel-1-grd"]


async def search_collection(client, collection: str):
    return await catalog_search.asyncio(
        client=client,
        body=CatalogSearchInput(
            host_name="api.geopera.com",
            collections=[collection],
            limit=10,
        ),
    )


async def main():
    async with AuthenticatedClient(
        base_url="https://api.geopera.com",
        token="gpra_...",
    ) as client:
        results = await asyncio.gather(
            *(search_collection(client, c) for c in COLLECTIONS)
        )
        for collection, result in zip(COLLECTIONS, results):
            print(collection, "->", result)


asyncio.run(main())
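Pairing inputs with results through zip works because asyncio.gather returns results in the order the awaitables were passed in, not the order they finished. The sketch below demonstrates this with a hypothetical fake_search coroutine that sleeps for a different time per collection; it stands in for catalog_search.asyncio and makes no network calls.

```python
import asyncio

COLLECTIONS = ["sentinel-2-l2a", "landsat-c2-l2", "sentinel-1-grd"]
DELAYS = [0.03, 0.01, 0.02]  # the first request finishes last


async def fake_search(collection, delay):
    # Hypothetical stand-in for catalog_search.asyncio(client=..., body=...).
    await asyncio.sleep(delay)
    return f"results for {collection}"


async def main():
    results = await asyncio.gather(
        *(fake_search(c, d) for c, d in zip(COLLECTIONS, DELAYS))
    )
    # Results line up with COLLECTIONS even though completion order differed.
    return dict(zip(COLLECTIONS, results))


by_collection = asyncio.run(main())
for name, value in by_collection.items():
    print(name, "->", value)
```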

Bounding concurrency

When you fan out widely, do not launch hundreds of requests at once: you can trip the rate limits, and requests beyond the pool's connection limit queue up waiting for a free connection. Bound concurrency with an asyncio.Semaphore:

python
import asyncio

from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput


async def bounded_search(sem, client, collection):
    async with sem:                              # at most N in flight
        return await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(host_name="api.geopera.com", collections=[collection]),
        )


async def search_all(client, collections):
    sem = asyncio.Semaphore(8)
    return await asyncio.gather(
        *(bounded_search(sem, client, c) for c in collections)
    )
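To check that a semaphore really caps the number of requests in flight, you can count them. The sketch below repeats the pattern above with a hypothetical fake_search coroutine and an InFlightCounter helper (both illustrative names, not SDK code) and records the peak concurrency.

```python
import asyncio


class InFlightCounter:
    """Tracks how many fake requests are running at once."""

    def __init__(self):
        self.current = 0
        self.peak = 0


async def fake_search(counter, collection):
    # Hypothetical stand-in for catalog_search.asyncio; sleeps instead of calling the API.
    counter.current += 1
    counter.peak = max(counter.peak, counter.current)
    try:
        await asyncio.sleep(0.01)
        return collection
    finally:
        counter.current -= 1


async def bounded_search(sem, counter, collection):
    async with sem:  # at most `limit` coroutines pass this point at a time
        return await fake_search(counter, collection)


async def run(limit, total):
    sem = asyncio.Semaphore(limit)
    counter = InFlightCounter()
    results = await asyncio.gather(
        *(bounded_search(sem, counter, f"collection-{i}") for i in range(total))
    )
    return counter.peak, results


peak, results = asyncio.run(run(limit=8, total=50))
print("peak in flight:", peak, "completed:", len(results))
```

All 50 coroutines are created up front, but only eight hold the semaphore at any moment; the rest wait at async with sem.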

Handling per-task failures

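Without return_exceptions=True, asyncio.gather propagates the first raised exception to the caller immediately. The remaining awaitables are not cancelled and keep running, but their results are discarded. If you want each task’s outcome independently, pass return_exceptions=True and inspect the results, or use asyncio_detailed so documented API errors come back as typed Problem values on a Response rather than as exceptions: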

python
results = await asyncio.gather(
    *(search_collection(client, c) for c in COLLECTIONS),
    return_exceptions=True,
)
for collection, result in zip(COLLECTIONS, results):
    if isinstance(result, Exception):
        print(collection, "failed:", result)
    else:
        print(collection, "->", result)

Because the SDK returns documented errors as values rather than raising, the asyncio_detailed approach is usually cleaner: every task resolves to a Response, and you branch on .status_code / .parsed per result. Transport-level failures such as httpx.TimeoutException are still raised, so keep return_exceptions=True if you also need to survive those.
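In practice a batch can contain both kinds of failure: exceptions raised by the transport and error models returned as values. The sketch below sorts results into successes and failures in one pass. fake_search and the Problem dataclass are hypothetical stand-ins so the example runs offline, and the collection names "boom" and "forbidden" exist only to trigger each failure path.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Problem:
    """Hypothetical stand-in for the SDK's Problem model."""

    title: str
    detail: str


async def fake_search(collection):
    # Hypothetical stand-in for catalog_search.asyncio.
    await asyncio.sleep(0)
    if collection == "boom":
        raise TimeoutError("simulated transport timeout")  # raised failure
    if collection == "forbidden":
        return Problem(title="Forbidden", detail="no access")  # error as value
    return {"collection": collection, "count": 3}


async def search_all(collections):
    results = await asyncio.gather(
        *(fake_search(c) for c in collections),
        return_exceptions=True,  # raised exceptions become list entries
    )
    ok, failed = {}, {}
    for collection, result in zip(collections, results):
        if result is None or isinstance(result, (BaseException, Problem)):
            failed[collection] = result
        else:
            ok[collection] = result
    return ok, failed


ok, failed = asyncio.run(search_all(["sentinel-2-l2a", "boom", "forbidden"]))
print("ok:", ok)
print("failed:", failed)
```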

Configuring the async client

Constructor knobs apply equally to the async transport, because both the sync and async httpx clients are built from the same constructor fields:

  • timeout — an httpx.Timeout; exceeding it raises httpx.TimeoutException.
  • verify_ssl — True in production; False, a CA path, or an ssl.SSLContext for testing.
  • follow_redirects — defaults to False.
  • headers / cookies — sent with every request.
  • httpx_args — a dict forwarded verbatim to the httpx.AsyncClient (and httpx.Client) constructor for anything the named kwargs do not cover.
  • raise_on_unexpected_status — keyword-only; controls whether undocumented status codes raise UnexpectedStatus.

python
import httpx

from geopera import AuthenticatedClient

client = AuthenticatedClient(
    base_url="https://api.geopera.com",
    token="gpra_...",
    timeout=httpx.Timeout(30.0),
    headers={"X-Request-Source": "my-app"},
    httpx_args={"http2": True},   # HTTP/2 needs the optional h2 package: pip install "httpx[http2]"
    raise_on_unexpected_status=True,
)

The with_* helpers return a new client with the override applied via attrs.evolve, and also propagate the change to any already-constructed underlying httpx client (sync, async, or both):

  • client.with_headers({...}) — merge in additional headers.
  • client.with_cookies({...}) — merge in additional cookies.
  • client.with_timeout(httpx.Timeout(...)) — replace the timeout.

python
client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")
client = client.with_headers({"X-Request-Source": "batch-job"})

Accessing the underlying async httpx client

The async transport is a standard httpx.AsyncClient. Two methods give you direct access, mirroring the sync get_httpx_client / set_httpx_client pair:

  • client.get_async_httpx_client() — return the underlying httpx.AsyncClient, constructing one on first call from the constructor fields. This is where the Authorization header is materialized for AuthenticatedClient. Use it to aclose() the transport explicitly, or to read pool state.
  • client.set_async_httpx_client(async_client) — supply your own pre-configured httpx.AsyncClient. This overrides every other setting on the client (base URL, cookies, headers, timeout), so you are responsible for the base URL and the auth header. It returns the client for chaining.

Reach for set_async_httpx_client when you need transport behavior the constructor does not expose — for example a custom transport with retries, an HTTP/2 client, or a shared connection-limit configuration.

python
import httpx

from geopera import AuthenticatedClient

client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")

# Bring your own transport. NOTE: this overrides base_url, headers (incl. auth),
# cookies, and timeout — set them yourself.
client.set_async_httpx_client(
    httpx.AsyncClient(
        base_url="https://api.geopera.com",
        headers={"Authorization": "Bearer gpra_..."},
        limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
        timeout=httpx.Timeout(30.0),
        http2=True,               # needs the optional h2 package: pip install "httpx[http2]"
    )
)

The sync and async transports are independent: get_httpx_client() / set_httpx_client() manage the blocking httpx.Client, while get_async_httpx_client() / set_async_httpx_client() manage the awaitable httpx.AsyncClient. A single AuthenticatedClient can hold both, but most code uses only one.

Using the fluent client’s underlying client for async

The fluent Geopera client is sync-only, but it constructs and holds an AuthenticatedClient you can reuse for async operation modules. Build the fluent client once, read .client, and pass that to asyncio / asyncio_detailed:

python
import asyncio

from geopera import Geopera
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput

fluent = Geopera(token="gpra_...")              # sync facade
underlying = fluent.client                       # the AuthenticatedClient


async def main():
    async with underlying as client:             # enter the async pool
        result = await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(host_name="api.geopera.com", limit=10),
        )
        print(result)


asyncio.run(main())

This lets one configured client back both styles: fluent.catalog.search({...}) for quick blocking calls and catalog_search.asyncio(client=fluent.client, ...) for concurrent ones. Just remember fluent.catalog.search(...) always blocks — there is no awaitable form of the fluent nodes.
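If async code has to call a blocking fluent node anyway, one option is to push the call onto a worker thread with asyncio.to_thread (Python 3.9+) so the event loop stays responsive. The sketch below uses a hypothetical blocking_search function in place of fluent.catalog.search({...}). Whether the fluent client is safe to call from a worker thread is an assumption you should verify for your setup.

```python
import asyncio
import time


def blocking_search(collection):
    # Hypothetical stand-in for a blocking call such as fluent.catalog.search({...}).
    time.sleep(0.05)
    return f"results for {collection}"


async def heartbeat(ticks):
    # Runs on the event loop while the blocking call sits in a worker thread.
    for _ in range(3):
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


async def main():
    ticks = []
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_search, "sentinel-2-l2a"),
        heartbeat(ticks),
    )
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, "| heartbeat ticks:", tick_count)
```

The heartbeat completing all three ticks shows the loop was not blocked. For many concurrent calls, the operation modules' asyncio callables remain the better fit, since each to_thread call occupies a thread.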

Worked example: estimate tasking concurrently

A complete async flow that authenticates once, fans several estimates out concurrently, and handles errors with the detailed variant.

python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import orders_tasking_estimate
from geopera.models import Problem, HTTPValidationError, TaskingEstimateInput


async def estimate(client, body: TaskingEstimateInput):
    resp = await orders_tasking_estimate.asyncio_detailed(client=client, body=body)
    if resp.status_code == 200:
        return resp.parsed                      # typed estimate output
    if isinstance(resp.parsed, HTTPValidationError):
        raise ValueError(resp.parsed.detail)
    if isinstance(resp.parsed, Problem):
        raise RuntimeError(f"{resp.parsed.title}: {resp.parsed.detail}")
    raise RuntimeError(f"unexpected status {resp.status_code}")


async def main():
    async with AuthenticatedClient(
        base_url="https://api.geopera.com",
        token="gpra_...",
    ) as client:
        bodies = [
            TaskingEstimateInput(...),          # fill with your AOI / window
            TaskingEstimateInput(...),
        ]
        estimates = await asyncio.gather(
            *(estimate(client, body) for body in bodies),
            return_exceptions=True,
        )
        for body, est in zip(bodies, estimates):
            if isinstance(est, Exception):
                print("estimate failed:", est)
            else:
                print(est)


asyncio.run(main())

Replace TaskingEstimateInput(...) with the fields documented for that operation in the API reference. The operation module name is the operation id with dots replaced by underscores (orders.tasking.estimate becomes orders_tasking_estimate), which is the same convention the fluent client uses to resolve client.orders.tasking.estimate.
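The naming rule is simple enough to express as a helper. The sketch below converts an operation id to its module name and to the dotted import path under geopera.api.operations, the package named earlier on this page. The helper names are illustrative and not part of the SDK.

```python
PACKAGE = "geopera.api.operations"  # package path as described on this page


def operation_module_name(operation_id: str) -> str:
    """Replace dots with underscores: orders.tasking.estimate -> orders_tasking_estimate."""
    return operation_id.replace(".", "_")


def operation_module_path(operation_id: str) -> str:
    """Build the dotted import path for an operation module."""
    return f"{PACKAGE}.{operation_module_name(operation_id)}"


print(operation_module_path("orders.tasking.estimate"))
# prints geopera.api.operations.orders_tasking_estimate
```

With the SDK installed, the resulting path could be passed to importlib.import_module to load an operation module by id at runtime.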

Gotchas

  • asyncio() must be awaited. Calling it without await returns a coroutine and makes no request; you will also get a “coroutine was never awaited” warning. This is the most common async mistake.
  • The fluent client is sync. client.catalog.search({...}) blocks. To await an operation you must use the operation module’s asyncio / asyncio_detailed, passing an AuthenticatedClient (which you can get from fluent.client).
  • Enter the context once. You cannot re-enter the same client’s async with block — httpx raises on a double __aenter__. Construct a fresh client or reuse the entered one.
  • One client, many calls. Reusing an entered client across awaited operations is the point — it keeps connections pooled. Do not create a new client per request; that defeats the pool and the keep-alive.
  • Bound your fan-out. asyncio.gather over a huge list launches every request at once. Use an asyncio.Semaphore and respect the rate limits.
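  • gather does not isolate failures. Without return_exceptions=True, the first raised exception propagates to the caller and the other results are lost, although the other awaitables keep running. Use asyncio_detailed (documented errors come back as values) or return_exceptions=True for per-task outcomes.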
  • set_async_httpx_client overrides everything. When you supply your own httpx.AsyncClient, the SDK no longer injects the base URL or the auth header — set them on your client yourself.
  • Sync and async transports are separate. Mixing sync() and await asyncio() on the same client is allowed but uses two distinct connection pools; close/aclose them independently.
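The first gotcha is easy to reproduce without the SDK. In the sketch below, fake_asyncio_op is a hypothetical stand-in for an operation's asyncio callable; it records each call so you can see that nothing runs until the coroutine is awaited.

```python
import asyncio
import inspect


async def fake_asyncio_op(calls, body):
    # Hypothetical stand-in for catalog_search.asyncio; records that it ran.
    calls.append(body)
    return "ok"


async def main():
    calls = []
    pending = fake_asyncio_op(calls, {"limit": 10})  # no await: body has not run
    was_coroutine = inspect.iscoroutine(pending)
    calls_before = len(calls)
    result = await pending                           # awaiting runs the body
    return was_coroutine, calls_before, len(calls), result


print(asyncio.run(main()))
# prints (True, 0, 1, 'ok')
```

If a result prints as a coroutine object instead of a model, a missing await is the likely cause.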

Related

  • Configuration — every constructor knob shared by the sync and async transports.
  • Operations — how every capability is invoked by name.
  • Errors — the RFC 9457 problem+json schema returned by Problem.
  • Authentication — session and gpra_ API-key bearer tokens.
  • Rate limits — limits to respect when fanning out concurrently.