diff --git a/elasticsearch/compat.py b/elasticsearch/compat.py index af77c5d03..c5322b561 100644 --- a/elasticsearch/compat.py +++ b/elasticsearch/compat.py @@ -32,6 +32,12 @@ map = map from queue import Queue +try: + from collections.abs import Mapping +except ImportError: + from collections import Mapping + + __all__ = [ "string_types", "quote_plus", @@ -41,4 +47,5 @@ "urlparse", "map", "Queue", + "Mapping", ] diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index 87fcc305d..e171190b5 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -19,7 +19,7 @@ import time from ..exceptions import TransportError -from ..compat import map, string_types, Queue +from ..compat import map, string_types, Queue, Mapping from .errors import ScanError, BulkIndexError @@ -43,9 +43,22 @@ def expand_action(data): data = data.copy() op_type = data.pop("_op_type", "index") action = {op_type: {}} + + # If '_source' is a dict use it for source + # otherwise if op_type == 'update' then + # '_source' should be in the metadata. + if ( + op_type == "update" + and "_source" in data + and not isinstance(data["_source"], Mapping) + ): + action[op_type]["_source"] = data.pop("_source") + for key in ( "_id", "_index", + "_if_seq_no", + "_if_primary_term", "_parent", "_percolate", "_retry_on_conflict", @@ -54,6 +67,8 @@ def expand_action(data): "_type", "_version", "_version_type", + "if_seq_no", + "if_primary_term", "parent", "pipeline", "retry_on_conflict", @@ -62,13 +77,15 @@ def expand_action(data): "version_type", ): if key in data: - if key in [ + if key in { + "_if_seq_no", + "_if_primary_term", "_parent", "_retry_on_conflict", "_routing", "_version", "_version_type", - ]: + }: action[op_type][key[1:]] = data.pop(key) else: action[op_type][key] = data.pop(key) diff --git a/test_elasticsearch/test_helpers.py b/test_elasticsearch/test_helpers.py index 8ea7c74c4..d3640d89e 100644 --- a/test_elasticsearch/test_helpers.py +++ b/test_elasticsearch/test_helpers.py @@ -76,6 +76,103 @@ class TestChunkActions(TestCase): def setup_method(self, _): self.actions = [({"index": {}}, {"some": u"datá", "i": i}) for i in range(100)] + def test_expand_action(self): + self.assertEqual(helpers.expand_action({}), ({"index": {}}, {})) + self.assertEqual( + helpers.expand_action({"key": "val"}), ({"index": {}}, {"key": "val"}) + ) + + def test_expand_action_actions(self): + self.assertEqual( + helpers.expand_action( + {"_op_type": "delete", "_id": "id", "_index": "index"} + ), + ({"delete": {"_id": "id", "_index": "index"}}, None), + ) + self.assertEqual( + helpers.expand_action( + {"_op_type": "update", "_id": "id", "_index": "index", "key": "val"} + ), + ({"update": {"_id": "id", "_index": "index"}}, {"key": "val"}), + ) + self.assertEqual( + helpers.expand_action( + {"_op_type": "create", "_id": "id", "_index": "index", "key": "val"} + ), + ({"create": {"_id": "id", "_index": "index"}}, {"key": "val"}), + ) + self.assertEqual( + helpers.expand_action( + { + "_op_type": "create", + "_id": "id", + "_index": "index", + "_source": {"key": "val"}, + } + ), + ({"create": {"_id": "id", "_index": "index"}}, {"key": "val"}), + ) + + def test_expand_action_options(self): + for option in ( + "_id", + "_index", + "_percolate", + "_timestamp", + "_type", + "if_seq_no", + "if_primary_term", + "parent", + "pipeline", + "retry_on_conflict", + "routing", + "version", + "version_type", + ("_parent", "parent"), + ("_retry_on_conflict", "retry_on_conflict"), + ("_routing", "routing"), + ("_version", "version"), + ("_version_type", "version_type"), + ("_if_seq_no", "if_seq_no"), + ("_if_primary_term", "if_primary_term"), + ): + if isinstance(option, str): + action_option = option + else: + option, action_option = option + self.assertEqual( + helpers.expand_action({"key": "val", option: 0}), + ({"index": {action_option: 0}}, {"key": "val"}), + ) + + def test__source_metadata_or_source(self): + self.assertEqual( + helpers.expand_action({"_source": {"key": "val"}}), + ({"index": {}}, {"key": "val"}), + ) + + self.assertEqual( + helpers.expand_action( + {"_source": ["key"], "key": "val", "_op_type": "update"} + ), + ({"update": {"_source": ["key"]}}, {"key": "val"}), + ) + + self.assertEqual( + helpers.expand_action( + {"_source": True, "key": "val", "_op_type": "update"} + ), + ({"update": {"_source": True}}, {"key": "val"}), + ) + + # This case is only to ensure backwards compatibility with old functionality. + self.assertEqual( + helpers.expand_action( + {"_source": {"key2": "val2"}, "key": "val", "_op_type": "update"} + ), + ({"update": {}}, {"key2": "val2"}), + ) + def test_chunks_are_chopped_by_byte_size(self): self.assertEqual( 100,