Pre-Indexing: Pipelines

Use Case

I don't have direct control over the structure of my incoming documents, but before they get inserted into my index I want to:
  1. skip a document if it comes from a test environment ("env": "test")
  2. drop all empty fields (empty strings, empty arrays, null)
  3. remove leading underscores from attribute names (_color → color)
  4. concatenate two string fields together (color + category)
Can I do it in one go?
My documents will look like this:
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}
and I'd like to end up with:
{
  "env" : "staging",
  "color" : "white",
  "category" : "jackets",
  "seo_category" : "white jackets"
}


Every time a document is about to be inserted, you can let it run through a pipeline. A pipeline is composed of blocks called processors, which are executed in the order in which they've been declared. There are dozens of built-in processors available, but you can also compose your own in a scripting language called Painless (covered in the official Elasticsearch documentation).
With that being said, we'll name our pipeline my_data_cleanser, define the processor blocks, and save it under _ingest/pipeline:
PUT _ingest/pipeline/my_data_cleanser
{
  "description": "Runs a doc thru data cleanser...",
  "processors": [
    {
      "drop": {
        "if" : "ctx.env == 'test'"
      }
    },
    {
      "script": {
        "source": """
          // For more on empty fields, check out Field Existence, Length, and Size
          // collect the keys first so that ctx isn't modified while it's being iterated
          def keys_to_remove = ctx.keySet()
                          .stream()
                          .filter(field -> ctx[field] == null ||
                                           ctx[field] == "" ||
                                           (ctx[field] instanceof List && ctx[field].isEmpty()))
                          .collect(Collectors.toList());

          for (key in keys_to_remove) {
            ctx.remove(key);
          }
        """
      }
    },
    {
      "script": {
        "source": """
          // metadata keys that must keep their underscore
          // (assumed list; adjust it to your Elasticsearch version)
          def forbidden_keys = [
            '_index', '_id', '_routing', '_type',
            '_version', '_version_type', '_ingest'
          ];
          def keys_to_remove = [];
          def corrected_keys_map = [:];
          for (def pair : ctx.entrySet()) {
            def key = pair.getKey();
            if (forbidden_keys.contains(key)) {
              continue;
            }
            def value = pair.getValue();
            if (!key.startsWith('_')) {
              continue;
            }
            // we care only about the first underscore
            def new_key = key.substring(1);
            if (new_key.length() < 1) {
              continue;
            }
            corrected_keys_map[new_key] = value;
            keys_to_remove.add(key);
          }
          // delete underscored pairs & prevent ConcurrentModificationException
          ctx.entrySet().removeIf(e -> keys_to_remove.contains(e.getKey()));
          // save the corrected entries back to the doc context
          ctx.putAll(corrected_keys_map);
        """
      }
    },
    {
      "set" : {
        "field": "seo_category",
        "value": "{{_source.color}} {{_source.category}}"
      }
    }
  ]
}

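To check the pipeline before pointing real traffic at it, one option is the simulate endpoint, which runs sample documents through a stored pipeline without indexing anything. The sketch below reuses the sample document from the use case. The second document, with "env": "test", is a made-up example added only to exercise the drop processor.

```
POST _ingest/pipeline/my_data_cleanser/_simulate
{
  "docs": [
    {
      "_source": {
        "env": "staging",
        "_tags": [],
        "null": null,
        "_category": "jackets",
        "_color": "white"
      }
    },
    {
      "_source": {
        "env": "test",
        "_color": "red"
      }
    }
  ]
}
```

If the processors behave as intended, the first document's _source in the response should match the target document shown in the use case. Once the output looks right, the pipeline can be applied at index time through the pipeline query parameter. The index name my-index and the document ID 1 are placeholders, not names from the original setup:

```
PUT my-index/_doc/1?pipeline=my_data_cleanser
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}
```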
