打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
MongoDB: 10. MapReduce
2011-04-03 00:09
在 MongoDB 上使用 Map/Reduce 进行并行 "统计" 很容易。
db.runCommand(
{
mapreduce : <collection>,
map : <mapfunction>,
reduce : <reducefunction>
[, query : <query filter object>]
[, sort : <sort the query.  useful   optimization>] for
[, limit : <number of objects to   from collection>] return
[, out : <output-collection name>]
[, keeptemp: < | >] true false
[, finalize : <finalizefunction>]
[, scope : <object where fields go into javascript global scope >]
[, verbose :  ] true
});
参数说明:
?mapreduce: 要操作的目标集合。
?map: 映射函数 (生成键值对序列,作为 reduce 函数参数)。
?reduce: 统计函数。
?query: 目标记录过滤。
?sort: 目标记录排序。
?limit: 限制目标记录数量。
?out: 统计结果存放集合 (不指定则使用临时集合,在客户端断开后自动删除)。
?keeptemp: 是否保留临时集合。
?finalize: 最终处理函数 (对 reduce 返回结果进行最终整理后存入结果集合)。
?scope: 向 map、reduce、finalize 导入外部变量。
?verbose: 显示详细的时间统计信息。
官方文档有几句话很重要:
map/reduce is invoked via a database.  The database creates a temporary collection to hold output of the operation. The collection is command cleaned up when the client connection closes, or when explicitly dropped. Alternatively, one can specify a permanent output collection name. map and reduce functions are written in JavaScript and execute on the server.
In sharded environments, data processing of map/reduce operations runs in parallel on all shards.
MapReduce jobs on a single mongod process are single threaded. This is due to a design limitation in current JavaScript engines. We are looking into alternatives to solve this issue, but for now if you want to parallelize your MapReduce jobs, you will need to either use sharding or do the aggregation client-side in your code.
先准备点简单的数据练练手。
> for (var i = 0; i < 1000; i++) {
... var u = { name : "user" + i, age : i % 40 + 1, sex : i % 2 };
... db.users.insert(u);
... }
> db.users.ensureIndex({name:1})
> db.users.ensureIndex({age:1})
> db.users.count()
1000
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "name" : "user0", "age" : 1, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "name" : "user1", "age" : 2, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "name" : "user2", "age" : 3, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "name" : "user3", "age" : 4, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "name" : "user4", "age" : 5, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "name" : "user5", "age" : 6, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "name" : "user6", "age" : 7, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "name" : "user7", "age" : 8, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "name" : "user8", "age" : 9, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "name" : "user9", "age" : 10, "sex" : 1 }
1. Map
Map 函数必须调用 emit(key, value) 返回键值对,使用 this 访问当前待处理的 Document。
> m = function() { emit(this.age, 1) }
function () {
emit(this.age, 1);
}
value 可以使用 JSON Object 传递 (支持多个属性值)。
例如:
emit(this.age, {count:1})
2. Reduce
Reduce 函数接收的参数类似 Group 效果,将 Map 返回的键值序列组合成 { key, [value1, value2, value3, value...] } 传递给 reduce。
> r = function(key, values) {
... var x = 0;
... values.forEach(function(v) { x += v });
... return x;
... }
function (key, values) {
var x = 0;
values.forEach(function (v) {x += v;});
return x;
}
Reduce 函数对这些 values 进行 "统计" 操作,返回结果可以使用 JSON Object。
3. Result
我们不必使用 runCommand,改用 db.<collection>.mapReduce() 更方便一些。
> res = db.users.mapReduce(m, r)
{
"result" : "tmp.mr.mapreduce_1284097299_10",
"timeMillis" : 156,
"counts" : {
"input" : 1000,
"emit" : 1000,
"output" : 40
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : 25 }
{ "_id" : 2, "value" : 25 }
{ "_id" : 3, "value" : 25 }
{ "_id" : 4, "value" : 25 }
{ "_id" : 5, "value" : 25 }
{ "_id" : 6, "value" : 25 }
{ "_id" : 7, "value" : 25 }
{ "_id" : 8, "value" : 25 }
{ "_id" : 9, "value" : 25 }
{ "_id" : 10, "value" : 25 }
{ "_id" : 11, "value" : 25 }
{ "_id" : 12, "value" : 25 }
{ "_id" : 13, "value" : 25 }
{ "_id" : 14, "value" : 25 }
{ "_id" : 15, "value" : 25 }
{ "_id" : 16, "value" : 25 }
{ "_id" : 17, "value" : 25 }
{ "_id" : 18, "value" : 25 }
{ "_id" : 19, "value" : 25 }
{ "_id" : 20, "value" : 25 }
has more
mapReduce() 将结果存储在 "tmp.mr.mapreduce_1284097299_10" 临时集合中。
4. Finalize
利用 finalize() 我们可以对 reduce() 的结果做进一步处理。
> f = function(key, value) { return {age:key, count:value}; }
function (key, value) {
return {age:key, count:value};
}
> res = db.users.mapReduce(m, r, {finalize:f})
{
"result" : "tmp.mr.mapreduce_1284098036_26",
"timeMillis" : 158,
"counts" : {
"input" : 1000,
"emit" : 1000,
"output" : 40
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : { "age" : 1, "count" : 25 } }
{ "_id" : 2, "value" : { "age" : 2, "count" : 25 } }
{ "_id" : 3, "value" : { "age" : 3, "count" : 25 } }
{ "_id" : 4, "value" : { "age" : 4, "count" : 25 } }
{ "_id" : 5, "value" : { "age" : 5, "count" : 25 } }
{ "_id" : 6, "value" : { "age" : 6, "count" : 25 } }
{ "_id" : 7, "value" : { "age" : 7, "count" : 25 } }
{ "_id" : 8, "value" : { "age" : 8, "count" : 25 } }
{ "_id" : 9, "value" : { "age" : 9, "count" : 25 } }
{ "_id" : 10, "value" : { "age" : 10, "count" : 25 } }
{ "_id" : 11, "value" : { "age" : 11, "count" : 25 } }
{ "_id" : 12, "value" : { "age" : 12, "count" : 25 } }
{ "_id" : 13, "value" : { "age" : 13, "count" : 25 } }
{ "_id" : 14, "value" : { "age" : 14, "count" : 25 } }
{ "_id" : 15, "value" : { "age" : 15, "count" : 25 } }
{ "_id" : 16, "value" : { "age" : 16, "count" : 25 } }
{ "_id" : 17, "value" : { "age" : 17, "count" : 25 } }
{ "_id" : 18, "value" : { "age" : 18, "count" : 25 } }
{ "_id" : 19, "value" : { "age" : 19, "count" : 25 } }
{ "_id" : 20, "value" : { "age" : 20, "count" : 25 } }
has more
5. Options
我们还可以添加更多的控制细节。
> res = db.users.mapReduce(m, r, {query:{age:{$lt:10}}, sort:{name:1}, limit:5})
{
"result" : "tmp.mr.mapreduce_1284097888_25",
"timeMillis" : 20,
"counts" : {
"input" : 5,
"emit" : 5,
"output" : 3
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : 2 }
{ "_id" : 2, "value" : 2 }
{ "_id" : 3, "value" : 1 }
6. Example
MapReduce 的作用不仅仅是 "统计",我们可以直接用这种在服务器端高速并发执行机制批量修改数据。
> m = function() { emit(this._id, this) }
function () {
emit(this._id, this);
}
> r = function(key, values) {
... update = function(v) {
...     db.users.update({_id:key}, {$inc:{age:1}}, false, false);
... }
... values.forEach(update);
... return key;
... }
function (key, values) {
update = function (v) {db.users.update({_id:key}, {$inc:{age:1}}, false, false);};
values.forEach(update);
return key;
}
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "name" : "user0", "age" : 1, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "name" : "user1", "age" : 2, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "name" : "user2", "age" : 3, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "name" : "user3", "age" : 4, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "name" : "user4", "age" : 5, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "name" : "user5", "age" : 6, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "name" : "user6", "age" : 7, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "name" : "user7", "age" : 8, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "name" : "user8", "age" : 9, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "name" : "user9", "age" : 10, "sex" : 1 }
> res = db.users.mapReduce(m, r, {limit:10})
{
"result" : "tmp.mr.mapreduce_1284098486_27",
"timeMillis" : 28,
"counts" : {
"input" : 10,
"emit" : 10,
"output" : 10
},
"ok" : 1,
}
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "age" : 2, "name" : "user0", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "age" : 3, "name" : "user1", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "age" : 4, "name" : "user2", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "age" : 5, "name" : "user3", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "age" : 6, "name" : "user4", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "age" : 7, "name" : "user5", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "age" : 8, "name" : "user6", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "age" : 9, "name" : "user7", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "age" : 10, "name" : "user8", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "age" : 11, "name" : "user9", "sex" : 1 }
7. PyMongo
最后当然得在 Python 调用一下。
In [1]: from pymongo import *
In [2]: conn = Connection()
In [3]: db = conn.test
In [4]: m = "function() { emit(this.age, 1); }"
In [5]: r = "function(key, values) { var x = 0; values.forEach(function(v){ x += v }); return x; }"
In [6]: res = db.users.map_reduce(m, r, True)
In [7]: for k in db[res["result"]].find(): print k
....:
{u'_id': 1.0, u'value': 24.0}
{u'_id': 2.0, u'value': 25.0}
{u'_id': 3.0, u'value': 25.0}
{u'_id': 4.0, u'value': 25.0}
{u'_id': 5.0, u'value': 25.0}
{u'_id': 6.0, u'value': 25.0}
{u'_id': 7.0, u'value': 25.0}
{u'_id': 8.0, u'value': 25.0}
{u'_id': 9.0, u'value': 25.0}
{u'_id': 10.0, u'value': 25.0}
{u'_id': 11.0, u'value': 26.0}
{u'_id': 12.0, u'value': 25.0}
{u'_id': 13.0, u'value': 25.0}
{u'_id': 14.0, u'value': 25.0}
{u'_id': 15.0, u'value': 25.0}
{u'_id': 16.0, u'value': 25.0}
{u'_id': 17.0, u'value': 25.0}
{u'_id': 18.0, u'value': 25.0}
{u'_id': 19.0, u'value': 25.0}
{u'_id': 20.0, u'value': 25.0}
{u'_id': 21.0, u'value': 25.0}
{u'_id': 22.0, u'value': 25.0}
{u'_id': 23.0, u'value': 25.0}
{u'_id': 24.0, u'value': 25.0}
{u'_id': 25.0, u'value': 25.0}
{u'_id': 26.0, u'value': 25.0}
{u'_id': 27.0, u'value': 25.0}
{u'_id': 28.0, u'value': 25.0}
{u'_id': 29.0, u'value': 25.0}
{u'_id': 30.0, u'value': 25.0}
{u'_id': 31.0, u'value': 25.0}
{u'_id': 32.0, u'value': 25.0}
{u'_id': 33.0, u'value': 25.0}
{u'_id': 34.0, u'value': 25.0}
{u'_id': 35.0, u'value': 25.0}
{u'_id': 36.0, u'value': 25.0}
{u'_id': 37.0, u'value': 25.0}
{u'_id': 38.0, u'value': 25.0}
{u'_id': 39.0, u'value': 25.0}
{u'_id': 40.0, u'value': 25.0}
附加参数也很容易。
In [10]: res = db.users.map_reduce(m, r, True, limit=10)
In [11]: res
Out[11]:
{u'counts': {u'emit': 10, u'input': 10, u'output': 10},
u'ok': 1.0,
u'result': u'tmp.mr.mapreduce_1284099468_31',
u'timeMillis': 20}
In [12]: for k in db[res["result"]].find(): print k
....:
{u'_id': 2.0, u'value': 1.0}
{u'_id': 3.0, u'value': 1.0}
{u'_id': 4.0, u'value': 1.0}
{u'_id': 5.0, u'value': 1.0}
{u'_id': 6.0, u'value': 1.0}
{u'_id': 7.0, u'value': 1.0}
{u'_id': 8.0, u'value': 1.0}
{u'_id': 9.0, u'value': 1.0}
{u'_id': 10.0, u'value': 1.0}
{u'_id': 11.0, u'value': 1.0}
In [13]: res = db.users.map_reduce(m, r, True, query={"age":{"$lt":20}})
In [14]: res
Out[14]:
{u'counts': {u'emit': 475, u'input': 475, u'output': 19},
u'ok': 1.0,
u'result': u'tmp.mr.mapreduce_1284099533_33',
u'timeMillis': 77}
In [15]: for k in db[res["result"]].find(): print k
....:
{u'_id': 1.0, u'value': 24.0}
{u'_id': 2.0, u'value': 25.0}
{u'_id': 3.0, u'value': 25.0}
{u'_id': 4.0, u'value': 25.0}
{u'_id': 5.0, u'value': 25.0}
{u'_id': 6.0, u'value': 25.0}
{u'_id': 7.0, u'value': 25.0}
{u'_id': 8.0, u'value': 25.0}
{u'_id': 9.0, u'value': 25.0}
{u'_id': 10.0, u'value': 25.0}
{u'_id': 11.0, u'value': 26.0}
{u'_id': 12.0, u'value': 25.0}
{u'_id': 13.0, u'value': 25.0}
{u'_id': 14.0, u'value': 25.0}
{u'_id': 15.0, u'value': 25.0}
{u'_id': 16.0, u'value': 25.0}
{u'_id': 17.0, u'value': 25.0}
{u'_id': 18.0, u'value': 25.0}
{u'_id': 19.0, u'value': 25.0}
更多细节参考官方文档。
http://blog.nosqlfan.com/html/617.html
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Python操作MongoDB数据库
MongoDB
泛型List<T>作为DataGriView数据源
foreach 二维数组循环
字典基本用法
php导出excel电子表格
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服