Using Asyncio with Elasticsearch
Starting in elasticsearch-py v7.8.0, for Python 3.6+, the elasticsearch package supports async/await with Asyncio and Aiohttp.
You can either install aiohttp directly or use the [async] extra:
$ python -m pip install "elasticsearch>=7.8.0" aiohttp
# - OR -
$ python -m pip install "elasticsearch[async]>=7.8.0"
Note
Async functionality is new in v7.8.0 of this library, so please open an issue if you find a bug or have a question about async support.
Getting Started with Async
After installation, all async API endpoints are available via AsyncElasticsearch and are used in the same way as other APIs, just with an extra await:
import asyncio
from elasticsearch import AsyncElasticsearch

es = AsyncElasticsearch()

async def main():
    try:
        resp = await es.search(
            index="documents",
            query={"match_all": {}},
            size=20,
        )
        print(resp)
    finally:
        # Close the client's HTTP connections before the loop shuts down.
        await es.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
All APIs that are available under the sync client are also available under the async client.
ASGI Applications and Elastic APM
ASGI (Asynchronous Server Gateway Interface) is a newer way to serve Python web applications that uses async I/O to achieve better performance. Examples of ASGI frameworks include FastAPI, Django 3.0+, and Starlette.
If you’re using one of these frameworks along with Elasticsearch, you should use AsyncElasticsearch for optimal performance, so that synchronous network calls don’t block the event loop.
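To show why a synchronous call hurts an ASGI app, the sketch below stands in `time.sleep()` for a blocking network call and `asyncio.sleep()` for an awaited one. Both stand-ins are assumptions made for illustration, and no Elasticsearch is involved. The script needs Python 3.7+ for `asyncio.run()`. It runs three "requests" concurrently with `asyncio.gather()`: the blocking version takes roughly three times as long, because each call holds the event loop until it finishes.

```python
import asyncio
import time

# Assumed stand-in for one network round trip, in seconds.
DELAY = 0.1


async def blocking_call() -> None:
    # time.sleep() holds the event loop, as a synchronous client call would.
    time.sleep(DELAY)


async def non_blocking_call() -> None:
    # asyncio.sleep() yields to the loop, as an awaited async call would.
    await asyncio.sleep(DELAY)


async def timed(call, n: int = 3) -> float:
    """Run n copies of `call` concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(call() for _ in range(n)))
    return time.perf_counter() - start


async def main() -> None:
    blocking = await timed(blocking_call)
    non_blocking = await timed(non_blocking_call)
    print(f"blocking: {blocking:.2f}s  non-blocking: {non_blocking:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```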
Elastic APM also supports tracing of async Elasticsearch queries, just as it does for synchronous queries. For an example of how to configure AsyncElasticsearch with the popular ASGI framework FastAPI and APM tracing, see the pre-built example in the examples/fastapi-apm directory.
Frequently Asked Questions
NameError / ImportError when importing AsyncElasticsearch?
If you receive a NameError or ImportError when trying to use AsyncElasticsearch, ensure that you’re running Python 3.6+ (check with $ python --version) and that you have aiohttp installed in your environment (check with $ python -m pip freeze | grep aiohttp).
If either of these conditions is not met, async support won’t be available.
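If you would rather run the same checks from inside Python, the hypothetical helper below (`async_support_status` is an illustration name, not part of elasticsearch-py) reports the Python version and whether aiohttp can be found. It also tries the import that fails in the error described above. The aiohttp check only confirms that the package is importable. It does not check the package's version.

```python
import importlib.util
import sys


def async_client_importable() -> bool:
    """True if `from elasticsearch import AsyncElasticsearch` succeeds."""
    try:
        from elasticsearch import AsyncElasticsearch  # noqa: F401
    except ImportError:
        return False
    return True


def async_support_status() -> dict:
    """Run the version, aiohttp, and import checks and report each result."""
    return {
        "python_3_6_plus": sys.version_info >= (3, 6),
        "aiohttp_installed": importlib.util.find_spec("aiohttp") is not None,
        "async_client_importable": async_client_importable(),
    }


if __name__ == "__main__":
    for check, passed in async_support_status().items():
        print(f"{check}: {'ok' if passed else 'MISSING'}")
```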
What about the elasticsearch-async package?
Previously, asyncio was supported separately via the elasticsearch-async package. The elasticsearch-async package has been deprecated in favor of AsyncElasticsearch, provided by the elasticsearch package in v7.8 and onwards.
Receiving ‘Unclosed client session / connector’ warning?
This warning is created by aiohttp when an open HTTP connection is garbage collected. You’ll typically run into this when closing your application. To resolve the issue, ensure that close() is called before the AsyncElasticsearch instance is garbage collected.
For example, if you are using FastAPI, that might look like this:
from fastapi import FastAPI
from elasticsearch import AsyncElasticsearch

app = FastAPI()
es = AsyncElasticsearch()

# This gets called once the app is shutting down.
@app.on_event("shutdown")
async def app_shutdown():
    await es.close()
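Outside a web framework, such as in a standalone script, you can wrap the client in an async context manager so that `close()` is awaited even if an exception escapes. This is a minimal sketch that assumes Python 3.7+ for `contextlib.asynccontextmanager`. `closing_client` is a hypothetical helper, and `StubClient` is a stand-in with the same `close()` coroutine, so the snippet runs without a cluster. With a real cluster, you would pass an `AsyncElasticsearch()` instance instead.

```python
import asyncio
from contextlib import asynccontextmanager


class StubClient:
    """Hypothetical stand-in for AsyncElasticsearch; records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def closing_client(client):
    """Yield `client`, then always await client.close(), even if the body raises."""
    try:
        yield client
    finally:
        await client.close()


async def run(client, fail: bool = False) -> None:
    async with closing_client(client) as es:
        # Real code would call e.g. `await es.search(...)` here.
        if fail:
            raise RuntimeError("simulated failure while using the client")


if __name__ == "__main__":
    client = StubClient()
    asyncio.run(run(client))
    print(client.closed)  # True
```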
Async Helpers
Async variants of all helpers are available in elasticsearch.helpers and are all prefixed with async_*. You’ll notice that these APIs are identical to the ones in the sync Helpers documentation.
All async helpers that accept an iterator or generator also accept async iterators and async generators.
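One way a helper can accept both kinds of input is to normalize them into a single async iterator before consuming them. The sketch below shows that idea. `as_async_iter` and `consume` are hypothetical names used for illustration. They are not functions exported by `elasticsearch.helpers`, and this is not necessarily how the library implements it. The demo requires Python 3.7+ for `asyncio.run()`.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, List, TypeVar, Union

T = TypeVar("T")


async def as_async_iter(items: Union[Iterable[T], AsyncIterable[T]]) -> AsyncIterator[T]:
    """Yield each item from either a regular or an async iterable."""
    if hasattr(items, "__aiter__"):
        async for item in items:  # async iterator / async generator
            yield item
    else:
        for item in items:  # list, regular generator, etc.
            yield item


async def consume(items: Union[Iterable[T], AsyncIterable[T]]) -> List[T]:
    """Stand-in for a helper that accepts actions of either kind."""
    return [item async for item in as_async_iter(items)]


async def async_words():
    for word in ("foo", "bar", "baz"):
        yield word


if __name__ == "__main__":
    print(asyncio.run(consume(["foo", "bar", "baz"])))  # ['foo', 'bar', 'baz']
    print(asyncio.run(consume(async_words())))  # ['foo', 'bar', 'baz']
```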
Bulk and Streaming Bulk
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

es = AsyncElasticsearch()

async def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "doc": {"word": word},
        }

async def main():
    try:
        await async_bulk(es, gendata())
    finally:
        await es.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

es = AsyncElasticsearch()

async def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

async def main():
    try:
        # raise_on_error=False reports failed documents as ok=False
        # instead of raising BulkIndexError.
        async for ok, result in async_streaming_bulk(es, gendata(), raise_on_error=False):
            action, result = result.popitem()
            if not ok:
                print("failed to %s document %s" % (action, result))
    finally:
        await es.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
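Each dictionary yielded by `gendata()` is one bulk action. Keys such as `_index` are treated as metadata, and the remaining keys (or a `_source` key, if present) become the document. The helpers send these to the `_bulk` endpoint as newline-delimited JSON. The sketch below is a simplified, hypothetical model of that expansion, not the library's code. `split_action` and `to_ndjson` are illustration names, and only `_op_type`, `_index` and `_id` are handled. The real helpers recognize more metadata keys and also split actions into chunks.

```python
import json
from typing import Any, Dict, Iterable, Optional, Tuple

# Simplified subset: the real helpers recognize many more metadata keys.
METADATA_KEYS = ("_index", "_id")


def split_action(action: Dict[str, Any]) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]]]:
    """Split one action dict into a bulk header and an optional document body."""
    data = dict(action)  # copy so the caller's dict is not mutated
    op_type = data.pop("_op_type", "index")
    header = {op_type: {key: data.pop(key) for key in METADATA_KEYS if key in data}}
    if op_type == "delete":
        return header, None  # delete actions carry no document body
    return header, data.pop("_source", data)


def to_ndjson(actions: Iterable[Dict[str, Any]]) -> str:
    """Render actions as a newline-delimited JSON bulk request body."""
    lines = []
    for action in actions:
        header, body = split_action(action)
        lines.append(json.dumps(header))
        if body is not None:
            lines.append(json.dumps(body))
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(to_ndjson([{"_index": "mywords", "word": "foo"}]), end="")
```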
Scan
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_scan

es = AsyncElasticsearch()

async def main():
    try:
        async for doc in async_scan(
            client=es,
            query={"query": {"match": {"title": "python"}}},
            index="orders-*",
        ):
            print(doc)
    finally:
        await es.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
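Conceptually, `async_scan` pages through every matching document and yields hits one at a time, so the caller never deals with pages directly. The sketch below models only that paging loop. It uses a hypothetical in-memory `fetch_page` coroutine in place of the search and scroll requests. It is not the helper's implementation: the real helper also tracks scroll IDs and clears the scroll context when done. The demo requires Python 3.7+ for `asyncio.run()`.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List

# Hypothetical in-memory "index": 5 hits, served 2 per page.
HITS = [{"_id": str(i), "_source": {"title": f"python {i}"}} for i in range(5)]
PAGE_SIZE = 2


async def fetch_page(offset: int) -> List[Dict[str, Any]]:
    """Stand-in for one search/scroll round trip; returns the next page of hits."""
    await asyncio.sleep(0)  # yield to the loop as real network I/O would
    return HITS[offset:offset + PAGE_SIZE]


async def scan_like() -> AsyncIterator[Dict[str, Any]]:
    """Yield hits one at a time, fetching a new page only when the last is used up."""
    offset = 0
    while True:
        page = await fetch_page(offset)
        if not page:
            break  # an empty page means every hit has been returned
        for hit in page:
            yield hit
        offset += len(page)


async def main() -> List[str]:
    return [hit["_id"] async for hit in scan_like()]


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['0', '1', '2', '3', '4']
```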
Reindex
API Reference
The API of AsyncElasticsearch is nearly identical to the API of Elasticsearch, with the exception that every API call like search() is an async function and requires an await to properly return the response body.
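The sketch below shows what happens without the `await`. Calling an async function only creates a coroutine object. The response body exists only after that coroutine is awaited. `search` here is a hypothetical stand-in coroutine, not the client's method, so the snippet runs without a cluster. It requires Python 3.7+ for `asyncio.run()`.

```python
import asyncio


async def search(index: str) -> dict:
    """Hypothetical stand-in for an async API method such as es.search()."""
    await asyncio.sleep(0)  # pretend to wait for the HTTP response
    return {"hits": {"hits": []}}


async def demo() -> tuple:
    pending = search(index="documents")  # no await: nothing has run yet
    got_coroutine = asyncio.iscoroutine(pending)
    pending.close()  # discard it so Python doesn't warn "never awaited"

    body = await search(index="documents")  # with await: the response body
    return got_coroutine, body


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, {'hits': {'hits': []}})
```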
AsyncElasticsearch
Note
To reference Elasticsearch APIs that are namespaced like .indices.create(), refer to the sync API reference. These APIs are identical between sync and async.