⚡ ElasticsearchBook is crafted by Jozef Sorocin and powered by:
- Spatialized.io (Elasticsearch & Google Maps consulting)
- in cooperation with Garages-Near-Me.com (Effortless parking across Germany)
Use Case
I don't have direct control over the structure of my incoming documents, but before they get inserted into my index I want to:
- skip a document if it comes from a test environment ("env": "test")
- drop all empty fields (empty strings, empty arrays, null)
- remove leading underscores from attribute names (_color → color)
- concatenate two string fields into a new one (color + category)
Can I do it in one go?
My documents will look like this:
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}
and I'd like to end up with:
{
  "env" : "staging",
  "color" : "white",
  "category" : "jackets",
  "seo_category" : "white jackets"
}
Approach
Every time a document is about to be inserted, you can run it through a pipeline*.
A pipeline is composed of blocks called processors, which are executed in the order in which they are declared. There are dozens of built-in processors, and you can also write your own logic in a scripting language called Painless (documented here and here).
With that said, we'll name our pipeline my_data_cleanser, define the processor blocks, and save it under _ingest/pipeline:
PUT _ingest/pipeline/my_data_cleanser
{
  "description": "Runs a doc thru data cleanser...",
  "processors": [
    {
      "drop": {
        "if" : "ctx.env == 'test'"
      }
    },
    {
      "script": {
        "source": """
          // For more on empty fields, check out Field Existence, Length, and Size
          def keys_to_remove = ctx.keySet()
            .stream()
            .filter(field -> ctx[field] == null ||
                             ctx[field] == "" ||
                             (ctx[field] instanceof ArrayList && ctx[field].isEmpty()))
            .collect(Collectors.toList());

          for (key in keys_to_remove) {
            ctx.remove(key);
          }
        """
      }
    },
    {
      "script": {
        "source": """
          def forbidden_keys = [
            '_type',
            '_id',
            '_version_type',
            '_index',
            '_version'
          ];

          def keys_to_remove = [];
          def corrected_keys_map = [:];

          for (def pair : ctx.entrySet()) {
            def key = pair.getKey();

            if (forbidden_keys.contains(key)) {
              continue;
            }

            def value = pair.getValue();

            if (!key.startsWith('_')) {
              continue;
            }

            // we care only about the first underscore
            def new_key = key.substring(1);

            if (new_key.length() < 1) {
              continue;
            }

            corrected_keys_map[new_key] = value;
            keys_to_remove.add(key);
          }

          // delete underscored pairs & prevent ConcurrentModificationException
          ctx.entrySet().removeIf(e -> keys_to_remove.contains(e.getKey()));

          // save the corrected entries back to the doc context
          ctx.putAll(corrected_keys_map);
        """
      }
    },
    {
      "set" : {
        "field": "seo_category",
        "value": "{{_source.color}} {{_source.category}}"
      }
    }
  ]
}
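Before pointing real traffic at the pipeline, it can be worth dry-running it with the simulate API. The request below is only a sketch: it sends the sample document from the use case plus a second, made-up document with "env": "test" (not part of the original data) so that the drop processor gets exercised too.

```console
POST _ingest/pipeline/my_data_cleanser/_simulate
{
  "docs": [
    {
      "_source": {
        "env": "staging",
        "_tags": [],
        "null": null,
        "_category": "jackets",
        "_color": "white"
      }
    },
    {
      "_source": {
        "env": "test",
        "_color": "black"
      }
    }
  ]
}
```

If the pipeline behaves as intended, the first entry of the response should carry the cleaned-up _source shown in the use case, and the second document, coming from the test environment, should be dropped rather than transformed. To apply the pipeline to real documents, pass its name in the pipeline query parameter when indexing (for example POST my-index/_doc?pipeline=my_data_cleanser, where my-index is a placeholder index name), or set it as the index's index.default_pipeline setting.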