MapReduce in MongoDB

Using MapReduce in MongoDB:

  • The benefits of MapReduce vs. Aggregation Pipelines.
  • The parts of the mapReduce function.
  • How to create a mapReduce job.

 

 

Today, I would like to talk about some use cases for MapReduce in MongoDB and show how to create a simple MapReduce job:

MapReduce gives us flexibility where an Aggregation Pipeline does not. While MongoDB’s Aggregation framework is great for ad-hoc queries, MapReduce allows us to leverage the full functionality of JavaScript in queries. For instance, MapReduce provides the tools for us to create incremental aggregation over large collections. With JavaScript on our side, we can generate complex queries that have large outputs.

MapReduce really allows you to operationalize your database. We can store complex results in separate collections (unlike Aggregation Pipelines) and update them regularly. You could then drive follow-on analytics from the results collection created by your MapReduce queries.

 

The mapReduce function:

The mapReduce function has three parts: the map, the reduce, and the options. The <map> and <reduce> parameters are both JavaScript functions, while the <options> parameter is a document containing any options you set (such as the output target; see the MongoDB documentation for details). See the prototype below:

db.collection.mapReduce(
     <map>,
     <reduce>,
     {
          <options>
     }
)

Example:

Let's try an example. I have well forecast models in my Petro.ai database, and I'd like to get the average Arps parameters for the gas wells, grouped by model. I currently have four models covering just over 3,400 wells.

First, we need to write the <map> function:

var mapFunction = function(){
    // Guard against wells with no model, no gas model, or no Arps fit
    if (this.model && this.model.gas && this.model.gas.arps) {
        var key = this.modelName;
        var value = {count: 1,
                    qi : this.model.gas.arps.qi,
                    de : this.model.gas.arps.de,
                    b : this.model.gas.arps.b,
                    dmin : this.model.gas.arps.dmin,
                    di : this.model.gas.arps.di
        };
        emit(key, value);
    }
}

The function above says that if a well has an Arps gas model, we grab the modelName and the Arps parameters for that well. We store these as a key and a value and pass them to emit, which hands the key/value pair to the reduce function.
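Inside MongoDB, `this` is bound to each document and `emit` is provided by the engine, but because map is plain JavaScript we can sanity-check it off-database. The sample document below is an assumption based only on the fields the map function reads:

```javascript
// Simulated sample document (shape assumed from the fields the map uses)
var sampleWell = {
    modelName: "Test",
    model: { gas: { arps: { qi: 250, de: 0.5, b: 1.2, dmin: 0.06, di: 0.7 } } }
};

// Collect (key, value) pairs instead of handing them to MongoDB
var emitted = [];
function emit(key, value) { emitted.push({ key: key, value: value }); }

var mapFunction = function () {
    if (this.model && this.model.gas && this.model.gas.arps) {
        var key = this.modelName;
        var value = {
            count: 1,
            qi: this.model.gas.arps.qi,
            de: this.model.gas.arps.de,
            b: this.model.gas.arps.b,
            dmin: this.model.gas.arps.dmin,
            di: this.model.gas.arps.di
        };
        emit(key, value);
    }
};

// MongoDB calls map once per document, with the document bound to `this`
mapFunction.call(sampleWell);
console.log(emitted); // one { key: "Test", value: { count: 1, ... } } entry
```

This mirrors exactly what MongoDB does per document, just with a stub emit.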

Next, we need to write the <reduce>:

var reduceFunction = function(key, values){
    // Zero-initialized accumulator with the same shape as the emitted values
    var reducedValues = {count: 0,
                        qi : 0,
                        de : 0,
                        b : 0,
                        dmin : 0,
                        di : 0
    };
    for (var idx = 0; idx < values.length; idx++){
        reducedValues.count += values[idx].count;
        reducedValues.qi += values[idx].qi;
        reducedValues.de += values[idx].de;
        reducedValues.b += values[idx].b;
        reducedValues.dmin += values[idx].dmin;
        reducedValues.di += values[idx].di;
    }
    return reducedValues;
}

In the <reduce> function above, we create a zero-initialized accumulator and then walk the emitted values, adding each one into the reducedValues object. We end up with one object per modelName holding the sums we need for the averaging. Note that MongoDB may call reduce more than once for the same key, so the returned object must have the same shape as the emitted values; that is also why we carry a running count and defer the averaging instead of doing it here.
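Because reduce is also plain JavaScript, we can check the summing logic outside MongoDB. The two input values below are made up for illustration:

```javascript
var reduceFunction = function (key, values) {
    // Zero-initialized accumulator with the same shape as the emitted values
    var reducedValues = { count: 0, qi: 0, de: 0, b: 0, dmin: 0, di: 0 };
    for (var idx = 0; idx < values.length; idx++) {
        reducedValues.count += values[idx].count;
        reducedValues.qi += values[idx].qi;
        reducedValues.de += values[idx].de;
        reducedValues.b += values[idx].b;
        reducedValues.dmin += values[idx].dmin;
        reducedValues.di += values[idx].di;
    }
    return reducedValues;
};

// Two made-up emitted values for the same modelName
var combined = reduceFunction("Test", [
    { count: 1, qi: 100, de: 0.4, b: 1.0, dmin: 0.06, di: 0.6 },
    { count: 1, qi: 300, de: 0.6, b: 1.4, dmin: 0.06, di: 0.8 }
]);
console.log(combined); // combined.count === 2, combined.qi === 400
```

Note that the output has the same shape as each input, which is what lets MongoDB re-reduce partial results safely.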

From here, we exercise one of the <options> mongoDB gives us. We need to create a finalize function to average up the results:

var finalizeFunction = function(key, reducedVal){
    // Divide each summed parameter by the well count to get the average
    reducedVal.avg = {};
    reducedVal.avg.qi = reducedVal.qi/reducedVal.count;
    reducedVal.avg.de = reducedVal.de/reducedVal.count;
    reducedVal.avg.b = reducedVal.b/reducedVal.count;
    reducedVal.avg.dmin = reducedVal.dmin/reducedVal.count;
    reducedVal.avg.di = reducedVal.di/reducedVal.count;

    return reducedVal;
}

The finalize function takes our reduced object for each key (modelName) and performs the averaging. It creates an empty reducedVal.avg object, fills it with each parameter's sum divided by the count, and returns the modified object.
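One more off-database check, this time with a made-up reduced object representing sums over two wells:

```javascript
var finalizeFunction = function (key, reducedVal) {
    // Divide each summed parameter by the well count to get the average
    reducedVal.avg = {};
    reducedVal.avg.qi = reducedVal.qi / reducedVal.count;
    reducedVal.avg.de = reducedVal.de / reducedVal.count;
    reducedVal.avg.b = reducedVal.b / reducedVal.count;
    reducedVal.avg.dmin = reducedVal.dmin / reducedVal.count;
    reducedVal.avg.di = reducedVal.di / reducedVal.count;
    return reducedVal;
};

// Made-up sums over two wells for the "Test" model
var finalized = finalizeFunction("Test",
    { count: 2, qi: 400, de: 1.0, b: 2.4, dmin: 0.12, di: 1.4 });
console.log(finalized.avg); // avg.qi === 200, avg.de === 0.5
```

The sums stay on the object alongside avg, which keeps the output safe to re-reduce later.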

Let’s put that all together in the mapReduce function call:

db.getCollection('WellDeclineCurveModels').mapReduce(
    mapFunction,
    reduceFunction,
    {
        out: { inline: 1 },
        finalize : finalizeFunction
    }
)

Remember, our finalize function is set in the <options>. Also notice that the out parameter is set to inline. Inline returns an in-memory result object for us to inspect, just like an Aggregation Pipeline. MapReduce also lets us write the output to a named collection and fold new results into it over time, something Aggregation Pipelines cannot do.

 

Conclusion

Time for the results:

{
    "results" : [ 
        {
            "_id" : "Test",
            "value" : {
                "count" : 584.0,
                "qi" : 147682.635953647,
                "de" : 313.673394551317,
                "b" : 722.443168817512,
                "dmin" : 35.0399999999999,
                "di" : 390.83688115506,
                "avg" : {
                    "qi" : 252.881225948026,
                    "de" : 0.537111976971433,
                    "b" : 1.23706022057793,
                    "dmin" : 0.0599999999999998,
                    "di" : 0.669241234854555
                }
            }
        },
       ...

This is just a subset (please contact me if you want the rest!), but it captures the idea. Each result has a value object, and we can see straight away that there are 584 wells in the 'Test' model. The avg object holds the average Arps parameters for those 584 wells.

From this point we can ask other questions and continue to develop the mapReduce job:

  • Has this changed over time?
  • Do the averages change by area?
  • Which wells are outliers?

We can even run the job on a schedule and monitor the results in another collection.
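A sketch of what such an incremental run might look like, assuming a results collection named WellArpsAverages and a lastUpdated field on the source documents (both names are hypothetical):

```javascript
// Hypothetical cutoff: the time of the previous scheduled run
var lastRunDate = new Date("2019-01-01");

db.getCollection('WellDeclineCurveModels').mapReduce(
    mapFunction,
    reduceFunction,
    {
        // Only process documents changed since the last run (hypothetical field)
        query: { lastUpdated: { $gt: lastRunDate } },
        // Fold new results into the existing collection instead of returning inline
        out: { reduce: "WellArpsAverages" },
        finalize: finalizeFunction
    }
)
```

With out set to { reduce: "..." }, MongoDB runs our reduce function again between the stored document and the new results for each key, which only works because our reduce output has the same shape as its inputs.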

Here is the full script:

// Average gas Arps parameters by modelName in Petro.ai

var mapFunction = function(){
    // Guard against wells with no model, no gas model, or no Arps fit
    if (this.model && this.model.gas && this.model.gas.arps) {
        var key = this.modelName;
        var value = {count: 1,
                    qi : this.model.gas.arps.qi,
                    de : this.model.gas.arps.de,
                    b : this.model.gas.arps.b,
                    dmin : this.model.gas.arps.dmin,
                    di : this.model.gas.arps.di
        };
        emit(key, value);
    }
}


var reduceFunction = function(key, values){
    // Zero-initialized accumulator with the same shape as the emitted values
    var reducedValues = {count: 0,
                        qi : 0,
                        de : 0,
                        b : 0,
                        dmin : 0,
                        di : 0
    };
    for (var idx = 0; idx < values.length; idx++){
        reducedValues.count += values[idx].count;
        reducedValues.qi += values[idx].qi;
        reducedValues.de += values[idx].de;
        reducedValues.b += values[idx].b;
        reducedValues.dmin += values[idx].dmin;
        reducedValues.di += values[idx].di;
    }
    return reducedValues;
}


var finalizeFunction = function(key, reducedVal){
    // Divide each summed parameter by the well count to get the average
    reducedVal.avg = {};
    reducedVal.avg.qi = reducedVal.qi/reducedVal.count;
    reducedVal.avg.de = reducedVal.de/reducedVal.count;
    reducedVal.avg.b = reducedVal.b/reducedVal.count;
    reducedVal.avg.dmin = reducedVal.dmin/reducedVal.count;
    reducedVal.avg.di = reducedVal.di/reducedVal.count;

    return reducedVal;
}


db.getCollection('WellDeclineCurveModels').mapReduce(
    mapFunction,
    reduceFunction,
    {
        out: { inline: 1 },
        finalize : finalizeFunction
    }
)

If you have a Petro.ai database, you should be able to copy, paste, and run the script.

 

I hope you have seen that using MapReduce with MongoDB can be very powerful! As always, please let me know if you have questions or comments.

 

 

 
