Enrich your Elasticsearch documents in Elasticsearch

Author: David Pilato

With Elasticsearch®, we know that joins should be done at “index time” rather than at query time. This blog post is the start of a series of three, as there are several approaches we can take within the Elastic® ecosystem. Here we will cover how to do this in Elasticsearch itself. The next post will show how to do it with the centralized component Logstash, and the final one will show how to do it at the edge using Elastic Agent/Beats.

To give a simple example, assume we run an e-commerce website and collect its logs in the kibana_sample_data_logs index:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] "GET /wp-login.php HTTP/1.1" 404 1831 "- " "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  }
}

Note that you can easily import this dataset using the Kibana® sample data sets by clicking the “Add data” button in the “Sample web logs” box.
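
Once the sample data is loaded, you can, for example, check that the documents are available with the count API:

GET /kibana_sample_data_logs/_count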

We also have a VIP index that contains information about our customers:

{
  "ip" : "30.156.16.164",
  "vip": true,
  "name": "David P"
}

To import this example dataset, we simply run:

DELETE /vip
PUT /vip
{
  "mappings": {
    "properties": {
      "ip": { "type": "keyword" },
      "name": { "type": "text" },
      "vip": { "type": "boolean" }
    }
  }
}
POST /vip/_bulk
{ "index" : { } }
{ "ip" : "30.156.16.164", "vip": true, "name": "David P" }
{ "index" : { } }
{ "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" }
{ "index" : { } }
{ "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" }
{ "index" : { } }
{ "ip" : "236.212.255.77", "vip": true, "name": "Carly R" }
{ "index" : { } }
{ "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" }
{ "index" : { } }
{ "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" }
{ "index" : { } }
{ "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" }
{ "index" : { } }
{ "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }

To perform “joins at index time,” we need to enrich our dataset so that the final log looks like this:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] "GET /wp-login.php HTTP/1.1" 404 1831 "- " "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  },
  "vip": true,
  "name": "David P"
}

You can do this out of the box using the Elasticsearch enrich processor in an ingest pipeline. Let’s see how to do this.

Enriching Elasticsearch data in Elasticsearch

Ingest pipeline

Let’s start with the ingest pipeline.

We can begin with an empty one and use the simulate API to check the behavior we want. We don’t need the full set of fields from the original dataset, so we use a simplified document:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": []
  }
}

We now need to add an enrich processor to our pipeline. But to do this, we first need to create an enrich policy:

PUT /_enrich/policy/vip-policy
{
  "match": {
    "indices": "vip",
    "match_field": "ip",
    "enrich_fields": ["name", "vip"]
  }
}

After creating the enrich policy, we can run it using the execute enrich policy API:

PUT /_enrich/policy/vip-policy/_execute
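
If you want to double-check the policy definition, the get enrich policy API can be used (optional, just for verification):

GET /_enrich/policy/vip-policy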

We can simulate it now:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": [{
      "enrich": {
        "policy_name": "vip-policy",
        "field": "clientip",
        "target_field": "enriched"
      }
    }]
  }
}

This gives the following response:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "enriched": {
            "name": "David P",
            "vip": true,
            "ip": "30.156.16.164"
          },
          "clientip": "30.156.16.164"
        },
        "_ingest": {
          "timestamp": "2023-04-06T17:14:29.127569953Z"
        }
      }
    }
  ]
}

We just need to clean up the data to get the structure we expect:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": [{
      "enrich": {
        "policy_name": "vip-policy",
        "field": "clientip",
        "target_field": "enriched"
      }
    },{
      "rename": {
        "field": "enriched.name",
        "target_field": "name"
      }
    },{
      "rename": {
        "field": "enriched.vip",
        "target_field": "vip"
      }
    },{
      "remove": {
        "field": "enriched"
      }
    }
    ]
  }
}

This now gives the expected result:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "name": "David P",
          "vip": true,
          "clientip": "30.156.16.164"
        },
        "_ingest": {
          "timestamp": "2023-04-06T17:16:08.175186282Z"
        }
      }
    }
  ]
}

We can now store the final pipeline:

PUT /_ingest/pipeline/vip
{
  "processors": [{
    "enrich": {
      "policy_name": "vip-policy",
      "field": "clientip",
      "target_field": "enriched"
    }
  },{
    "rename": {
      "field": "enriched.name",
      "target_field": "name",
      "ignore_failure": true
    }
  },{
    "rename": {
      "field": "enriched.vip",
      "target_field": "vip",
      "ignore_failure": true
    }
  },{
    "remove": {
      "field": "enriched",
      "ignore_failure": true
    }
  }
  ]
}

Note that we changed the pipeline a bit by adding some ignore_failure directives, as some documents might not have a matching entry in the vip index.
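
For example, if we simulate the stored pipeline with an IP address that is not in the vip index (such as 30.156.16.163, seen in the sample document above), the rename and remove processors are simply skipped and the document goes through unchanged instead of failing:

POST /_ingest/pipeline/vip/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.163"
      }
    }
  ]
}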

We can create the target index using the same mapping as the source index:

# Get the source mapping
GET /kibana_sample_data_logs/_mapping

# Create the destination index
PUT /kibana_sample_data_logs_new
{
  // Paste the source mappings structure
  "mappings": {
    "properties": {
      // And add the properties we are adding
      "name": {
        "type": "keyword"
      },
      "vip": {
        "type": "boolean"
      }
    }
  }
}
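
Optionally, if new documents will later be indexed directly into this index, you could also declare the pipeline as the index default so that they get enriched as well (the reindex call below specifies the pipeline explicitly, so this step is not required here):

PUT /kibana_sample_data_logs_new/_settings
{
  "index.default_pipeline": "vip"
}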

And call the Reindex API:

POST _reindex
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest": {
    "index": "kibana_sample_data_logs_new",
    "pipeline": "vip"
  }
}
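
The sample dataset is small, but on a larger index you might prefer to run the reindex asynchronously and follow its progress with the task management API (the task id below is a placeholder for the id returned by the reindex call):

POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest": {
    "index": "kibana_sample_data_logs_new",
    "pipeline": "vip"
  }
}

GET _tasks/<task_id>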

Let’s check if the job is done:

GET /kibana_sample_data_logs_new/_search?filter_path=aggregations.by_name.buckets
{
  "size": 0,
  "aggs": {
    "by_name": {
      "terms": {
        "field": "name"
      }
    }
  }
}

The above command gives a response similar to the following:

{
  "aggregations": {
    "by_name": {
      "buckets": [
        {
          "key": "David P",
          "doc_count": 100
        },
        {
          "key": "Philipp K",
          "doc_count": 29
        },
        {
          "key": "Adrienne V",
          "doc_count": 26
        },
        {
          "key": "Carly R",
          "doc_count": 26
        },
        {
          "key": "Iulia F",
          "doc_count": 25
        },
        {
          "key": "Naoise R",
          "doc_count": 25
        },
        {
          "key": "Jelena Z",
          "doc_count": 24
        },
        {
          "key": "Matt R",
          "doc_count": 24
        }
      ]
    }
  }
}
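
If you are curious about the documents that did not match any VIP entry, a sketch like the following counts them by checking for the absence of the added name field:

GET /kibana_sample_data_logs_new/_count
{
  "query": {
    "bool": {
      "must_not": {
        "exists": { "field": "name" }
      }
    }
  }
}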

Runtime field enrichment

Another way to enrich your data is to do it at search time rather than at index time. This goes against the first sentence of this article, but sometimes you need to make trade-offs. Here, we trade search speed for flexibility.

The lookup runtime field lets you enrich the hits returned in the search response, but such a field cannot be used to query or aggregate the data. Here is a simple example of this feature:

GET kibana_sample_data_logs/_search?filter_path=hits.hits.fields
{
  "size": 1,
  "query": {
    "match": {
      "clientip": "30.156.16.164"
    }
  },
  "runtime_mappings": {
    "enriched": {
        "type": "lookup",
        "target_index": "vip",
        "input_field": "clientip",
        "target_field": "ip",
        "fetch_fields": ["name", "vip"]
    }
  },
  "fields": [
    "clientip",
    "enriched"
  ],
  "_source": false
}

The above command gives the following response:

{
  "hits": {
    "hits": [
      {
        "fields": {
          "enriched": [
            {
              "name": [
                "David P"
              ],
              "vip": [
                true
              ]
            }
          ],
          "clientip": [
            "30.156.16.164"
          ]
        }
      }
    ]
  }
}

Note that this can also be added as part of the mapping:

PUT kibana_sample_data_logs/_mappings
{
  "runtime": {
    "enriched": {
      "type": "lookup",
      "target_index": "vip",
      "input_field": "clientip",
      "target_field": "ip",
      "fetch_fields": ["name", "vip"]
    }
  }
}

GET kibana_sample_data_logs/_search
{
  "size": 1,
  "query": {
    "match": {
      "clientip": "30.156.16.164"
    }
  },
  "fields": [
    "clientip",
    "enriched"
  ]
}

However, if you want to be able to search or aggregate on these fields, you need runtime fields that actually emit values at search time.

Note that a runtime field script cannot look up values in another index. So, because and only because the list of VIPs is small, we can embed it in the script parameters and do the “enrichment” dynamically:

PUT kibana_sample_data_logs/_mappings
{
  "runtime": {
    "name": {
      "type": "keyword",
      "script": {
        "source":
        """
        for (int i = 0; i < params.lookup.size(); i++) {
          if (params.lookup[i].ip == doc['clientip'].value) {
            emit(params.lookup[i].name);
            break;
          }
        }
        """,
        "lang": "painless",
        "params": {
          "name": "David P",
          "lookup": [
            { "ip" : "30.156.16.164", "vip": true, "name": "David P" },
            { "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" },
            { "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" },
            { "ip" : "236.212.255.77", "vip": true, "name": "Carly R" },
            { "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" },
            { "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" },
            { "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" },
            { "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
          ]
        }
      }
    },
    "vip": {
      "type": "boolean",
      "script": {
        "source":
        """
        for (int i = 0; i < params.lookup.size(); i++) {
          if (params.lookup[i].ip == doc['clientip'].value) {
            emit(params.lookup[i].vip);
            break;
          }
        }
        """,
        "lang": "painless",
        "params": {
          "name": "David P",
          "lookup": [
            { "ip" : "30.156.16.164", "vip": true, "name": "David P" },
            { "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" },
            { "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" },
            { "ip" : "236.212.255.77", "vip": true, "name": "Carly R" },
            { "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" },
            { "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" },
            { "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" },
            { "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
          ]
        }
      }
    }
  }
}

We can now run the same aggregation, this time on the runtime fields:

GET /kibana_sample_data_logs/_search?filter_path=aggregations.by_name.buckets
{
  "size": 0,
  "aggs": {
    "by_name": {
      "terms": {
        "field": "name"
      }
    }
  }
}

This gives the same results as we saw before, but of course a little slower:

{
  "aggregations": {
    "by_name": {
      "buckets": [
        {
          "key": "David P",
          "doc_count": 100
        },
        {
          "key": "Philipp K",
          "doc_count": 29
        },
        {
          "key": "Adrienne V",
          "doc_count": 26
        },
        {
          "key": "Carly R",
          "doc_count": 26
        },
        {
          "key": "Iulia F",
          "doc_count": 25
        },
        {
          "key": "Naoise R",
          "doc_count": 25
        },
        {
          "key": "Jelena Z",
          "doc_count": 24
        },
        {
          "key": "Matt R",
          "doc_count": 24
        }
      ]
    }
  }
}

Again, this method does not work with large lookup lists, so reindexing the data with the enrich processor, as we saw in the first part, remains the preferred approach.

The release and timing of any features or functionality described in this article are at the sole discretion of Elastic. Any features or functionality that are currently unavailable may not be delivered on time or at all.
