一篇文章带你搞定 MongoDB 中 MapReduce 使用

玩过 Hadoop 的小伙伴对 MapReduce 应该不陌生,MapReduce 的强大且灵活,它可以将一个大问题分解为多个小问题,将各个小问题发送到不同的机器上去处理,所有的机器都完成计算后,再将计算结果合并为一个完整的解决方案,这就是所谓的分布式计算。此处我们就来看看 MongoDB 中 MapReduce 的使用。

一、mapReduce

MongoDB 中的 MapReduce 可以用来实现更复杂的聚合命令,使用 MapReduce 主要实现两个函数:
map 函数和 reduce 函数,map 函数生成键值对序列,map 函数的结果作为 reduce 函数的参数,reduce函数中再做进一步的统计,某些我的数据集如下:

{"_id" : ObjectId("59fa71d71fd59c3b2cd908d7"),"name" : "鲁迅","book" : "呐喊","price" : 38.0,"publisher" : "人民文学出版社"}
{"_id" : ObjectId("59fa71d71fd59c3b2cd908d8"),"name" : "曹雪芹","book" : "红楼梦","price" : 22.0,"publisher" : "人民文学出版社"}
{"_id" : ObjectId("59fa71d71fd59c3b2cd908d9"),"name" : "钱钟书","book" : "宋诗选注","price" : 99.0,"publisher" : "人民文学出版社"}
{"_id" : ObjectId("59fa71d71fd59c3b2cd908da"),"name" : "钱钟书","book" : "谈艺录","price" : 66.0,"publisher" : "三联书店"}
{"_id" : ObjectId("59fa71d71fd59c3b2cd908db"),"name" : "鲁迅","book" : "彷徨","price" : 55.0,"publisher" : "花城出版社"}

假如我想查询每位作者所出的书的总价,操作如下:

var map=function(){emit(this.name,this.price)}
var reduce=function(key,value){return Array.sum(value)}
var options={out:"totalPrice"}
db.sang_books.mapReduce(map,reduce,options);
db.totalPrice.find()

emit 函数用于生成 key-value 数据集合emit(key, value); OR emit(key, {v1:v1, v2:v2});
第一个参数是key, 第二个参数是key对应的数据集合

emit 函数主要实现的分组,接收两个参数,第一个参数表示分组的分段,第二个参数表示要统计的数据,减少做特定的数据处理操作,接收两个参数,对应的 emit 方法的两个参数,此处使用了 Array 中的sum 对 price 分段进行自加处理,options 中定义了将结果输出的集合,届时我们将在此集合中去查询数据,很少情况下,这个集合即使在数据库重启后也会保留,并保留集合中的数据。
查询结果如下:

{
    "_id" : "曹雪芹",
    "value" : 22.0
}
{
    "_id" : "钱钟书",
    "value" : 165.0
}
{
    "_id" : "鲁迅",
    "value" : 93.0
}

再加上我想查询每位作者出了几本书,如下:

var map=function(){emit(this.name,1)}
var reduce=function(key,value){return Array.sum(value)}
var options={out:"bookNum"}
db.sang_books.mapReduce(map,reduce,options);
db.bookNum.find()

查询结果如下:

{
    "_id" : "曹雪芹",
    "value" : 1.0
}
{
    "_id" : "钱钟书",
    "value" : 2.0
}
{
    "_id" : "鲁迅",
    "value" : 2.0
}

将每位作者的书列出来,如下:

var map=function(){emit(this.name,this.book)}
var reduce=function(key,value){return value.join(',')}
var options={out:"books"}
db.sang_books.mapReduce(map,reduce,options);
db.books.find()

结果如下:

{
    "_id" : "曹雪芹",
    "value" : "红楼梦"
}
{
    "_id" : "钱钟书",
    "value" : "宋诗选注,谈艺录"
}
{
    "_id" : "鲁迅",
    "value" : "呐喊,彷徨"
}

例如查询每个人体重在¥ 40以上的书:

var map=function(){emit(this.name,this.book)}
var reduce=function(key,value){return value.join(',')}
var options={query:{price:{$gt:40}},out:"books"}
db.sang_books.mapReduce(map,reduce,options);
db.books.find()

查询表示对查到的集合再进行筛选。

结果如下:

{
    "_id" : "钱钟书",
    "value" : "宋诗选注,谈艺录"
}
{
    "_id" : "鲁迅",
    "value" : "彷徨"
}

二、runCommand 实现

我们也可以利用 runCommand 命令来执行 MapReduce。格式如下:

db.runCommand(
               {
                 mapReduce: <collection>,
                 map: <function>,
                 reduce: <function>,
                 finalize: <function>,
                 out: <output>,
                 query: <document>,
                 sort: <document>,
                 limit: <number>,
                 scope: <document>,
                 jsMode: <boolean>,
                 verbose: <boolean>,
                 bypassDocumentValidation: <boolean>,
                 collation: <document>
               }
             )

含义如下:

参数意味着
mapReduce表示要操作的集合
mapmap函数
reducereduce函数
finalize最终处理函数
out输出的集合
query对结果进行过滤
sort对结果排序
limit返回的结果数
scope设置参数值,在这里设置的值在map,reduce,finalize函数中可见
jsMode是否将地图执行的中间数据由javascript对象转换成BSON对象,替换为false
verbose是否显示详细的时间统计信息
bypassDocumentValidation是否绕过文档验证
collation其他一些校对

如下操作,表示执行MapReduce操作重新统计的集合限制返回条数,限制返回条数之后再进行统计操作,如下:

var map=function(){emit(this.name,this.book)}
var reduce=function(key,value){return value.join(',')}
db.runCommand({mapreduce:'sang_books',map,reduce,out:"books",limit:4,verbose:true})
db.books.find()

执行结果如下:

{
    "_id" : "曹雪芹",
    "value" : "红楼梦"
}
{
    "_id" : "钱钟书",
    "value" : "宋诗选注,谈艺录"
}
{
    "_id" : "鲁迅",
    "value" : "呐喊"
}

小伙伴们看到,鲁迅在一本书不见了,就是因为limit是先限制集合返回条数,然后再执行统计操作。

finalize 操作表示最终处理函数,如下:

var f1 = function(key,reduceValue){var obj={};obj.author=key;obj.books=reduceValue; return obj}
var map=function(){emit(this.name,this.book)}
var reduce=function(key,value){return value.join(',')}
db.runCommand({mapreduce:'sang_books',map,reduce,out:"books",finalize:f1})
db.books.find()

f1 第一个参数键表示emit中的第一个参数,第二个参数表示reduce的执行结果,我们可以在f1中对这个结果进行再处理,结果如下:

{
    "_id" : "曹雪芹",
    "value" : {
        "author" : "曹雪芹",
        "books" : "红楼梦"
    }
}
{
    "_id" : "钱钟书",
    "value" : {
        "author" : "钱钟书",
        "books" : "宋诗选注,谈艺录"
    }
}
{
    "_id" : "鲁迅",
    "value" : {
        "author" : "鲁迅",
        "books" : "呐喊,彷徨"
    }
}

scope则可以用来定义一个在地图,reduce和finalize中都可见的变量,如下:

var f1 = function(key,reduceValue){var obj={};obj.author=key;obj.books=reduceValue;obj.sang=sang; return obj}
var map=function(){emit(this.name,this.book)}
var reduce=function(key,value){return value.join(',--'+sang+'--,')}
db.runCommand({mapreduce:'sang_books',map,reduce,out:"books",finalize:f1,scope:{sang:"haha"}})
db.books.find()

执行结果如下:

{
    "_id" : "曹雪芹",
    "value" : {
        "author" : "曹雪芹",
        "books" : "红楼梦",
        "sang" : "haha"
    }
}
{
    "_id" : "钱钟书",
    "value" : {
        "author" : "钱钟书",
        "books" : "宋诗选注,--haha--,谈艺录",
        "sang" : "haha"
    }
}
{
    "_id" : "鲁迅",
    "value" : {
        "author" : "鲁迅",
        "books" : "呐喊,--haha--,彷徨",
        "sang" : "haha"
    }
}