Skip to content

Commit d053451

Browse files
Add support for bulk document operations (#1864) (#1871)
* Add support for bulk document operations * Apply suggestions from code review Co-authored-by: Quentin Pradet <[email protected]> --------- Co-authored-by: Quentin Pradet <[email protected]> (cherry picked from commit d81786c) Co-authored-by: Miguel Grinberg <[email protected]>
1 parent 0939ec4 commit d053451

File tree

4 files changed

+344
-4
lines changed

4 files changed

+344
-4
lines changed

Diff for: elasticsearch_dsl/_async/document.py

+82-1
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,20 @@
1616
# under the License.
1717

1818
import collections.abc
19-
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
19+
from typing import (
20+
TYPE_CHECKING,
21+
Any,
22+
AsyncIterable,
23+
Dict,
24+
List,
25+
Optional,
26+
Tuple,
27+
Union,
28+
cast,
29+
)
2030

2131
from elasticsearch.exceptions import NotFoundError, RequestError
32+
from elasticsearch.helpers import async_bulk
2233
from typing_extensions import Self, dataclass_transform
2334

2435
from .._async.index import AsyncIndex
@@ -438,3 +449,73 @@ async def save(
438449
setattr(self.meta, k, meta["_" + k])
439450

440451
return meta if return_doc_meta else meta["result"]
452+
453+
@classmethod
454+
async def bulk(
455+
cls,
456+
actions: AsyncIterable[Union[Self, Dict[str, Any]]],
457+
using: Optional[AsyncUsingType] = None,
458+
index: Optional[str] = None,
459+
validate: bool = True,
460+
skip_empty: bool = True,
461+
**kwargs: Any,
462+
) -> Tuple[int, Union[int, List[Any]]]:
463+
"""
464+
Allows to perform multiple indexing operations in a single request.
465+
466+
:arg actions: a generator that returns document instances to be indexed,
467+
bulk operation dictionaries.
468+
:arg using: connection alias to use, defaults to ``'default'``
469+
:arg index: Elasticsearch index to use, if the ``Document`` is
470+
associated with an index this can be omitted.
471+
:arg validate: set to ``False`` to skip validating the documents
472+
:arg skip_empty: if set to ``False`` will cause empty values (``None``,
473+
``[]``, ``{}``) to be left on the document. Those values will be
474+
stripped out otherwise as they make no difference in Elasticsearch.
475+
476+
Any additional keyword arguments will be passed to
477+
``Elasticsearch.bulk`` unchanged.
478+
479+
:return: bulk operation results
480+
"""
481+
es = cls._get_connection(using)
482+
483+
i = cls._default_index(index)
484+
assert i is not None
485+
486+
class Generate:
487+
def __init__(
488+
self,
489+
doc_iterator: AsyncIterable[Union[AsyncDocument, Dict[str, Any]]],
490+
):
491+
self.doc_iterator = doc_iterator.__aiter__()
492+
493+
def __aiter__(self) -> Self:
494+
return self
495+
496+
async def __anext__(self) -> Dict[str, Any]:
497+
doc: Optional[Union[AsyncDocument, Dict[str, Any]]] = (
498+
await self.doc_iterator.__anext__()
499+
)
500+
501+
if isinstance(doc, dict):
502+
action = doc
503+
doc = None
504+
if "_source" in action and isinstance(
505+
action["_source"], AsyncDocument
506+
):
507+
doc = action["_source"]
508+
if validate: # pragma: no cover
509+
doc.full_clean()
510+
action["_source"] = doc.to_dict(
511+
include_meta=False, skip_empty=skip_empty
512+
)
513+
elif doc is not None:
514+
if validate: # pragma: no cover
515+
doc.full_clean()
516+
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
517+
if "_index" not in action:
518+
action["_index"] = i
519+
return action
520+
521+
return await async_bulk(es, Generate(actions), **kwargs)

Diff for: elasticsearch_dsl/_sync/document.py

+80-1
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,20 @@
1616
# under the License.
1717

1818
import collections.abc
19-
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
19+
from typing import (
20+
TYPE_CHECKING,
21+
Any,
22+
Dict,
23+
Iterable,
24+
List,
25+
Optional,
26+
Tuple,
27+
Union,
28+
cast,
29+
)
2030

2131
from elasticsearch.exceptions import NotFoundError, RequestError
32+
from elasticsearch.helpers import bulk
2233
from typing_extensions import Self, dataclass_transform
2334

2435
from .._sync.index import Index
@@ -432,3 +443,71 @@ def save(
432443
setattr(self.meta, k, meta["_" + k])
433444

434445
return meta if return_doc_meta else meta["result"]
446+
447+
@classmethod
448+
def bulk(
449+
cls,
450+
actions: Iterable[Union[Self, Dict[str, Any]]],
451+
using: Optional[UsingType] = None,
452+
index: Optional[str] = None,
453+
validate: bool = True,
454+
skip_empty: bool = True,
455+
**kwargs: Any,
456+
) -> Tuple[int, Union[int, List[Any]]]:
457+
"""
458+
Allows to perform multiple indexing operations in a single request.
459+
460+
:arg actions: a generator that returns document instances to be indexed,
461+
bulk operation dictionaries.
462+
:arg using: connection alias to use, defaults to ``'default'``
463+
:arg index: Elasticsearch index to use, if the ``Document`` is
464+
associated with an index this can be omitted.
465+
:arg validate: set to ``False`` to skip validating the documents
466+
:arg skip_empty: if set to ``False`` will cause empty values (``None``,
467+
``[]``, ``{}``) to be left on the document. Those values will be
468+
stripped out otherwise as they make no difference in Elasticsearch.
469+
470+
Any additional keyword arguments will be passed to
471+
``Elasticsearch.bulk`` unchanged.
472+
473+
:return: bulk operation results
474+
"""
475+
es = cls._get_connection(using)
476+
477+
i = cls._default_index(index)
478+
assert i is not None
479+
480+
class Generate:
481+
def __init__(
482+
self,
483+
doc_iterator: Iterable[Union[Document, Dict[str, Any]]],
484+
):
485+
self.doc_iterator = doc_iterator.__iter__()
486+
487+
def __iter__(self) -> Self:
488+
return self
489+
490+
def __next__(self) -> Dict[str, Any]:
491+
doc: Optional[Union[Document, Dict[str, Any]]] = (
492+
self.doc_iterator.__next__()
493+
)
494+
495+
if isinstance(doc, dict):
496+
action = doc
497+
doc = None
498+
if "_source" in action and isinstance(action["_source"], Document):
499+
doc = action["_source"]
500+
if validate: # pragma: no cover
501+
doc.full_clean()
502+
action["_source"] = doc.to_dict(
503+
include_meta=False, skip_empty=skip_empty
504+
)
505+
elif doc is not None:
506+
if validate: # pragma: no cover
507+
doc.full_clean()
508+
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
509+
if "_index" not in action:
510+
action["_index"] = i
511+
return action
512+
513+
return bulk(es, Generate(actions), **kwargs)

Diff for: tests/test_integration/_async/test_document.py

+91-1
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@
2323

2424
from datetime import datetime
2525
from ipaddress import ip_address
26-
from typing import Any
26+
from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, List, Union
2727

2828
import pytest
2929
from elasticsearch import AsyncElasticsearch, ConflictError, NotFoundError
30+
from elasticsearch.helpers.errors import BulkIndexError
3031
from pytest import raises
3132
from pytz import timezone
3233

@@ -49,6 +50,7 @@
4950
RankFeatures,
5051
Text,
5152
analyzer,
53+
mapped_field,
5254
)
5355
from elasticsearch_dsl.utils import AttrList
5456

@@ -705,3 +707,91 @@ async def test_highlight_in_meta(async_data_client: AsyncElasticsearch) -> None:
705707
assert "description" in commit.meta.highlight
706708
assert isinstance(commit.meta.highlight["description"], AttrList)
707709
assert len(commit.meta.highlight["description"]) > 0
710+
711+
712+
@pytest.mark.asyncio
713+
async def test_bulk(async_data_client: AsyncElasticsearch) -> None:
714+
class Address(InnerDoc):
715+
street: str
716+
active: bool
717+
718+
class Doc(AsyncDocument):
719+
if TYPE_CHECKING:
720+
_id: int
721+
name: str
722+
age: int
723+
languages: List[str] = mapped_field(Keyword())
724+
addresses: List[Address]
725+
726+
class Index:
727+
name = "bulk-index"
728+
729+
await Doc._index.delete(ignore_unavailable=True)
730+
await Doc.init()
731+
732+
async def gen1() -> AsyncIterator[Union[Doc, Dict[str, Any]]]:
733+
yield Doc(
734+
name="Joe",
735+
age=33,
736+
languages=["en", "fr"],
737+
addresses=[
738+
Address(street="123 Main St", active=True),
739+
Address(street="321 Park Dr.", active=False),
740+
],
741+
)
742+
yield Doc(name="Susan", age=20, languages=["en"])
743+
yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)}
744+
745+
await Doc.bulk(gen1(), refresh=True)
746+
docs = list(await Doc.search().execute())
747+
assert len(docs) == 3
748+
assert docs[0].to_dict() == {
749+
"name": "Joe",
750+
"age": 33,
751+
"languages": [
752+
"en",
753+
"fr",
754+
],
755+
"addresses": [
756+
{
757+
"active": True,
758+
"street": "123 Main St",
759+
},
760+
{
761+
"active": False,
762+
"street": "321 Park Dr.",
763+
},
764+
],
765+
}
766+
assert docs[1].to_dict() == {
767+
"name": "Susan",
768+
"age": 20,
769+
"languages": ["en"],
770+
}
771+
assert docs[2].to_dict() == {
772+
"name": "Sarah",
773+
"age": 45,
774+
}
775+
assert docs[2].meta.id == "45"
776+
777+
async def gen2() -> AsyncIterator[Union[Doc, Dict[str, Any]]]:
778+
yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)}
779+
780+
# a "create" action with an existing id should fail
781+
with raises(BulkIndexError):
782+
await Doc.bulk(gen2(), refresh=True)
783+
784+
async def gen3() -> AsyncIterator[Union[Doc, Dict[str, Any]]]:
785+
yield Doc(_id="45", name="Sarah", age=45, languages=["es"])
786+
yield {"_op_type": "delete", "_id": docs[1].meta.id}
787+
788+
await Doc.bulk(gen3(), refresh=True)
789+
with raises(NotFoundError):
790+
await Doc.get(docs[1].meta.id)
791+
doc = await Doc.get("45")
792+
assert doc is not None
793+
assert (doc).to_dict() == {
794+
"name": "Sarah",
795+
"age": 45,
796+
"languages": ["es"],
797+
}

0 commit comments

Comments
 (0)