<!-- Source: https://docs.geopera.com/api-reference/sdks/python/async · Markdown for LLMs -->

# Async usage

Async is first-class in the Python SDK: every operation that ships a `sync` callable also ships an awaitable `asyncio` one with an identical signature. The async callables are backed by a pooled `httpx.AsyncClient` that lives on the same `AuthenticatedClient` as the blocking transport.

The fluent `Geopera` client — `client.catalog.search({...})` — is synchronous: its nodes call the operation module's `sync`/`sync_detailed` under the hood and block. There is no `await client.catalog.search(...)`. To go async you drop one level down to the generated operation modules and call their `asyncio` / `asyncio_detailed` callables yourself, awaiting each one inside an event loop. This page documents that path end to end: the four callables, the async context manager, fanning out with `asyncio.gather`, and getting or setting the underlying `httpx.AsyncClient`. The fluent client is still useful in async code — its `.client` property hands you the same underlying `AuthenticatedClient` you pass to the operation modules.

## The four callables

Each module under `geopera.api.operations.<operation_id>` exposes four fully typed entry points with matching parameters. All four take `client` and `body` as keyword-only arguments:

| Callable                                 | Returns                                                                  | Blocking?       |
| ---------------------------------------- | ------------------------------------------------------------------------ | --------------- |
| `sync(client=..., body=...)`             | the typed success model, a typed error model, or `None`                  | yes             |
| `sync_detailed(client=..., body=...)`    | a `Response` wrapper (`.parsed`, `.status_code`, `.headers`, `.content`) | yes             |
| `asyncio(client=..., body=...)`          | the typed success model, a typed error model, or `None`                  | no — `await` it |
| `asyncio_detailed(client=..., body=...)` | a `Response` wrapper                                                     | no — `await` it |

Because the signatures match, porting code from sync to async means swapping `result = op.sync(...)` for `result = await op.asyncio(...)` and running inside an event loop. Operations are invoked by name (`catalog_search`, `orders_tasking_estimate`) — every operation is a POST, so there are no HTTP verbs to choose and no GET routes. See [Operations](/api-reference/operations) for the full model.

The return type of the plain `sync` / `asyncio` callables is a union. For `catalog_search`, `asyncio` returns `CatalogSearchOutput | HTTPValidationError | Problem | None`:

- the typed success model on a documented success status,
- a typed `Problem` (RFC 9457 `problem+json`) for business or permission errors,
- a typed `HTTPValidationError` for a 422 request-validation failure,
- `None` if the status code is undocumented (so there is no model to parse the body into) and `raise_on_unexpected_status` is off.

`asyncio_detailed` wraps that same value in `.parsed` and adds `.status_code`, `.headers`, and `.content`.
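One way to consume that union is to branch on the concrete type. The sketch below uses small stand-in dataclasses in place of the SDK models so that it runs on its own. The class names mirror the ones above, but the `features` field and the `describe` helper are assumptions made for illustration, not the SDK's definitions.

```python
from __future__ import annotations

from dataclasses import dataclass, field


# Stand-ins for the SDK models so this sketch runs without the SDK installed.
# The field names are illustrative assumptions.
@dataclass
class CatalogSearchOutput:
    features: list = field(default_factory=list)


@dataclass
class Problem:
    title: str
    detail: str


@dataclass
class HTTPValidationError:
    detail: list


def describe(result: CatalogSearchOutput | HTTPValidationError | Problem | None) -> str:
    """Map each member of the return union to a short outcome label."""
    if result is None:
        return "no parsed result"
    if isinstance(result, HTTPValidationError):
        return f"invalid request: {result.detail}"
    if isinstance(result, Problem):
        return f"problem: {result.title}"
    return f"ok: {len(result.features)} features"
```

Checking `None` and the two error models first leaves the success model as the fall-through case, which keeps the happy path unindented.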

## The async context manager

`AuthenticatedClient` (and the unauthenticated `Client`) implement both the sync (`with`) and async (`async with`) context-manager protocols. For async work, prefer `async with`: it enters the underlying `httpx.AsyncClient`'s context once, lets the connection pool be reused across every awaited call inside the block, and closes the pool cleanly on exit.

```python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput


async def main():
    async with AuthenticatedClient(
        base_url="https://api.geopera.com",
        token="gpra_...",          # Geopera API key or browser session token
    ) as client:
        result = await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(
                host_name="api.geopera.com",
                collections=["sentinel-2-l2a"],
                limit=10,
            ),
        )
        print(result)              # -> CatalogSearchOutput (typed)


asyncio.run(main())
```

The `token` is the bearer credential, sent as `Authorization: Bearer <token>`. It is either a minted API key with the `gpra_` prefix or a session token obtained by signing in to Geopera through the browser — both are passed as `token` and used the same way. There is no token-exchange, refresh, or grant step for an API key: the key _is_ the bearer token. See [Authentication](/api-reference/authentication) for how the SDK builds the header.

### Why use the context manager

You can call `asyncio()` against a client you never entered: the SDK lazily constructs an `httpx.AsyncClient` on first use via `get_async_httpx_client()`. But without `async with`, that client is never explicitly closed, so its connection pool stays open until garbage collection and Python may emit `ResourceWarning`s about unclosed connections. Entering the context manager once and reusing the client for the lifetime of your work is both faster (pooled, keep-alive connections) and cleaner.

You cannot enter the same client's async context twice. This mirrors `httpx` semantics, where entering an `httpx.AsyncClient` that is already open, or that has already been closed, raises a `RuntimeError`. Construct one client per `async with` block, or reuse a single long-lived client and never re-enter it.

### Without a context manager

For short-lived scripts where you do not want a nested `async with`, you can construct the client, run your work, and close the async transport explicitly with `aclose()`:

```python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput


async def main():
    client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")
    try:
        result = await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(host_name="api.geopera.com", limit=10),
        )
        print(result)
    finally:
        await client.get_async_httpx_client().aclose()   # release the pool


asyncio.run(main())
```

`async with` is the recommended form because it guarantees the `aclose()` even on exceptions; the explicit form above is the manual equivalent.

## Detailed responses

Use `asyncio_detailed` when you need the HTTP status code, response headers, or raw bytes, or want to branch on a typed error instead of getting back only the parsed body. It returns a `Response` object with these fields:

- `.status_code` — an `http.HTTPStatus` (compares equal to its `int` value, e.g. `resp.status_code == 200`).
- `.parsed` — the same typed value `asyncio` would have returned (success model, `Problem`, `HTTPValidationError`, or `None`).
- `.headers` — the response headers as a mutable mapping.
- `.content` — the raw response body as `bytes`.

```python
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput, Problem, HTTPValidationError


async def search(client):
    resp = await catalog_search.asyncio_detailed(
        client=client,
        body=CatalogSearchInput(
            host_name="api.geopera.com",
            collections=["sentinel-2-l2a"],
            limit=10,
        ),
    )
    if resp.status_code == 200:
        return resp.parsed                      # CatalogSearchOutput
    if isinstance(resp.parsed, HTTPValidationError):
        # 422 — your request body failed validation
        raise ValueError(resp.parsed.detail)
    if isinstance(resp.parsed, Problem):
        # RFC 9457 problem+json — business/permission errors
        raise RuntimeError(f"{resp.parsed.title}: {resp.parsed.detail}")
    return None
```

By default, errors are returned as typed `Problem` or `HTTPValidationError` models rather than raised. Set `raise_on_unexpected_status=True` on the client to make _undocumented_ status codes raise `geopera.errors.UnexpectedStatus` instead of returning `None`; documented error models are still returned, not raised. A request that exceeds the configured timeout raises `httpx.TimeoutException` regardless of that flag. See [Errors](/api-reference/errors) for the problem schema.

## Running operations concurrently

The payoff of async is concurrency. Share one entered client across many awaited calls and gather them — the connection pool fans the requests out without you managing threads.

```python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput

COLLECTIONS = ["sentinel-2-l2a", "landsat-c2-l2", "sentinel-1-grd"]


async def search_collection(client, collection: str):
    return await catalog_search.asyncio(
        client=client,
        body=CatalogSearchInput(
            host_name="api.geopera.com",
            collections=[collection],
            limit=10,
        ),
    )


async def main():
    async with AuthenticatedClient(
        base_url="https://api.geopera.com",
        token="gpra_...",
    ) as client:
        results = await asyncio.gather(
            *(search_collection(client, c) for c in COLLECTIONS)
        )
        for collection, result in zip(COLLECTIONS, results):
            print(collection, "->", result)


asyncio.run(main())
```
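`asyncio.gather` returns results in the order the awaitables were passed in, not the order they finish, which is why zipping `COLLECTIONS` with `results` is safe. A minimal sketch with a stand-in coroutine (`fake_search` and its delays are made up for illustration and make no network request):

```python
import asyncio


async def fake_search(collection: str, delay: float) -> str:
    """Stand-in for catalog_search.asyncio(...); sleeps instead of doing I/O."""
    await asyncio.sleep(delay)
    return f"{collection}: done"


async def main() -> list[str]:
    # The first call is the slowest, yet its result still comes back first.
    return await asyncio.gather(
        fake_search("sentinel-2-l2a", 0.03),
        fake_search("landsat-c2-l2", 0.01),
        fake_search("sentinel-1-grd", 0.02),
    )


results = asyncio.run(main())
print(results)
```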

### Bounding concurrency

When you fan out widely, do not launch hundreds of awaits at once — you will trip the [rate limits](/api-reference/rate-limits) and exhaust the connection pool. Bound concurrency with an `asyncio.Semaphore`:

```python
import asyncio

from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput


async def bounded_search(sem, client, collection):
    async with sem:                              # at most N in flight
        return await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(host_name="api.geopera.com", collections=[collection]),
        )


async def search_all(client, collections):
    sem = asyncio.Semaphore(8)
    return await asyncio.gather(
        *(bounded_search(sem, client, c) for c in collections)
    )
```
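To convince yourself the semaphore really caps the number of in-flight calls, you can instrument the same pattern with a counter. This is a sketch under stated assumptions: `fake_request`, `InFlightCounter`, and `run_bounded` are hypothetical helpers, and `asyncio.sleep` stands in for the awaited SDK call.

```python
import asyncio


class InFlightCounter:
    """Tracks how many stand-in requests are running at the same time."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0


async def fake_request(sem: asyncio.Semaphore, counter: InFlightCounter, item: int) -> int:
    async with sem:                       # same pattern as bounded_search
        counter.current += 1
        counter.peak = max(counter.peak, counter.current)
        await asyncio.sleep(0.01)         # stand-in for the awaited SDK call
        counter.current -= 1
        return item


async def run_bounded(n_items: int, limit: int) -> tuple[list[int], int]:
    sem = asyncio.Semaphore(limit)
    counter = InFlightCounter()
    results = await asyncio.gather(
        *(fake_request(sem, counter, i) for i in range(n_items))
    )
    return list(results), counter.peak


results, peak = asyncio.run(run_bounded(50, 8))
print(len(results), "results, peak in flight:", peak)
```

All 50 coroutines are created up front, but only `limit` of them hold the semaphore at any moment; the rest wait at `async with sem`.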

### Handling per-task failures

By default, `asyncio.gather` propagates the first raised exception to the caller as soon as it occurs. The remaining awaitables are not cancelled and keep running, but their results are lost. If you want each task's outcome independently, pass `return_exceptions=True` and inspect the results, or use `asyncio_detailed` so documented failures come back as typed `Problem` values rather than raised exceptions:

```python
results = await asyncio.gather(
    *(search_collection(client, c) for c in COLLECTIONS),
    return_exceptions=True,
)
for collection, result in zip(COLLECTIONS, results):
    if isinstance(result, Exception):
        print(collection, "failed:", result)
    else:
        print(collection, "->", result)
```

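Because the SDK returns documented errors as values rather than raising, the `asyncio_detailed` approach is usually cleaner: every task that receives a documented response resolves to a `Response`, and you branch on `.status_code` / `.parsed` per result. Transport-level failures such as `httpx.TimeoutException` (and `UnexpectedStatus`, if enabled) still raise, so keep `return_exceptions=True` when one such failure must not discard the other results.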
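When you do rely on `return_exceptions=True`, a small helper can partition the outcomes. The sketch below is illustrative only: `split_outcomes` and `fake_search` are hypothetical names, and the simulated `TimeoutError` stands in for a transport failure. It tests for `BaseException` rather than `Exception` because `asyncio.CancelledError` derives from `BaseException` on Python 3.8+ and could otherwise be mistaken for a result.

```python
import asyncio


def split_outcomes(keys, results):
    """Partition gather(..., return_exceptions=True) output by success/failure."""
    succeeded, failed = {}, {}
    for key, result in zip(keys, results):
        # CancelledError derives from BaseException, so test for that
        # rather than Exception to avoid treating it as a success.
        if isinstance(result, BaseException):
            failed[key] = result
        else:
            succeeded[key] = result
    return succeeded, failed


async def fake_search(collection: str) -> str:
    """Stand-in for an SDK call; one collection fails like a timeout would."""
    await asyncio.sleep(0)
    if collection == "landsat-c2-l2":
        raise TimeoutError("simulated timeout")
    return f"{collection}: ok"


async def main():
    collections = ["sentinel-2-l2a", "landsat-c2-l2", "sentinel-1-grd"]
    results = await asyncio.gather(
        *(fake_search(c) for c in collections), return_exceptions=True
    )
    return split_outcomes(collections, results)


succeeded, failed = asyncio.run(main())
print("ok:", sorted(succeeded), "failed:", sorted(failed))
```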

## Configuring the async client

Constructor knobs apply equally to the async transport, because both the sync and async `httpx` clients are built from the same constructor fields:

- `timeout` — an `httpx.Timeout`; exceeding it raises `httpx.TimeoutException`.
- `verify_ssl` — `True` in production; `False`, a CA path, or an `ssl.SSLContext` for testing.
- `follow_redirects` — defaults to `False`.
- `headers` / `cookies` — sent with every request.
- `httpx_args` — a dict forwarded verbatim to the `httpx.AsyncClient` (and `httpx.Client`) constructor for anything the named kwargs do not cover.
- `raise_on_unexpected_status` — keyword-only; controls whether undocumented status codes raise `UnexpectedStatus`.

```python
import httpx

from geopera import AuthenticatedClient

client = AuthenticatedClient(
    base_url="https://api.geopera.com",
    token="gpra_...",
    timeout=httpx.Timeout(30.0),
    headers={"X-Request-Source": "my-app"},
    httpx_args={"http2": True},   # needs the optional h2 package: pip install "httpx[http2]"
    raise_on_unexpected_status=True,
)
```

The `with_*` helpers return a new client with the override applied via `attrs.evolve`. They also propagate the change to any underlying `httpx` client that has already been constructed (sync, async, or both):

- `client.with_headers({...})` — merge in additional headers.
- `client.with_cookies({...})` — merge in additional cookies.
- `client.with_timeout(httpx.Timeout(...))` — replace the timeout.

```python
client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")
client = client.with_headers({"X-Request-Source": "batch-job"})
```

## Accessing the underlying async httpx client

The async transport is a standard `httpx.AsyncClient`. Two methods give you direct access, mirroring the sync `get_httpx_client` / `set_httpx_client` pair:

- `client.get_async_httpx_client()` — return the underlying `httpx.AsyncClient`, constructing one on first call from the constructor fields. This is where the `Authorization` header is materialized for `AuthenticatedClient`. Use it to `aclose()` the transport explicitly, or to read pool state.
- `client.set_async_httpx_client(async_client)` — supply your own pre-configured `httpx.AsyncClient`. This overrides every other setting on the client (base URL, cookies, headers, timeout), so you are responsible for the base URL and the auth header. It returns the client for chaining.

Reach for `set_async_httpx_client` when you need transport behavior the constructor does not expose — for example a custom transport with retries, an HTTP/2 client, or a shared connection-limit configuration.

```python
import httpx

from geopera import AuthenticatedClient

client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")

# Bring your own transport. NOTE: this overrides base_url, headers (incl. auth),
# cookies, and timeout — set them yourself.
client.set_async_httpx_client(
    httpx.AsyncClient(
        base_url="https://api.geopera.com",
        headers={"Authorization": "Bearer gpra_..."},
        limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
        timeout=httpx.Timeout(30.0),
        http2=True,               # needs the optional h2 package
    )
)
```

The sync and async transports are independent: `get_httpx_client()` / `set_httpx_client()` manage the blocking `httpx.Client`, while `get_async_httpx_client()` / `set_async_httpx_client()` manage the non-blocking `httpx.AsyncClient`. A single `AuthenticatedClient` can hold both, but most code uses only one.

## Using the fluent client's underlying client for async

The fluent `Geopera` client is sync-only, but it constructs and holds an `AuthenticatedClient` you can reuse for async operation modules. Build the fluent client once, read `.client`, and pass that to `asyncio` / `asyncio_detailed`:

```python
import asyncio

from geopera import Geopera
from geopera.api.operations import catalog_search
from geopera.models import CatalogSearchInput

fluent = Geopera(token="gpra_...")              # sync facade
underlying = fluent.client                       # the AuthenticatedClient


async def main():
    async with underlying as client:             # enter the async pool
        result = await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(host_name="api.geopera.com", limit=10),
        )
        print(result)


asyncio.run(main())
```

This lets one configured client back both styles: `fluent.catalog.search({...})` for quick blocking calls and `catalog_search.asyncio(client=fluent.client, ...)` for concurrent ones. Just remember `fluent.catalog.search(...)` always blocks — there is no awaitable form of the fluent nodes.
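If you must call a blocking fluent node from inside a coroutine, one option is to push it onto a worker thread with `asyncio.to_thread` (Python 3.9+) so the event loop stays responsive. The sketch below uses a hypothetical `blocking_search` function in place of `fluent.catalog.search({...})`. Whether the fluent client is safe to share across threads is not stated on this page, so treat that as an assumption to verify.

```python
import asyncio
import time


def blocking_search(query: dict) -> dict:
    """Stand-in for a blocking fluent call such as fluent.catalog.search({...})."""
    time.sleep(0.05)                      # simulates waiting on the network
    return {"query": query, "items": []}


async def heartbeat(ticks: list) -> None:
    """Shows the event loop keeps running while the blocking call is in a thread."""
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    ticks: list = []
    # to_thread runs the blocking function in a worker thread and returns an awaitable.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_search, {"limit": 10}),
        heartbeat(ticks),
    )
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

For anything beyond an occasional call, the operation module's `asyncio` callable remains the more direct route, since it avoids the thread hop entirely.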

## Worked example: estimate tasking concurrently

A complete async flow that authenticates once, fans several estimates out concurrently, and handles errors with the detailed variant.

```python
import asyncio

from geopera import AuthenticatedClient
from geopera.api.operations import orders_tasking_estimate
from geopera.models import Problem, HTTPValidationError, TaskingEstimateInput


async def estimate(client, body: TaskingEstimateInput):
    resp = await orders_tasking_estimate.asyncio_detailed(client=client, body=body)
    if resp.status_code == 200:
        return resp.parsed                      # typed estimate output
    if isinstance(resp.parsed, HTTPValidationError):
        raise ValueError(resp.parsed.detail)
    if isinstance(resp.parsed, Problem):
        raise RuntimeError(f"{resp.parsed.title}: {resp.parsed.detail}")
    raise RuntimeError(f"unexpected status {resp.status_code}")


async def main():
    async with AuthenticatedClient(
        base_url="https://api.geopera.com",
        token="gpra_...",
    ) as client:
        bodies = [
            TaskingEstimateInput(...),          # fill with your AOI / window
            TaskingEstimateInput(...),
        ]
        estimates = await asyncio.gather(
            *(estimate(client, body) for body in bodies),
            return_exceptions=True,
        )
        for body, est in zip(bodies, estimates):
            if isinstance(est, Exception):
                print("estimate failed:", est)
            else:
                print(est)


asyncio.run(main())
```

Replace `TaskingEstimateInput(...)` with the fields documented for that operation in the [API reference](/api-reference/operations). The operation module name is the operation id with dots replaced by underscores (`orders.tasking.estimate` becomes `orders_tasking_estimate`), which is the same convention the fluent client uses to resolve `client.orders.tasking.estimate`.
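The naming rule is simple enough to express in code. In this sketch, `operation_module_name` and `load_operation` are hypothetical helpers, not part of the SDK. The second one assumes the `geopera.api.operations.<name>` package layout described earlier and only works with the SDK installed, so it is defined here but not called.

```python
import importlib


def operation_module_name(operation_id: str) -> str:
    """Apply the naming rule described above: dots become underscores."""
    return operation_id.replace(".", "_")


def load_operation(operation_id: str):
    """Import the generated module for an operation id (requires the SDK)."""
    name = operation_module_name(operation_id)
    return importlib.import_module(f"geopera.api.operations.{name}")


print(operation_module_name("orders.tasking.estimate"))  # orders_tasking_estimate
```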

## Gotchas

- **`asyncio()` must be awaited.** Calling it without `await` returns a coroutine and makes no request; you will also get a "coroutine was never awaited" warning. This is the most common async mistake.
- **The fluent client is sync.** `client.catalog.search({...})` blocks. To await an operation you must use the operation module's `asyncio` / `asyncio_detailed`, passing an `AuthenticatedClient` (which you can get from `fluent.client`).
- **Enter the context once.** You cannot re-enter the same client's `async with` block — `httpx` raises on a double `__aenter__`. Construct a fresh client or reuse the entered one.
- **One client, many calls.** Reusing an entered client across awaited operations is the point — it keeps connections pooled. Do not create a new client per request; that defeats the pool and the keep-alive.
- **Bound your fan-out.** `asyncio.gather` over a huge list launches every request at once. Use an `asyncio.Semaphore` and respect the [rate limits](/api-reference/rate-limits).
- **`gather` fails fast.** Without `return_exceptions=True`, the first raised exception propagates immediately and the other results are lost, even though those requests keep running. Use `asyncio_detailed` (errors come back as values) or `return_exceptions=True` for per-task outcomes.
- **`set_async_httpx_client` overrides everything.** When you supply your own `httpx.AsyncClient`, the SDK no longer injects the base URL or the auth header — set them on your client yourself.
- **Sync and async transports are separate.** Mixing `sync()` and `await asyncio()` on the same client is allowed but uses two distinct connection pools; `close`/`aclose` them independently.
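The first gotcha can be seen without the SDK. In this minimal sketch, `fake_operation` is a stand-in for any `asyncio(...)` callable: calling it only creates a coroutine object, and nothing runs until that object is awaited.

```python
import asyncio
import inspect


async def fake_operation() -> str:
    """Stand-in for an SDK asyncio(...) callable; makes no request."""
    return "result"


async def main():
    pending = fake_operation()            # no await: nothing has run yet
    is_coroutine = inspect.iscoroutine(pending)
    value = await pending                 # awaiting is what actually runs it
    return is_coroutine, value


print(asyncio.run(main()))                # (True, 'result')
```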

## Related

- [Configuration](/api-reference/sdks/python/configuration) — every constructor knob shared by the sync and async transports.
- [Operations](/api-reference/operations) — how every capability is invoked by name.
- [Errors](/api-reference/errors) — the RFC 9457 `problem+json` schema returned by `Problem`.
- [Authentication](/api-reference/authentication) — session and `gpra_` API-key bearer tokens.
- [Rate limits](/api-reference/rate-limits) — limits to respect when fanning out concurrently.
