[ElasticSearch Series-07] ES development scenarios and index sharding settings and optimization

ElasticSearch series overall column

Content Link address
[1] ElasticSearch download and installation https://zhenghuisheng.blog.csdn.net/article/details/129260827
[2] ElasticSearch concepts and basic operations https://blog.csdn.net/zhenghuishengq/article/details/134121631
[3] ElasticSearch’s advanced query Query DSL https://blog.csdn.net/zhenghuishengq/article/details/134159587
[4] Aggregation query operation of ElasticSearch https://blog.csdn.net/zhenghuishengq/article/details/134159587
[5] SpringBoot integrates elasticSearch https://blog.csdn.net/zhenghuishengq/article/details/134212200
[6] The construction of ES cluster architecture and the core concepts of clusters https://blog.csdn.net/zhenghuishengq/article/details/134258577
[7] ES development scenarios and settings and optimization of index sharding https://blog.csdn.net/zhenghuishengq/article/details/134302130

ES development scenarios and index sharding settings and optimization

  • 1. ES development scenarios and index sharding settings and optimization
    • 1. ES application scenarios
      • 1.1, Information search library
      • 1.2, time series library
    • 2. Design and management of shards
      • 2.1, single shard
      • 2.2, multiple shards
        • 2.2.1, reasons for inaccurate score calculation
      • 2.3. Sharding design
        • 2.3.1, Sharding type selection and advantages and disadvantages
        • 2.3.2, Primary Sharding Design and Cases
        • 2.3.3, Replica Sharding Design
    • 3. The underlying reading and writing principles of ElasticSearch
      • 3.1. Writing of data
        • 3.1.1, Data writing process
        • 3.1.2, Data storage file format
      • 3.2. Reading of data
        • 3.2.1, query based on id
        • 3.2.2, query based on keywords
      • 3.3. Optimization of data reading and writing

1. ES development scenarios and index sharding settings and optimization

In the previous article, we covered building an ES cluster, along with concepts such as indices, shards, and replicas. This article explains some application scenarios of ElasticSearch in actual development.

1, ES application scenario

In actual development, ES has two main application scenarios. The first involves a large amount of data that grows slowly, such as order queries and product queries. The second involves a large amount of data that grows rapidly, such as the large volume of log information generated every day, which is stored and queried as a time series.

1.1, information search library

This is the first case: application scenarios where the amount of data is large but grows slowly. For example, in a mall app, after product information, order information, and so on are added to ES, the data can be stored split by product type or name. A query then only needs to target the split that holds the corresponding product type or name.

What needs to be considered in this scenario is the relevance of the search, such as scoring and weighting, which have nothing to do with the time range.

For example, if you enter "home appliances" in a mall's search box, all the home appliance information, brands, and so on are displayed. When sharding the index in ES, you can split storage by brand. Note that the data volume of a single shard should not be too large, for example no more than 20 GB. If the data volume is large, you can increase the number of replica shards to improve query throughput.

If the amount of data in a single index is too large, the index can be split through reindex. It can be split according to some enumerated fields. For example, orders can be split according to regions, products can be split according to brands, etc.
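
As a rough sketch of the split described above, the snippet below builds one `_reindex` request body per value of an enumerated field. The index name `orders`, the field `region` (assumed to be mapped as `keyword`), the region values, and the `<source>_<value>` naming scheme are all hypothetical. The bodies are only printed here; in practice each would be sent with `POST /_reindex`.

```python
import json

def build_reindex_body(source_index: str, field: str, value: str) -> dict:
    """Build a _reindex body that copies only documents whose `field` equals `value`."""
    return {
        "source": {
            "index": source_index,
            # A term query selects one enumerated value, e.g. one region.
            "query": {"term": {field: value}},
        },
        # Hypothetical naming scheme: <source>_<value>
        "dest": {"index": f"{source_index}_{value}"},
    }

# Hypothetical index, field and values, for illustration only.
bodies = [build_reindex_body("orders", "region", r) for r in ("east", "west")]
for body in bodies:
    print(json.dumps(body))
```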

1.2, Time Series Library

Time series statistics, log queries, and similar workloads generally attach a timestamp to each piece of data. Documents are almost never updated and are mainly queried, so the demands on write performance are relatively high. For example, tens of thousands of records may be inserted into ES every day.

When creating an index, you can name it by time, dividing by day, week, or month. If there are tens of thousands of log messages every day, you can run a scheduled task every night to create the next index, and all of that day's data is then stored in it. Subsequent statistics only need to locate the index for the relevant time slice.

PUT /logs_2023-11-07

You can also use a Date Math expression directly. Its syntax is as follows:

<static_name{date_math_expr{date_format|time_zone}}>

Official Date Math documentation: https://www.elastic.co/guide/en/elasticsearch/reference/7.16/date-math-index-names.html

For example, the official documentation gives the following request, which creates an index whose name is the prefix my-index- followed by today's date:

# PUT /<my-index-{now/d}>
PUT /%3Cmy-index-%7Bnow%2Fd%7D%3E

The percent-encoded values above come from the official documentation: the special characters of a Date Math expression must be URI-encoded when they appear in the request path. The mappings are: < becomes %3C, > becomes %3E, / becomes %2F, { becomes %7B, } becomes %7D, | becomes %7C, + becomes %2B, : becomes %3A, and , becomes %2C. Write the expression, such as now/d, and then replace each special character with its encoded value.
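
The encoding can also be produced programmatically. The following is a minimal sketch using only the Python standard library; the helper names `daily_index_name` and `encode_date_math` are made up for illustration. It shows both a concrete per-day index name and the percent-encoded form of a Date Math name.

```python
from datetime import date
from urllib.parse import quote

def daily_index_name(prefix: str, day: date) -> str:
    """Concrete per-day index name, e.g. logs_2023-11-07."""
    return f"{prefix}_{day.isoformat()}"

def encode_date_math(expression: str) -> str:
    """Percent-encode a Date Math index name for use in a request path."""
    # safe="" makes quote() also encode "/", which it leaves alone by default.
    return quote(expression, safe="")

print(daily_index_name("logs", date(2023, 11, 7)))
print(encode_date_math("<my-index-{now/d}>"))
```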

If you mostly query the latest data, you can also use a hot-cold architecture and keep the data of the last few days on the hot nodes. When storing information such as logs, losing a few records is acceptable, so the number of replicas in the index settings can be set to 0.

If the front end needs to query a fixed index name, you can use an alias when switching to a new index: first delete the original index, and then set the alias of the new index to the name of the original index.
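
A related variant, assuming the front end queries an alias from the start rather than a concrete index name, is to move the alias from the old index to the new one in a single `_aliases` request, which Elasticsearch applies atomically. The sketch below only builds the request body; the alias and index names are hypothetical, and the body would be sent with `POST /_aliases`.

```python
import json

def build_alias_swap(alias: str, old_index: str, new_index: str) -> dict:
    """Build an _aliases body that moves `alias` from old_index to new_index."""
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }

# Hypothetical alias and index names.
swap = build_alias_swap("logs_current", "logs_2023-11-06", "logs_2023-11-07")
print(json.dumps(swap, indent=2))
```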

2. Sharding design and management

2.1, single shard

Starting with ES 7, a newly created index has one primary shard and one replica by default. The example below uses version 7.7: after creating an index, the default number of shards and replicas is 1. A single shard avoids many problems, such as inaccurate scoring and inaccurate aggregations.

"number_of_shards" : "1",
"number_of_replicas" : "1",

However, a single shard also has some shortcomings. For example, in a cluster, a single shard cannot achieve horizontal expansion well unless you manually reindex to add shards and split the data.

2.2, multiple shards

The advantages and disadvantages of multiple shards are exactly the opposite of those of a single shard. Multiple shards make it easy to scale out across nodes, and their performance is higher than that of a single-shard index. However, multiple shards also bring problems, such as inaccurate score calculation and inaccurate aggregation results.

2.2.1, reasons for inaccurate score calculation

When the amount of data is large, documents are generally distributed evenly across the shards, so this kind of inaccuracy does not occur. It usually appears when the amount of data is small, that is, when each shard holds only a few documents. For example:

First create an index and set the number of shards to 3

PUT /zhs_db
{
  "settings":{
    "number_of_shards" : "3"
  }
}

Then insert three documents. _bulk is not used here; each document is indexed individually with its own routing value. According to the hash rule, the three documents are expected to fall into three different shards, one document per shard.

POST /zhs_db/_doc/1?routing=zhenghuisheng
{
 "content":"Cross Cluster elasticsearch Search"
}

POST /zhs_db/_doc/2?routing=zhenghuisheng2
{
 "content":"elasticsearch Search"
}

POST /zhs_db/_doc/3?routing=zhenghuisheng3
{
 "content":"elasticsearch"
}
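
The routing rule behind these requests is, in essence, shard = hash(routing) % number_of_shards. The sketch below illustrates the idea only: it uses `zlib.crc32` as a stand-in hash, whereas Elasticsearch uses its own Murmur3-based hash, so the shard numbers it prints will not match a real cluster. It also shows why different routing values are not guaranteed to land on different shards. On a real cluster, the per-shard document counts can be checked with `GET /_cat/shards/zhs_db`.

```python
import zlib

def shard_for(routing: str, number_of_shards: int) -> int:
    """Pick a shard as hash(routing) % number_of_shards."""
    # Stand-in hash: Elasticsearch uses its own Murmur3-based hash, so the
    # shard numbers printed here will not match a real cluster.
    return zlib.crc32(routing.encode("utf-8")) % number_of_shards

for value in ("zhenghuisheng", "zhenghuisheng2", "zhenghuisheng3"):
    print(value, "-> shard", shard_for(value, 3))
```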

Then run a match query on the content field with the value elasticsearch:

GET /zhs_db/_search
{
  "query": {
    "match": {
      "content": "elasticsearch"
    }
  }
}

Normally, the larger the share of a document that the search term takes up, the higher the score. In the document with id 3 the term makes up 100% of the content, so it should logically score highest. In the actual query result, however, the document with id 1 scores highest, even though it should score lowest. Overall, the scores are inaccurate.

The main reason is that each shard applies its own scoring statistics: each shard computes relevance from the data on that shard only. This only matters when the amount of data is small, so in that case it is recommended to set the number of shards to 1.
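
To make the effect concrete, here is a simplified BM25 sketch that scores the three documents twice: once with statistics computed over all documents, and once with statistics computed per shard, assuming each document sits alone on its own shard. It is a textbook form of BM25 with whitespace tokenization, not Lucene's exact implementation, so it is not meant to reproduce the scores a real cluster returns. It only shows that shard-local statistics can erase or change the ranking that global statistics would give.

```python
import math

K1, B = 1.2, 0.75  # common BM25 defaults

DOCS = {
    1: "Cross Cluster elasticsearch Search",
    2: "elasticsearch Search",
    3: "elasticsearch",
}

def tokens(text: str) -> list:
    return text.lower().split()

def bm25(term: str, doc_id: int, corpus: dict) -> float:
    """Score one document for one term using statistics from `corpus` only."""
    n_docs = len(corpus)
    n_with_term = sum(term in tokens(t) for t in corpus.values())
    avg_len = sum(len(tokens(t)) for t in corpus.values()) / n_docs
    doc = tokens(corpus[doc_id])
    tf = doc.count(term)
    idf = math.log(1 + (n_docs - n_with_term + 0.5) / (n_with_term + 0.5))
    norm = 1 - B + B * len(doc) / avg_len
    return idf * tf * (K1 + 1) / (tf + K1 * norm)

# Global statistics: all three documents are scored against one corpus.
global_scores = {i: bm25("elasticsearch", i, DOCS) for i in DOCS}
# Shard-local statistics: each document is alone on its shard (assumption).
local_scores = {i: bm25("elasticsearch", i, {i: DOCS[i]}) for i in DOCS}

print("global:", global_scores)
print("local: ", local_scores)
```

With global statistics the shortest document (id 3) ranks first and the longest (id 1) last. With one document per shard, every document looks "average" on its own shard, so in this model all three scores collapse to the same value.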

Of course, there is also a solution for small data sets: DFS Query Then Fetch. Before the actual query, the coordinating node first collects term statistics from every shard and combines them into global statistics, and the scores are then calculated from those global statistics instead of each shard's local ones. However, this method is not recommended in actual development because the extra round trip makes its performance relatively low.

GET /zhs_db/_search?search_type=dfs_query_then_fetch
{
  "query": {
    "match": {
      "content": "elasticsearch"
    }
  }
}

2.3, Sharding design

2.3.1, Sharding type selection and advantages and disadvantages

The advantages and disadvantages of using a single shard and multiple shards have been mentioned above. Next, let’s talk about how to design this shard in actual development.

Generally speaking, the number of shards should be greater than the number of nodes, so multi-shard designs are usually preferred. This helps automatic allocation when new data nodes are added. If an index's data is distributed across different nodes, queries can run in parallel, and writes can also be spread over multiple machines.

However, too many shards also brings side effects. Each shard is a Lucene index, which consumes machine resources of its own, so too many shards causes additional overhead. All shards also need to be maintained and managed by the Master node, which puts a greater burden on it, so the total number of shards should be kept within 100,000.

2.3.2, Primary Sharding Design and Cases

In actual development, log data only needs the number of primary shards to be set and does not need replicas. Other data that grows slowly needs both the number of primary shards and the number of replicas to be set. The following describes how to design the number of primary shards.

  • When the data is search-type data, such as product information, a single shard should not exceed 20 GB.
  • When the data is log-type data, such as order records and transaction flow records, a single shard should not exceed 50 GB.

Search design: For example, if the design is based on brand, a brand corresponds to an index, and an index corresponds to a primary shard and a replica shard.

Log design: Create a log index every day with 10 shards per index. That adds up to 300 shards per month, and no replicas are required.
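
The arithmetic behind these guidelines can be sketched as follows. The function names and the 100 GB input are illustrative assumptions; the 20 GB cap and the 10-shards-per-day log design come from the text above.

```python
import math

def primary_shards_needed(total_gb: float, max_shard_gb: float) -> int:
    """Smallest number of primary shards that keeps each shard under the cap."""
    return max(1, math.ceil(total_gb / max_shard_gb))

def shards_per_month(shards_per_index: int, replicas: int, days: int = 30) -> int:
    """Total shard copies created by one daily index over `days` days."""
    return shards_per_index * (1 + replicas) * days

print(primary_shards_needed(100, 20))  # hypothetical 100 GB of search data
print(shards_per_month(10, 0))         # the log design above: 10 shards/day, no replicas
```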

2.3.3, replica sharding design

In actual development, the number of replicas is generally set to 0 or 1. Log data does not require a replica and can be set to 0 directly, while search data is generally set to 1. A replica is a copy of the primary shard that holds all of the primary shard's data.

Replication means that after data is written to the primary shard, another copy is saved in each replica shard. If there are too many replica shards, storing the replica data takes more time, which has a certain impact on write performance.

However, replica shards also have benefits: they improve query efficiency, prevent data loss, and keep data safe. Replica shards are therefore necessary, except for log data, but there should not be too many of them. By continuously adjusting the number of replica shards, the query rate and response rate of the whole system can be brought to an optimal state.

To avoid uneven shard distribution, the number of shards per node can be limited with the following settings:

  • index.routing.allocation.total_shards_per_node: the maximum number of shards of this index allowed on each node; -1 means unlimited
  • cluster.routing.allocation.total_shards_per_node: the maximum number of shards allowed on each node across the whole cluster; -1 means unlimited

3, ElasticSearch underlying reading and writing principle

The above has talked about some concepts and designs of sharding and replicas. Next, we will analyze the function of sharding and replicas through the process of writing data in es.

3.1, Data writing

3.1.1, data writing process

As mentioned earlier, different functions are implemented by different node roles. A write request first passes through the coordinating node and is then forwarded to a data node for storage. The data is first stored on the primary shard and then synchronized to the replica shards. The specific process is as follows:

  • 1. After the user initiates a write request, the request first goes to the coordinating node.
  • 2. After the coordinating node receives the request, it uses the routing rule to forward the request to the corresponding node.
  • 3. The data is written to the primary shard on that node. If there are replica shards, the data is then synchronized to them.
  • 4. After the primary shard and the replica shards have finished synchronizing the data, the coordinating node returns a successful-storage response to the client.

3.1.2, data storage file format

segment file: MySQL stores data on disk in units of pages; ElasticSearch stores it in segment files. Each segment is essentially an inverted index, and a shard is made up of many small segment files. When there are too many of them, the small segments are merged automatically, and you can also force a merge manually. During a merge, documents marked as deleted are physically removed.

commit point: a file that records all currently known segments. When a document is deleted, ElasticSearch does not remove it immediately but marks it as deleted in a .del file, and marked documents are filtered out by default at query time. This is somewhat similar to MySQL's row format, which has a field marking whether a row has been deleted.

translog file: similar to MySQL's redo log. It prevents data loss on a crash and is used for data recovery.

os cache: the file system cache. By default a refresh runs once per second, moving newly written data into a segment in the OS cache so that it becomes searchable. You can also trigger this manually through refresh.

3.2, Reading of data

There are two main ways to query data, one is to query directly based on ID, and the other is to match directly based on keywords.

3.2.1, query based on id

Querying by ID is relatively simple. The client sends a request to the coordinating node, which hashes the ID and takes the result modulo the number of primary shards to locate the data node. The target shard has a primary copy and replica copies, and any of them can serve the read, so one of them, for example P0 or R0, is selected for the query. The result is returned to the coordinating node, which finally returns the data to the user.

Because reads are load-balanced between the primary shard and its replica shards, this can improve the performance of the whole system.

3.2.2, query based on keywords

The process is roughly the same as above. First, the request is sent to the coordinator node, and then the Data node is located. However, in the process of querying data and returning data, it needs to go through two stages of operations.

Because ES is built on an inverted index, the first step finds everything that matches the queried keywords and carries back the document ids, and the second step looks the documents up by id. This amounts to querying twice. In MySQL terms, it is like querying through a secondary index and then going back to the table by primary key.

  • Query phase: the request is sent to the relevant shards, each shard returns the ids and sort values of its matching documents to the coordinating node, and the coordinating node merges, filters, and sorts them.
  • Fetch phase: based on the ids selected in the first phase, the coordinating node queries the shards again and returns the full documents.
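
The two phases can be sketched with an in-memory simulation. Everything here is a toy model: the `SHARDS` data, scores, and function names are hypothetical, and real shards compute scores rather than store them. The point is only the data flow: shards return lightweight (score, id) pairs, the coordinating node merges them, and only the winners are fetched in full.

```python
import heapq

# Hypothetical shard contents: doc id -> (score, stored document).
SHARDS = [
    {"a": (0.9, {"title": "doc a"}), "b": (0.2, {"title": "doc b"})},
    {"c": (0.7, {"title": "doc c"})},
    {"d": (0.4, {"title": "doc d"}), "e": (0.8, {"title": "doc e"})},
]

def query_phase(shard: dict, size: int) -> list:
    """Each shard returns only (score, id) for its local top `size` hits."""
    return heapq.nlargest(size, ((score, doc_id) for doc_id, (score, _) in shard.items()))

def fetch_phase(doc_ids: list) -> list:
    """Load the full documents for the ids chosen by the coordinating node."""
    found = []
    for doc_id in doc_ids:
        for shard in SHARDS:
            if doc_id in shard:
                found.append(shard[doc_id][1])
    return found

def search(size: int) -> list:
    candidates = [hit for shard in SHARDS for hit in query_phase(shard, size)]
    top = heapq.nlargest(size, candidates)  # merge and sort on the coordinator
    return fetch_phase([doc_id for _, doc_id in top])

print(search(2))
```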

3.3, data reading and writing optimization

We have learned about the underlying principles of reading and writing. After knowing the principles, we can perform optimization operations according to the principles.

Read data optimization

When reading data: reduce wildcard queries, prefix queries, and other expensive queries; use exact queries for fields that do not need to be scored; use Filter Context so that the internal caching mechanism avoids unnecessary calculations; and combine profile and explain to analyze slow queries.
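
As an illustration of filter context, the sketch below builds a `bool` query in which only the full-text `match` clause is scored, while the exact-value conditions sit under `filter`. The field names `name`, `brand`, and `price` and the sample values are hypothetical; the body would be sent to the `_search` endpoint of the relevant index.

```python
import json

def build_filtered_search(keyword: str, brand: str, min_price: float) -> dict:
    """Scored full-text match combined with unscored filter clauses."""
    return {
        "query": {
            "bool": {
                # Query context: contributes to the relevance score.
                "must": [{"match": {"name": keyword}}],
                # Filter context: yes/no matching, no scoring.
                "filter": [
                    {"term": {"brand": brand}},
                    {"range": {"price": {"gte": min_price}}},
                ],
            }
        }
    }

# Hypothetical field values.
body = build_filtered_search("refrigerator", "acme", 1000)
print(json.dumps(body, indent=2))
```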

Write data optimization

When writing data, you can use bulk insertion to increase system throughput, or insert data from multiple threads.
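
For reference, the `_bulk` API expects a newline-delimited JSON body: one action line followed by one source line per document, with a trailing newline at the end. A minimal sketch of building such a body follows; the index name and documents are hypothetical, and the result would be sent with `POST /_bulk`.

```python
import json

def build_bulk_body(index: str, docs: dict) -> str:
    """Build the newline-delimited JSON body expected by the _bulk API."""
    lines = []
    for doc_id, source in docs.items():
        lines.append(json.dumps({"index": {"_index": index, "_id": doc_id}}))
        lines.append(json.dumps(source))
    # The bulk body must end with a newline.
    return "\n".join(lines) + "\n"

# Hypothetical index name and documents.
payload = build_bulk_body("logs_2023-11-07", {"1": {"msg": "started"}, "2": {"msg": "stopped"}})
print(payload, end="")
```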

Sharding Optimization

In addition to query optimization, you can also optimize sharding. If the amount of data is small, use a single shard. When the amount of data is large and multiple shards are used, avoid the overhead caused by too many shards. For time series indices, you can force merge to physically remove deleted documents and reduce the number of segments.

Beyond the optimizations above, you can also upgrade the server hardware, adjust the refresh frequency, adjust how often the translog is written to disk, and so on. The following template sets the refresh interval, translog flushing, and related options.

DELETE myindex
PUT myindex
{
  "settings": {
    "index": {
      "refresh_interval": "30s", # refresh every 30s
      "number_of_shards": "2"
    },
    "routing": {
      "allocation": {
        "total_shards_per_node": "3" # limit shards per node to avoid data hot spots
      }
    },
    "translog": {
      "sync_interval": "30s",
      "durability": "async" # reduce how often the translog is flushed to disk
    },
    "number_of_replicas": 0
  },
  "mappings": {
    "dynamic": false, # avoid indexing unnecessary fields; update by query if necessary
    "properties": {} # index only the necessary fields
  }
}