Pre-Indexing: Pipelines


Use Case

I don't have direct control over the structure of my incoming documents, but before they get inserted into my index I want to:
  1. skip a document if it comes from a test environment ("env": "test")
  2. drop all empty fields (empty strings, empty arrays, null)
  3. remove leading underscores from attribute names (_color → color)
  4. concatenate two string fields together (color + category)
Can I do it in one go?
 
My documents will look like this:
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}
and I'd like to end up with:
{
  "env" : "staging",
  "color" : "white",
  "category" : "jackets",
  "seo_category" : "white jackets"
}
 

Approach

Before a document is indexed, it can be run through an ingest pipeline. A pipeline is composed of blocks called processors, which are executed in the order in which they are declared. There are dozens of built-in processors available, and you can also write your own logic in a scripting language called Painless (see the Painless documentation).
With that being said, we'll name our pipeline my_data_cleanser, define the processor blocks, and save it under _ingest/pipeline:
PUT _ingest/pipeline/my_data_cleanser
{
  "description": "Runs a doc thru data cleanser...",
  "processors": [
    {
      "drop": {
        "if" : "ctx.env == 'test'"
      }
    },

    {
      "script": {
        "source": """
          // For more on empty fields, check out Field Existence, Length, and Size
          def keys_to_remove = ctx.keySet()
                          .stream()
                          .filter(field -> ctx[field] == null || 
                                           ctx[field] == "" || 
                                           (ctx[field] instanceof ArrayList && ctx[field].isEmpty()))
                          .collect(Collectors.toList());

          for (key in keys_to_remove) {
            ctx.remove(key);
          }
        """
      }
    },

    {
      "script": {
        "source": """
          def forbidden_keys = [
            '_type',
            '_id',
            '_version_type',
            '_index',
            '_version'
          ];
          
          def keys_to_remove = [];
          def corrected_keys_map = [:];
          
          for (def pair : ctx.entrySet()) {
            def key = pair.getKey();
            if (forbidden_keys.contains(key)) {
              continue;
            }
            def value = pair.getValue();
            
            if (!key.startsWith('_')) {
              continue;
            }
            
            // we care only about the first underscore
            def new_key = key.substring(1);
            if (new_key.length() < 1) {
              continue;
            }
            corrected_keys_map[new_key] = value;
            
            keys_to_remove.add(key);
          }
          
          // delete underscored pairs & prevent ConcurrentModificationException
          ctx.entrySet().removeIf(e -> keys_to_remove.contains(e.getKey()));
         
          // save the corrected entries back to the doc context
          ctx.putAll(corrected_keys_map);
        """
      }
    },

    {
      "set" : {
        "field": "seo_category",
        "value": "{{_source.color}} {{_source.category}}"
      }
    }
  ]
}
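
To sanity-check the logic before touching a cluster, the four processors can be approximated in plain Python. This is only a rough sketch of the same steps, not how Elasticsearch executes them: the function names (`drop_test_docs`, `remove_empty_fields`, `strip_leading_underscore`, `set_seo_category`, `run_pipeline`) are made up for illustration, a dropped document is modeled as `None`, and a missing field in the final step is assumed to render as an empty string.

```python
# Metadata keys the rename step must leave alone (same list as in the script above).
FORBIDDEN_KEYS = {"_type", "_id", "_version_type", "_index", "_version"}


def drop_test_docs(doc):
    """Mirrors the drop processor: None stands for a discarded document."""
    return None if doc.get("env") == "test" else doc


def is_empty(value):
    """True for null, empty strings, and empty lists."""
    return value is None or value == "" or (isinstance(value, list) and len(value) == 0)


def remove_empty_fields(doc):
    """Mirrors the first script processor."""
    return {key: value for key, value in doc.items() if not is_empty(value)}


def strip_leading_underscore(doc):
    """Mirrors the second script processor: strip only the first underscore."""
    kept, renamed = {}, {}
    for key, value in doc.items():
        if key in FORBIDDEN_KEYS or not key.startswith("_") or len(key) == 1:
            kept[key] = value
        else:
            renamed[key[1:]] = value
    # Apply the renamed entries only after the loop, like ctx.putAll(...) above.
    kept.update(renamed)
    return kept


def set_seo_category(doc):
    """Mirrors the set processor (missing fields assumed to render as '')."""
    result = dict(doc)
    result["seo_category"] = f"{doc.get('color', '')} {doc.get('category', '')}"
    return result


# Order matters: processors run in the order they are declared.
PROCESSORS = [drop_test_docs, remove_empty_fields, strip_leading_underscore, set_seo_category]


def run_pipeline(doc):
    for processor in PROCESSORS:
        doc = processor(doc)
        if doc is None:  # a dropped document skips the remaining processors
            return None
    return doc


sample = {"env": "staging", "_tags": [], "null": None, "_category": "jackets", "_color": "white"}
result = run_pipeline(sample)
print(result)
```

Running the sample document from the use case through `run_pipeline` should yield the four fields we were aiming for (env, color, category, seo_category), while a document with "env": "test" comes back as `None`.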
