Elasticsearch: How to Index Documents Using the Bulk API in Python

When we need to create an Elasticsearch index, the data source is usually not normalized and cannot be imported directly. Raw data may be stored in databases, in CSV or XML files, or fetched from third-party APIs. In such cases, we need to preprocess the data before it can be used with the Bulk API. In this tutorial, we will demonstrate how to index Elasticsearch documents from a CSV file using simple Python code. We will use both the native Elasticsearch bulk API and the bulk API in the helpers module, so you'll learn how to pick the right tool for different occasions.

In the previous article “Elasticsearch: Everything you need to know about using Elasticsearch in Python – 8.x”, I showed how to use the bulk API to index documents into Elasticsearch. Careful readers may have noticed that this method does not scale to a large number of documents or a large amount of data, because all operations are built in memory. If the source data is large, we are very likely to run out of memory. In today's article, I'll explore using Python generators to avoid this problem.
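As a quick refresher on why generators help, the sketch below contrasts a list, which materializes every item up front, with a generator, which produces items one at a time as they are consumed. The function names and the sample size are illustrative assumptions and are not taken from the previous article.

```python
import sys


def build_list(n):
    """Return all n documents at once; memory grows with n."""
    return [{"id": i} for i in range(n)]


def build_generator(n):
    """Yield documents one at a time as the caller asks for them."""
    for i in range(n):
        yield {"id": i}


docs_list = build_list(100_000)
docs_gen = build_generator(100_000)

# The list object alone (not counting the dicts it holds) is much larger
# than the generator object, whose size does not depend on n.
list_size = sys.getsizeof(docs_list)
gen_size = sys.getsizeof(docs_gen)
print(list_size > gen_size)

# The generator still produces the same documents when iterated.
first_three = [next(docs_gen) for _ in range(3)]
print(first_three)
```

The same idea applies to bulk indexing: a generator lets the client consume rows as they are read instead of holding them all in memory.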

For convenience of testing, the data can be obtained from https://github.com/liu-xiao-guo/py-elasticsearch8. The file data.csv contains the raw data we will use.

Installation

For testing purposes, we will follow my previous article “Elasticsearch: How to run Elasticsearch 8.x on Docker for local development” to deploy the cluster. There we use docker compose to install Elasticsearch and Kibana, and we do not apply security settings. For more information on how to connect to a secured Elasticsearch cluster from Python, please refer to the previous article “Elasticsearch: Everything you need to know about using Elasticsearch in Python – 8.x”. That article also explains how to install the required Python packages.

Create an index in Python

We will create the same laptops-demo index as demonstrated in the previous article. First, we’ll create the index directly using the Elasticsearch client. Note that settings and mappings are passed as top-level parameters, not via the body parameter. The code to create the index is:

main.py

# Import Elasticsearch package
from elasticsearch import Elasticsearch
import csv
import json
 
# Connect to Elasticsearch cluster
es = Elasticsearch("http://localhost:9200")
resp = es.info()
print(resp)
 
settings = {
    "index": {"number_of_replicas": 2},
    "analysis": {
        "filter": {
            "ngram_filter": {
                "type": "edge_ngram",
                "min_gram": 2,
                "max_gram": 15,
            }
        },
        "analyzer": {
            "ngram_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": ["lowercase", "ngram_filter"],
            }
        }
    }
}
 
mappings = {
    "properties": {
        "id": {"type": "long"},
        "name": {
            "type": "text",
            "analyzer": "standard",
            "fields": {
                "keyword": {"type": "keyword"},
                "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
            }
        },
        "brand": {
            "type": "text",
            "fields": {
                "keyword": {"type": "keyword"},
            }
        },
        "price": {"type": "float"},
        "attributes": {
            "type": "nested",
            "properties": {
                "attribute_name": {"type": "text"},
                "attribute_value": {"type": "text"},
            }
        }
    }
}
 
configurations = {
    "settings": {
        "index": {"number_of_replicas": 2},
        "analysis": {
            "filter": {
                "ngram_filter": {
                    "type": "edge_ngram",
                    "min_gram": 2,
                    "max_gram": 15,
                }
            },
            "analyzer": {
                "ngram_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "ngram_filter"],
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "id": {"type": "long"},
            "name": {
                "type": "text",
                "analyzer": "standard",
                "fields": {
                    "keyword": {"type": "keyword"},
                    "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
                }
            },
            "brand": {
                "type": "text",
                "fields": {
                    "keyword": {"type": "keyword"},
                }
            },
            "price": {"type": "float"},
            "attributes": {
                "type": "nested",
                "properties": {
                    "attribute_name": {"type": "text"},
                    "attribute_value": {"type": "text"},
                }
            }
        }
    }
}
 
 
INDEX_NAME = "laptops-demo"
 
# Check whether the index exists. If it does, remove it
if es.indices.exists(index=INDEX_NAME):
    print("The index has already existed, going to remove it")
    es.options(ignore_status=404).indices.delete(index=INDEX_NAME)

# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings, mappings=mappings)
print(res)

# The following is another way to create the index, but it is deprecated
# es.indices.create(index=INDEX_NAME, body=configurations)
 

The index is now created. We can use the following command in the Kibana Dev Tools console to view it:

GET _cat/indices

We can start adding documents to it.
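Before loading data, it may help to see roughly what the ngram_analyzer defined in the settings produces for a product name. The sketch below is a pure-Python approximation, not Elasticsearch itself: it splits on whitespace instead of using the standard tokenizer, and the helper names edge_ngrams and ngram_analyze are hypothetical. It only mirrors the lowercase filter and the edge_ngram filter with min_gram 2 and max_gram 15.

```python
def edge_ngrams(token, min_gram=2, max_gram=15):
    """Return the prefixes of token that are min_gram..max_gram characters long."""
    upper = min(len(token), max_gram)
    return [token[:n] for n in range(min_gram, upper + 1)]


def ngram_analyze(text):
    """Rough stand-in for ngram_analyzer: split, lowercase, then edge n-grams."""
    tokens = []
    for word in text.split():  # simplification of the standard tokenizer
        tokens.extend(edge_ngrams(word.lower()))
    return tokens


print(ngram_analyze("Apple MacBook"))
# ['ap', 'app', 'appl', 'apple', 'ma', 'mac', 'macb', 'macbo', 'macboo', 'macbook']
```

Because every prefix of at least two characters is indexed in the name.ngrams subfield, a partial query such as "macb" can match "MacBook". To see the real tokens, the _analyze API can be run against the index.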

Using native Elasticsearch bulk API

Using the native Elasticsearch bulk API is handy when you have a small dataset to load, as the syntax is the same as for native Elasticsearch requests and can be run directly in the Dev Tools console. You don’t need to learn anything new.

The data file to be loaded can be downloaded from the repository mentioned above. Save it as data.csv, which will be used in the following Python code:

main.py

# Import Elasticsearch package
from elasticsearch import Elasticsearch
import csv
import json
 
# Connect to Elasticsearch cluster
es = Elasticsearch("http://localhost:9200")
resp = es.info()
# print(resp)
 
settings = {
    "index": {"number_of_replicas": 2},
    "analysis": {
        "filter": {
            "ngram_filter": {
                "type": "edge_ngram",
                "min_gram": 2,
                "max_gram": 15,
            }
        },
        "analyzer": {
            "ngram_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": ["lowercase", "ngram_filter"],
            }
        }
    }
}
 
mappings = {
    "properties": {
        "id": {"type": "long"},
        "name": {
            "type": "text",
            "analyzer": "standard",
            "fields": {
                "keyword": {"type": "keyword"},
                "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
            }
        },
        "brand": {
            "type": "text",
            "fields": {
                "keyword": {"type": "keyword"},
            }
        },
        "price": {"type": "float"},
        "attributes": {
            "type": "nested",
            "properties": {
                "attribute_name": {"type": "text"},
                "attribute_value": {"type": "text"},
            }
        }
    }
}
 
configurations = {
    "settings": {
        "index": {"number_of_replicas": 2},
        "analysis": {
            "filter": {
                "ngram_filter": {
                    "type": "edge_ngram",
                    "min_gram": 2,
                    "max_gram": 15,
                }
            },
            "analyzer": {
                "ngram_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "ngram_filter"],
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "id": {"type": "long"},
            "name": {
                "type": "text",
                "analyzer": "standard",
                "fields": {
                    "keyword": {"type": "keyword"},
                    "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
                }
            },
            "brand": {
                "type": "text",
                "fields": {
                    "keyword": {"type": "keyword"},
                }
            },
            "price": {"type": "float"},
            "attributes": {
                "type": "nested",
                "properties": {
                    "attribute_name": {"type": "text"},
                    "attribute_value": {"type": "text"},
                }
            }
        }
    }
}
 
 
INDEX_NAME = "laptops-demo"
 
# Check whether the index exists. If it does, remove it
if es.indices.exists(index=INDEX_NAME):
    print("The index has already existed, going to remove it")
    es.options(ignore_status=404).indices.delete(index=INDEX_NAME)

# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings, mappings=mappings)
print(res)

# The following is another way to create the index, but it is deprecated
# es.indices.create(index=INDEX_NAME, body=configurations)
 
with open("data.csv", "r") as fi:
    reader = csv.DictReader(fi, delimiter=",")

    actions = []
    for row in reader:
        action = {"index": {"_index": INDEX_NAME, "_id": int(row["id"])}}
        doc = {
            "id": int(row["id"]),
            "name": row["name"],
            "price": float(row["price"]),
            "brand": row["brand"],
            "attributes": [
                {"attribute_name": "cpu", "attribute_value": row["cpu"]},
                {"attribute_name": "memory", "attribute_value": row["memory"]},
                {
                    "attribute_name": "storage",
                    "attribute_value": row["storage"],
                },
            ],
        }
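        actions.append(action)
        actions.append(doc)

    bulk_resp = es.bulk(index=INDEX_NAME, operations=actions, refresh=True)
    # The bulk API reports per-document failures in the response body
    if bulk_resp["errors"]:
        print("Some documents failed to index")

# Check the results:
result = es.count(index=INDEX_NAME)
print(result)
print(result.body['count'])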

We run the code above:

$ python main.py
The index has already existed, going to remove it
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'laptops-demo'}
{'count': 200, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}
200

Note: In the bulk call above, we pass refresh=True. Otherwise the newly indexed documents may not yet be visible to search, and the count may be 0 when we read it.

The code above has a serious problem: it builds the whole actions list in memory. The memory required grows with the size of the data, so this approach is clearly not suitable for very large datasets.

Note that we use the csv library to conveniently read data from CSV files. As can be seen, the syntax of the native bulk API is very simple and can be used across different languages (including the Dev Tools console).
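To make the connection with the Dev Tools console concrete, the sketch below serializes a small operations list into the newline-delimited JSON body that the bulk endpoint expects: one action line followed by one document line, with a trailing newline. The helper name to_ndjson and the two sample documents are assumptions made up for illustration; they are not rows from data.csv.

```python
import json


def to_ndjson(operations):
    """Serialize a list of bulk operations to newline-delimited JSON."""
    # One JSON object per line, and the body must end with a newline.
    return "".join(json.dumps(op) + "\n" for op in operations)


# Same alternating action/document layout as the list passed to es.bulk.
operations = [
    {"index": {"_index": "laptops-demo", "_id": 1}},
    {"id": 1, "name": "Sample Laptop A", "price": 999.0, "brand": "BrandA"},
    {"index": {"_index": "laptops-demo", "_id": 2}},
    {"id": 2, "name": "Sample Laptop B", "price": 1299.0, "brand": "BrandB"},
]

payload = to_ndjson(operations)
print("POST _bulk")
print(payload, end="")
```

The printed text can be pasted into the Dev Tools console as is, which shows that the Python client sends the same structure that a hand-written request would.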

Using the bulk helper

As mentioned above, one problem with the native bulk API is that all data needs to be loaded into memory before it can be indexed. This can be problematic and inefficient when we have a large dataset. To solve this problem, we can use the bulk helper, which can index Elasticsearch documents from iterators or generators. It therefore does not need to load all the data into memory first, which makes it very memory efficient. However, the syntax is a bit different, as we’ll see shortly.

Before we use the bulk helper to index documents, we should delete the documents in the index to confirm that the bulk helper is indeed working. Our code already does this by deleting and recreating the index on every run. We can then run the following code to load data into Elasticsearch using the bulk helper:
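The memory saving comes from consuming the generator in fixed-size chunks instead of all at once. The sketch below illustrates that idea in plain Python; it is not the library's actual implementation, and the names chunked and fake_docs are hypothetical. As far as I know, helpers.bulk exposes a comparable chunk_size argument (500 by default), but treat that detail as an assumption to verify against your client version.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of at most `size` items from any iterable, lazily."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def fake_docs(n):
    """Hypothetical generator standing in for rows read from a file."""
    for i in range(n):
        yield {"_index": "laptops-demo", "_id": i, "_source": {"id": i}}


# Only one chunk of documents is held in memory at any time.
chunk_sizes = [len(chunk) for chunk in chunked(fake_docs(1200), 500)]
print(chunk_sizes)
# [500, 500, 200]
```

Each chunk would correspond to one bulk request, so peak memory depends on the chunk size rather than on the size of the whole dataset.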

main.py

# Import Elasticsearch package
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import csv
import json
 
# Connect to Elasticsearch cluster
es = Elasticsearch("http://localhost:9200")
resp = es.info()
# print(resp)
 
settings = {
    "index": {"number_of_replicas": 2},
    "analysis": {
        "filter": {
            "ngram_filter": {
                "type": "edge_ngram",
                "min_gram": 2,
                "max_gram": 15,
            }
        },
        "analyzer": {
            "ngram_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": ["lowercase", "ngram_filter"],
            }
        }
    }
}
 
mappings = {
    "properties": {
        "id": {"type": "long"},
        "name": {
            "type": "text",
            "analyzer": "standard",
            "fields": {
                "keyword": {"type": "keyword"},
                "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
            }
        },
        "brand": {
            "type": "text",
            "fields": {
                "keyword": {"type": "keyword"},
            }
        },
        "price": {"type": "float"},
        "attributes": {
            "type": "nested",
            "properties": {
                "attribute_name": {"type": "text"},
                "attribute_value": {"type": "text"},
            }
        }
    }
}
 
configurations = {
    "settings": {
        "index": {"number_of_replicas": 2},
        "analysis": {
            "filter": {
                "ngram_filter": {
                    "type": "edge_ngram",
                    "min_gram": 2,
                    "max_gram": 15,
                }
            },
            "analyzer": {
                "ngram_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "ngram_filter"],
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "id": {"type": "long"},
            "name": {
                "type": "text",
                "analyzer": "standard",
                "fields": {
                    "keyword": {"type": "keyword"},
                    "ngrams": {"type": "text", "analyzer": "ngram_analyzer"},
                }
            },
            "brand": {
                "type": "text",
                "fields": {
                    "keyword": {"type": "keyword"},
                }
            },
            "price": {"type": "float"},
            "attributes": {
                "type": "nested",
                "properties": {
                    "attribute_name": {"type": "text"},
                    "attribute_value": {"type": "text"},
                }
            }
        }
    }
}
 
 
INDEX_NAME = "laptops-demo"
 
# Check whether the index exists. If it does, remove it
if es.indices.exists(index=INDEX_NAME):
    print("The index has already existed, going to remove it")
    es.options(ignore_status=404).indices.delete(index=INDEX_NAME)

# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings, mappings=mappings)
print(res)

# The following is another way to create the index, but it is deprecated
# es.indices.create(index=INDEX_NAME, body=configurations)
 
def generate_docs():
    with open("data.csv", "r") as fi:
        reader = csv.DictReader(fi, delimiter=",")

        for row in reader:
            doc = {
                "_index": INDEX_NAME,
                "_id": int(row["id"]),
                "_source": {
                    "id": int(row["id"]),
                    "name": row["name"],
                    "price": float(row["price"]),
                    "brand": row["brand"],
                    "attributes": [
                        {
                            "attribute_name": "cpu",
                            "attribute_value": row["cpu"],
                        },
                        {
                            "attribute_name": "memory",
                            "attribute_value": row["memory"],
                        },
                        {
                            "attribute_name": "storage",
                            "attribute_value": row["storage"],
                        },
                    ],
                },
            }
            yield doc


helpers.bulk(es, generate_docs())
# (200, []) -- 200 indexed, no errors.

# Refresh only our index so the new documents are visible to count
es.indices.refresh(index=INDEX_NAME)

# Check the results:
result = es.count(index=INDEX_NAME)
print(result. body['count'])

Run the above code. The displayed results are as follows:

$ python main.py
The index has already existed, going to remove it
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'laptops-demo'}
200

From the above results we can see that we have successfully ingested 200 documents.
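Since the generator is plain Python, its output can also be checked without a running cluster. The sketch below is a variant of generate_docs that accepts any open file object, so an in-memory CSV can stand in for data.csv. The function name generate_docs_from, the sample rows, and the omission of the attributes field are simplifications made for illustration.

```python
import csv
import io


def generate_docs_from(fileobj, index_name):
    """Yield one bulk-helper action per CSV row read from an open file object."""
    for row in csv.DictReader(fileobj, delimiter=","):
        yield {
            "_index": index_name,
            "_id": int(row["id"]),
            "_source": {
                "id": int(row["id"]),
                "name": row["name"],
                "price": float(row["price"]),
                "brand": row["brand"],
            },
        }


# Made-up sample rows; the real data.csv also has cpu, memory and storage columns.
sample = io.StringIO(
    "id,name,price,brand\n"
    "1,Sample Laptop A,999.0,BrandA\n"
    "2,Sample Laptop B,1299.5,BrandB\n"
)
docs = list(generate_docs_from(sample, "laptops-demo"))
print(len(docs))
print(docs[0]["_source"]["price"])
```

Passing the file object in, rather than opening data.csv inside the generator, makes it easy to verify the document shape before sending anything to Elasticsearch.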
