Async usage
Async is first-class in the Python SDK: every operation that ships a sync callable also ships an awaitable asyncio one with an identical signature. The async callables are backed by a pooled httpx.AsyncClient that lives on the same AuthenticatedClient as the blocking httpx.Client used by the sync callables.
The fluent Geopera client — client.catalog.search({...}) — is synchronous: its nodes call the operation module’s sync/sync_detailed under the hood and block. There is no await client.catalog.search(...). To go async you drop one level down to the generated operation modules and call their asyncio / asyncio_detailed callables yourself, awaiting each one inside an event loop. This page documents that path end to end: the four callables, the async context manager, fanning out with asyncio.gather, and getting or setting the underlying httpx.AsyncClient. The fluent client is still useful in async code — its .client property hands you the same underlying AuthenticatedClient you pass to the operation modules.
The four callables
Each module under geopera.api.operations.<operation_id> exposes four entry points with matching parameters. All four take client and body as keyword-only arguments:
| Callable | Returns | Blocking? |
|---|---|---|
| sync(client=..., body=...) | the typed success model, a typed error model, or None | yes |
| sync_detailed(client=..., body=...) | a Response wrapper (.parsed, .status_code, .headers, .content) | yes |
| asyncio(client=..., body=...) | the typed success model, a typed error model, or None | no; await it |
| asyncio_detailed(client=..., body=...) | a Response wrapper | no; await it |
Because the signatures match, porting code from sync to async means swapping result = op.sync(...) for result = await op.asyncio(...) and running inside an event loop. Operations are invoked by name (catalog_search, orders_tasking_estimate) — every operation is a POST, so there are no HTTP verbs to choose and no GET routes. See Operations for the full model.
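The swap can be sketched without the SDK. The fake_sync and fake_asyncio functions below are hypothetical stand-ins for an operation module's sync and asyncio callables (they are not part of geopera); they only mirror the matching keyword-only signatures described above, so the snippet runs on the standard library alone. It also shows what you get back if the await is forgotten.

```python
import asyncio
import inspect


def fake_sync(*, client, body):
    """Hypothetical stand-in for op.sync: blocks and returns the result."""
    return {"echo": body}


async def fake_asyncio(*, client, body):
    """Hypothetical stand-in for op.asyncio: same signature, but awaitable."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return {"echo": body}


async def main():
    # Before: result = fake_sync(client=None, body="q")
    # After: the same call, awaited inside a running event loop.
    awaited = await fake_asyncio(client=None, body="q")

    # Forgetting `await` yields a coroutine object, not a result.
    pending = fake_asyncio(client=None, body="q")
    forgot_await = inspect.iscoroutine(pending)
    pending.close()  # discard the unused coroutine cleanly
    return awaited, forgot_await


sync_result = fake_sync(client=None, body="q")
async_result, forgot_await = asyncio.run(main())
print(sync_result == async_result, forgot_await)
```

Both variants produce the same value; only the calling convention differs.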
The return type of the non-detailed callables (sync and asyncio) is a union. For catalog_search, asyncio returns CatalogSearchOutput | HTTPValidationError | Problem | None:
- the typed success model on a documented success status,
- a typed Problem (RFC 9457 problem+json) for business or permission errors,
- a typed HTTPValidationError for a 422 request-validation failure,
- None if the status code is undocumented, so no model could be parsed, and raise_on_unexpected_status is off.
asyncio_detailed wraps that same value in .parsed and adds .status_code, .headers, and .content.
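One way to handle that union is to narrow it with isinstance before touching any fields. The sketch below uses small dataclasses as hypothetical stand-ins for the generated models (the real CatalogSearchOutput, Problem, and HTTPValidationError come from geopera.models and carry more fields; the items field here is an assumption), so it runs without the SDK.

```python
from dataclasses import dataclass, field
from typing import Union


# Hypothetical stand-ins for the generated models; field names are assumptions.
@dataclass
class SearchOutput:
    items: list = field(default_factory=list)


@dataclass
class Problem:
    title: str
    detail: str


@dataclass
class ValidationError:
    detail: list


Result = Union[SearchOutput, Problem, ValidationError, None]


def describe(result: Result) -> str:
    """Narrow the union one branch at a time and summarize the outcome."""
    if result is None:
        return "no parsed body"
    if isinstance(result, ValidationError):
        return f"invalid request: {len(result.detail)} issue(s)"
    if isinstance(result, Problem):
        return f"problem: {result.title}"
    return f"ok: {len(result.items)} item(s)"


print(describe(SearchOutput(items=["a", "b"])))
print(describe(Problem(title="Forbidden", detail="missing scope")))
print(describe(ValidationError(detail=[{"loc": ["body", "limit"]}])))
print(describe(None))
```

Checking None and the error models first leaves the success model as the only remaining branch, which keeps type checkers happy as well.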
The async context manager
AuthenticatedClient (and the unauthenticated Client) implement both the sync (with) and async (async with) context-manager protocols. For async work, prefer async with: it enters the underlying httpx.AsyncClient’s context once, lets the connection pool be reused across every awaited call inside the block, and closes the pool cleanly on exit.

    import asyncio

    from geopera import AuthenticatedClient
    from geopera.api.operations import catalog_search
    from geopera.models import CatalogSearchInput

    async def main():
        async with AuthenticatedClient(
            base_url="https://api.geopera.com",
            token="gpra_...",  # Geopera API key or browser session token
        ) as client:
            result = await catalog_search.asyncio(
                client=client,
                body=CatalogSearchInput(
                    host_name="api.geopera.com",
                    collections=["sentinel-2-l2a"],
                    limit=10,
                ),
            )
            print(result)  # -> CatalogSearchOutput (typed)

    asyncio.run(main())

The token is the bearer credential, sent as Authorization: Bearer <token>. It is either a minted API key with the gpra_ prefix or a session token obtained by signing in to Geopera through the browser; both are passed as token and used the same way. There is no token-exchange, refresh, or grant step for an API key: the key is the bearer token. See Authentication for how the SDK builds the header.
Why use the context manager
You can call asyncio() against a client you never entered: the SDK lazily constructs an httpx.AsyncClient on first use via get_async_httpx_client(). But without async with, that client is never explicitly closed, so its connection pool stays open until garbage collection or interpreter exit, and you may see resource warnings about unclosed connections. Entering the context manager once and reusing the client for the lifetime of your work is both faster (pooled, keep-alive connections) and cleaner.
You cannot enter the same client’s async context twice. This mirrors httpx semantics, where __aenter__ on an already-opened client raises a RuntimeError. Construct one client per async with block, or reuse a single long-lived client and never re-enter it.
Without a context manager
For short-lived scripts where you do not want a nested async with, you can construct the client, run your work, and close the async transport explicitly with aclose():

    import asyncio

    from geopera import AuthenticatedClient
    from geopera.api.operations import catalog_search
    from geopera.models import CatalogSearchInput

    async def main():
        client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")
        try:
            result = await catalog_search.asyncio(
                client=client,
                body=CatalogSearchInput(host_name="api.geopera.com", limit=10),
            )
            print(result)
        finally:
            await client.get_async_httpx_client().aclose()  # release the pool

    asyncio.run(main())

async with is the recommended form because it guarantees the aclose() even on exceptions; the explicit form above is the manual equivalent.
Detailed responses
Use asyncio_detailed when you need the HTTP status code, response headers, or raw bytes, or want to branch on a typed error instead of getting back only the parsed body. It returns a Response object with these fields:
- .status_code: an http.HTTPStatus (compares equal to its int value, e.g. resp.status_code == 200).
- .parsed: the same typed value asyncio would have returned (success model, Problem, HTTPValidationError, or None).
- .headers: the response headers as a mutable mapping.
- .content: the raw response body as bytes.

    from geopera.api.operations import catalog_search
    from geopera.models import CatalogSearchInput, Problem, HTTPValidationError

    async def search(client):
        resp = await catalog_search.asyncio_detailed(
            client=client,
            body=CatalogSearchInput(
                host_name="api.geopera.com",
                collections=["sentinel-2-l2a"],
                limit=10,
            ),
        )
        if resp.status_code == 200:
            return resp.parsed  # CatalogSearchOutput
        if isinstance(resp.parsed, HTTPValidationError):
            # 422: your request body failed validation
            raise ValueError(resp.parsed.detail)
        if isinstance(resp.parsed, Problem):
            # RFC 9457 problem+json: business/permission errors
            raise RuntimeError(f"{resp.parsed.title}: {resp.parsed.detail}")
        return None

By default, errors are returned as typed Problem or HTTPValidationError models rather than raised. Set raise_on_unexpected_status=True on the client to make undocumented status codes raise geopera.errors.UnexpectedStatus instead of returning None; documented error models are still returned, not raised. A request that exceeds the configured timeout raises httpx.TimeoutException regardless of that flag. See Errors for the problem schema.
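Because .status_code is an http.HTTPStatus, you can compare it against plain integers or branch on a whole status class. A small standard-library sketch (the classify helper is illustrative, not part of the SDK):

```python
from http import HTTPStatus


def classify(status: HTTPStatus) -> str:
    """Bucket a status code by class using ordinary integer comparisons."""
    if 200 <= status < 300:
        return "success"
    if status == 422:  # request-validation failure
        return "validation"
    if 400 <= status < 500:
        return "client-error"
    return "other"


print(HTTPStatus.OK == 200)  # HTTPStatus members are integers
for code in (200, 422, 403, 503):
    print(code, classify(HTTPStatus(code)))
```

Range checks like these are handy when several documented success or error codes should share one branch.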
Running operations concurrently
The payoff of async is concurrency. Share one entered client across many awaited calls and gather them: the event loop runs the requests concurrently over the shared connection pool, with no threads for you to manage.

    import asyncio

    from geopera import AuthenticatedClient
    from geopera.api.operations import catalog_search
    from geopera.models import CatalogSearchInput

    COLLECTIONS = ["sentinel-2-l2a", "landsat-c2-l2", "sentinel-1-grd"]

    async def search_collection(client, collection: str):
        return await catalog_search.asyncio(
            client=client,
            body=CatalogSearchInput(
                host_name="api.geopera.com",
                collections=[collection],
                limit=10,
            ),
        )

    async def main():
        async with AuthenticatedClient(
            base_url="https://api.geopera.com",
            token="gpra_...",
        ) as client:
            results = await asyncio.gather(
                *(search_collection(client, c) for c in COLLECTIONS)
            )
            for collection, result in zip(COLLECTIONS, results):
                print(collection, "->", result)

    asyncio.run(main())

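One property worth relying on: asyncio.gather returns results in the order the awaitables were passed, not the order they finished, which is why zipping COLLECTIONS with results is safe. The sketch below shows this with a hypothetical fake_search coroutine that sleeps for a different (made-up) time per collection instead of calling the API.

```python
import asyncio

COLLECTIONS = ["sentinel-2-l2a", "landsat-c2-l2", "sentinel-1-grd"]
# Hypothetical latencies in seconds: the first request is the slowest.
DELAYS = {"sentinel-2-l2a": 0.15, "landsat-c2-l2": 0.05, "sentinel-1-grd": 0.10}

finished = []  # completion order, recorded as each fake request returns


async def fake_search(collection: str) -> str:
    await asyncio.sleep(DELAYS[collection])  # stand-in for network latency
    finished.append(collection)
    return f"results for {collection}"


async def main():
    return await asyncio.gather(*(fake_search(c) for c in COLLECTIONS))


results = asyncio.run(main())
for collection, result in zip(COLLECTIONS, results):
    print(collection, "->", result)
print("completion order:", finished)
```

The completion order differs from the input order, yet each result still lines up with its collection.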
Bounding concurrency
When you fan out widely, do not launch hundreds of awaits at once — you will trip the rate limits and exhaust the connection pool. Bound concurrency with an asyncio.Semaphore:

    import asyncio

    from geopera.api.operations import catalog_search
    from geopera.models import CatalogSearchInput

    async def bounded_search(sem, client, collection):
        async with sem:  # at most N in flight
            return await catalog_search.asyncio(
                client=client,
                body=CatalogSearchInput(host_name="api.geopera.com", collections=[collection]),
            )

    async def search_all(client, collections):
        sem = asyncio.Semaphore(8)
        return await asyncio.gather(
            *(bounded_search(sem, client, c) for c in collections)
        )

Handling per-task failures
asyncio.gather propagates the first raised exception to the caller as soon as it occurs; the remaining awaitables are not cancelled and keep running, but gather no longer delivers their results. If you want each task’s outcome independently, pass return_exceptions=True and inspect the results, or use asyncio_detailed so failures come back as typed Problem values rather than raised exceptions:

    results = await asyncio.gather(
        *(search_collection(client, c) for c in COLLECTIONS),
        return_exceptions=True,
    )
    for collection, result in zip(COLLECTIONS, results):
        if isinstance(result, Exception):
            print(collection, "failed:", result)
        else:
            print(collection, "->", result)

Because the SDK returns documented errors as values rather than raising, the asyncio_detailed approach is usually cleaner: every task resolves to a Response, and you branch on .status_code / .parsed per result without return_exceptions.
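A semaphore bound and return_exceptions=True compose naturally. The sketch below combines them, using a hypothetical fake_call coroutine in place of an SDK operation and a made-up failure rule, and tracks the peak number of in-flight calls to show the bound holds.

```python
import asyncio

LIMIT = 3
in_flight = 0
peak = 0


async def fake_call(i: int) -> int:
    """Hypothetical stand-in for an awaited SDK operation."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    try:
        await asyncio.sleep(0.01)  # stand-in for network latency
        if i % 4 == 0:
            raise TimeoutError(f"call {i} timed out")  # simulated failure
        return i * 10
    finally:
        in_flight -= 1


async def bounded(sem: asyncio.Semaphore, i: int) -> int:
    async with sem:  # at most LIMIT calls run past this point at once
        return await fake_call(i)


async def main():
    sem = asyncio.Semaphore(LIMIT)
    return await asyncio.gather(
        *(bounded(sem, i) for i in range(1, 9)),
        return_exceptions=True,  # failures come back as values, in input order
    )


outcomes = asyncio.run(main())
failed = [o for o in outcomes if isinstance(o, Exception)]
print(f"peak in flight: {peak}, failures: {len(failed)} of {len(outcomes)}")
```

Every task resolves to either a value or an exception object, so one failed call never hides the others.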
Configuring the async client
Constructor knobs apply equally to the async transport, because both the sync and async httpx clients are built from the same constructor fields:
- timeout: an httpx.Timeout; exceeding it raises httpx.TimeoutException.
- verify_ssl: True in production; False, a CA path, or an ssl.SSLContext for testing.
- follow_redirects: defaults to False.
- headers / cookies: sent with every request.
- httpx_args: a dict forwarded verbatim to the httpx.AsyncClient (and httpx.Client) constructor for anything the named kwargs do not cover.
- raise_on_unexpected_status: keyword-only; controls whether undocumented status codes raise UnexpectedStatus.

    import httpx

    from geopera import AuthenticatedClient

    client = AuthenticatedClient(
        base_url="https://api.geopera.com",
        token="gpra_...",
        timeout=httpx.Timeout(30.0),
        headers={"X-Request-Source": "my-app"},
        httpx_args={"http2": True},
        raise_on_unexpected_status=True,
    )

The with_* helpers return a new client with the override applied via attrs.evolve, and also propagate the change to an already-constructed underlying client (both the sync and async httpx clients, if they exist):
- client.with_headers({...}): merge in additional headers.
- client.with_cookies({...}): merge in additional cookies.
- client.with_timeout(httpx.Timeout(...)): replace the timeout.

    client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")
    client = client.with_headers({"X-Request-Source": "batch-job"})

Accessing the underlying async httpx client
The async transport is a standard httpx.AsyncClient. Two methods give you direct access, mirroring the sync get_httpx_client / set_httpx_client pair:
- client.get_async_httpx_client(): return the underlying httpx.AsyncClient, constructing one on first call from the constructor fields. This is where the Authorization header is materialized for AuthenticatedClient. Use it to aclose() the transport explicitly, or to read pool state.
- client.set_async_httpx_client(async_client): supply your own pre-configured httpx.AsyncClient. This overrides every other setting on the client (base URL, cookies, headers, timeout), so you are responsible for the base URL and the auth header. It returns the client for chaining.
Reach for set_async_httpx_client when you need transport behavior the constructor does not expose — for example a custom transport with retries, an HTTP/2 client, or a shared connection-limit configuration.

    import httpx

    from geopera import AuthenticatedClient

    client = AuthenticatedClient(base_url="https://api.geopera.com", token="gpra_...")

    # Bring your own transport. NOTE: this overrides base_url, headers (incl. auth),
    # cookies, and timeout; set them yourself.
    client.set_async_httpx_client(
        httpx.AsyncClient(
            base_url="https://api.geopera.com",
            headers={"Authorization": "Bearer gpra_..."},
            limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
            timeout=httpx.Timeout(30.0),
            http2=True,
        )
    )

The sync and async transports are independent: get_httpx_client() / set_httpx_client() manage the blocking httpx.Client, while get_async_httpx_client() / set_async_httpx_client() manage the awaitable httpx.AsyncClient. A single AuthenticatedClient can hold both, but most code uses only one.
Using the fluent client’s underlying client for async
The fluent Geopera client is sync-only, but it constructs and holds an AuthenticatedClient you can reuse for async operation modules. Build the fluent client once, read .client, and pass that to asyncio / asyncio_detailed:

    import asyncio

    from geopera import Geopera
    from geopera.api.operations import catalog_search
    from geopera.models import CatalogSearchInput

    fluent = Geopera(token="gpra_...")  # sync facade
    underlying = fluent.client  # the AuthenticatedClient

    async def main():
        async with underlying as client:  # enter the async pool
            result = await catalog_search.asyncio(
                client=client,
                body=CatalogSearchInput(host_name="api.geopera.com", limit=10),
            )
            print(result)

    asyncio.run(main())

This lets one configured client back both styles: fluent.catalog.search({...}) for quick blocking calls and catalog_search.asyncio(client=fluent.client, ...) for concurrent ones. Just remember that fluent.catalog.search(...) always blocks; there is no awaitable form of the fluent nodes.
Worked example: estimate tasking concurrently
A complete async flow that authenticates once, fans an estimate out concurrently, and handles errors with the detailed variant.

    import asyncio

    from geopera import AuthenticatedClient
    from geopera.api.operations import orders_tasking_estimate
    from geopera.models import Problem, HTTPValidationError, TaskingEstimateInput

    async def estimate(client, body: TaskingEstimateInput):
        resp = await orders_tasking_estimate.asyncio_detailed(client=client, body=body)
        if resp.status_code == 200:
            return resp.parsed  # typed estimate output
        if isinstance(resp.parsed, HTTPValidationError):
            raise ValueError(resp.parsed.detail)
        if isinstance(resp.parsed, Problem):
            raise RuntimeError(f"{resp.parsed.title}: {resp.parsed.detail}")
        raise RuntimeError(f"unexpected status {resp.status_code}")

    async def main():
        async with AuthenticatedClient(
            base_url="https://api.geopera.com",
            token="gpra_...",
        ) as client:
            bodies = [
                TaskingEstimateInput(...),  # fill with your AOI / window
                TaskingEstimateInput(...),
            ]
            estimates = await asyncio.gather(
                *(estimate(client, body) for body in bodies),
                return_exceptions=True,
            )
            for body, est in zip(bodies, estimates):
                if isinstance(est, Exception):
                    print("estimate failed:", est)
                else:
                    print(est)

    asyncio.run(main())

Replace TaskingEstimateInput(...) with the fields documented for that operation in the API reference. The operation module name is the operation id with dots replaced by underscores (orders.tasking.estimate becomes orders_tasking_estimate), which is the same convention the fluent client uses to resolve client.orders.tasking.estimate.
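That naming rule is mechanical enough to express in one line. The helper below is illustrative only (module_path is not an SDK function); it builds the dotted import path from an operation id using the geopera.api.operations package named earlier on this page, and assumes catalog.search is the operation id behind catalog_search.

```python
def module_path(operation_id: str) -> str:
    """Map an operation id to its generated module path (dots -> underscores)."""
    return "geopera.api.operations." + operation_id.replace(".", "_")


print(module_path("orders.tasking.estimate"))
print(module_path("catalog.search"))
```

With the SDK installed, importlib.import_module(module_path(...)) could then load the module dynamically; that step is left out so the snippet runs on its own.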
Gotchas
- asyncio() must be awaited. Calling it without await returns a coroutine and makes no request; you will also get a “coroutine was never awaited” warning. This is a common async mistake.
- The fluent client is sync. client.catalog.search({...}) blocks. To await an operation you must use the operation module’s asyncio / asyncio_detailed, passing an AuthenticatedClient (which you can get from fluent.client).
- Enter the context once. You cannot re-enter the same client’s async with block; httpx raises on a double __aenter__. Construct a fresh client or reuse the entered one.
- One client, many calls. Reusing an entered client across awaited operations is the point: it keeps connections pooled. Do not create a new client per request; that defeats the pool and the keep-alive.
- Bound your fan-out. asyncio.gather over a huge list launches every request at once. Use an asyncio.Semaphore and respect the rate limits.
- gather fails fast. Without return_exceptions=True, the first raised exception propagates to the caller immediately; the other requests are not cancelled and keep running, but their results are discarded. Use asyncio_detailed (errors come back as values) or return_exceptions=True for per-task outcomes.
- set_async_httpx_client overrides everything. When you supply your own httpx.AsyncClient, the SDK no longer injects the base URL or the auth header; set them on your client yourself.
- Sync and async transports are separate. Mixing sync() and await asyncio() on the same client is allowed but uses two distinct connection pools; close / aclose them independently.
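One detail of gather's fail-fast behavior is easy to verify with the standard library alone: the first exception is raised to the caller right away, while sibling awaitables are left running rather than cancelled. In the sketch below, fails_fast and slow_ok are hypothetical stand-ins for SDK calls.

```python
import asyncio

completed = []


async def fails_fast():
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")


async def slow_ok():
    await asyncio.sleep(0.05)
    completed.append("slow_ok")  # reached only if the task was not cancelled
    return "done"


async def main():
    slow = asyncio.create_task(slow_ok())
    caught = None
    try:
        await asyncio.gather(fails_fast(), slow)
    except RuntimeError as exc:
        caught = str(exc)
    # gather has already raised, but the sibling task is still pending.
    still_running = not slow.done()
    result = await slow  # let it finish; gather never delivered this value
    return caught, still_running, result


caught, still_running, result = asyncio.run(main())
print(caught, still_running, result, completed)
```

If the surviving requests matter, keep references to their tasks as shown, or switch to return_exceptions=True so nothing is dropped.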
Related
- Configuration: every constructor knob shared by the sync and async transports.
- Operations: how every capability is invoked by name.
- Errors: the RFC 9457 problem+json schema returned by Problem.
- Authentication: session and gpra_ API-key bearer tokens.
- Rate limits: limits to respect when fanning out concurrently.