打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
MongoDB千万级数据的分析

前段时间网上出来了一批千万级别的数据,本着学习的态度O(∩_∩)O下载下来处理了一下。尊重隐私,所有试验都是只针对所有数据进行统计分析,不针对某条数据的分析。

一、导入

清单1:

读取CSV文件,存储到数据库中

01#-*- coding:UTF-8 -*-
02'''
03Created on 2013-10-20
04  
05  
06@author: tyk
07  
08  
09  
10'''
11from pymongo.connection import Connection
12from time import time
13import codecs
14import csv
15import os
16rootdir = "2000W/"                                   # 指明被遍历的文件夹
17'''
18      
19'''
20def process_data():
21    conn = Connection('localhost', 27017) #获取一个连接
22    ##conn.drop_database('guestHouse') 
23    db = conn.TYK 
24    guest = db.guestHouse              
25  
26  
27    guest_info = []
28    for parent, dirnames, filenames in os.walk(rootdir):    #三个参数:分别返回1.父目录 2.所有文件夹名字(不含路径) 3.所有文件名字
29        for filename in filenames:  
30            ErrorLine = []
31            key_length = 0
32            fullname = os.path.join(parent,filename)
33            try:
34                #with codecs.open(fullname, encoding='utf_8') as file:
35                with codecs.open(fullname, encoding='utf_8_sig') as file:#忽略UTF-8文件前面的BOM
36                    keys = file.readline().split(',')#先读掉第一行的注释
37                    key_length = len(keys)
38                    spamreader = csv.reader(file)#以CSV格式读取,返回的不再是str,而是list
39                    for line in spamreader:
40                        if key_length != len(line):#部分数据不完整,记录下来
41                            ErrorLine.append(line)
42                        else:
43                            each_info = {}
44                            for i in range(1, len(keys)):#过滤第一个字段Name,姓名将不再存到数据库中
45                                each_info[keys[i]] = line[i]
46  
47                            guest_info.append(each_info)
48                            if len(guest_info) == 10000:#每10000条进行一次存储操作
49                                guest.insert(guest_info)  
50                                guest_info = []
51                                  
52            except Exception, e:
53                print filename + "\t" + str(e)
54                  
55            #统一处理错误信息
56            with open('ERR/' + os.path.splitext(filename)[0] + '-ERR.csv', 'w') as log_file:
57                spamwriter = csv.writer(log_file)
58                for line in ErrorLine:
59                    spamwriter.writerow(line)
60    #最后一批
61    guest.insert(guest_info)  
62      
63if __name__ == '__main__':
64    start = time()
65    process_data()
66    stop = time()
67    print(str(stop-start) + "秒")

后来睡着了、关机了,耗时多久也不得而知了⊙﹏⊙b汗

总结:

1.文件编码为UTF-8,不能直接open()打开读取。

2.文件已CSV格式进行存储,读取时用CSV模块处理来读取。这是读出来的数据每行为一个list。注意,不能简单的以","拆分后进行读取。对于这种形状"a,b,c", d的数据是无法正确解析的。

3.对于UTF-8文件,如果有BOM的形式去读是要以'utf_8_sig'编码读取,这样会跳过开头的BOM。如果不处理掉BOM,BOM会随数据一同存到数据库中,造成类似" XXX"的现象(有一个空格的假象)。

参考:http://docs.python.org/2/library/codecs.html#module-encodings.utf_8_sig

http://www.cnblogs.com/DDark/archive/2011/11/28/2266085.html

如果真的已经存到库中了,那只有改key了

1db.guestHouse.update({}, {"$rename" : {" Name" : "Name"}}, false, true)

另外,网上还有一种方法(尝试失败了,具体原因应该是把字符串转换成字节码然后再去比较。怎么转这个我还不会...)

1#with codecs.open(fullname, encoding='utf-8') as file:
2with codecs.open(fullname, encoding='utf_8_sig') as file:
3    keys = file.readline().split(',')
4    if keys[0][:3] == codecs.BOM_UTF8:#将keys[0]转化为字节码再去比较
5        keys[0] = keys[0][3:]

扩展:

今天发现MongoDB本身就带有导入功能mongoimport,可以直接导入CSV文件...

小试一把

1.不做错误数据过滤,直接导入。用专利引用数据做一下实验(《Hadoop权威指南》一书中的实验数据)

实验数据:

01"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
023070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
033070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
043070803,1963,1096,,"US",
05"IL",,1,,2,6,63,,9,,0.3704,,,,,,,
063070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
073070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,
083070806,1963,1096,,"US","PA",,1,,2,6,63,,0,,,,,,,,,
093070807,1963,1096,,"US","OH",,1,,623,3,39,,3,,0.4444,,,,,,,
103070808,1963,1096,,"US","IA",,1,,623,3,39,,4,,0.375,,,,,,,
113070809,1963,1096,,,,1,,4,6,65,,0,,,,,,,,,
1mongoimport -d TYK -c guest --type csv --file d:\text.csv --headerline
一共11行。第一行注释,9条数据。第3条中间截断,第9条取出中间两个数值"US","AZ"。按照csv规定现在应该是10条数据

结果:

01> db.guest.find({}, {"PATENT" : 1, "_id" : 1})
02{ "_id" : ObjectId("52692c2a0b082a1bbb727d86"), "PATENT" : 3070801 }
03{ "_id" : ObjectId("52692c2a0b082a1bbb727d87"), "PATENT" : 3070802 }
04{ "_id" : ObjectId("52692c2a0b082a1bbb727d88"), "PATENT" : 3070803 }
05{ "_id" : ObjectId("52692c2a0b082a1bbb727d89"), "PATENT" : "IL" }
06{ "_id" : ObjectId("52692c2a0b082a1bbb727d8a"), "PATENT" : 3070804 }
07{ "_id" : ObjectId("52692c2a0b082a1bbb727d8b"), "PATENT" : 3070805 }
08{ "_id" : ObjectId("52692c2a0b082a1bbb727d8c"), "PATENT" : 3070806 }
09{ "_id" : ObjectId("52692c2a0b082a1bbb727d8d"), "PATENT" : 3070807 }
10{ "_id" : ObjectId("52692c2a0b082a1bbb727d8e"), "PATENT" : 3070808 }
11{ "_id" : ObjectId("52692c2a0b082a1bbb727d8f"), "PATENT" : 3070809 }
12> db.guest.count()
1310
14>
刚好10条,可见此命令导入是不会过滤异常数据。

2.以UTF-8有BOM格式再试一次。实验数据同上

01> db.guest.find({}, {"PATENT" : 1, "_id" : 1})
02{ "_id" : ObjectId("52692d730b082a1bbb727d90"), "PATENT" : 3070801 }
03{ "_id" : ObjectId("52692d730b082a1bbb727d91"), "PATENT" : 3070802 }
04{ "_id" : ObjectId("52692d730b082a1bbb727d92"), "PATENT" : 3070803 }
05{ "_id" : ObjectId("52692d730b082a1bbb727d93"), "PATENT" : "IL" }
06{ "_id" : ObjectId("52692d730b082a1bbb727d94"), "PATENT" : 3070804 }
07{ "_id" : ObjectId("52692d730b082a1bbb727d95"), "PATENT" : 3070805 }
08{ "_id" : ObjectId("52692d730b082a1bbb727d96"), "PATENT" : 3070806 }
09{ "_id" : ObjectId("52692d730b082a1bbb727d97"), "PATENT" : 3070807 }
10{ "_id" : ObjectId("52692d730b082a1bbb727d98"), "PATENT" : 3070808 }
11{ "_id" : ObjectId("52692d730b082a1bbb727d99"), "PATENT" : 3070809 }
12> db.guest.count()
1310
结果同上面一样,key"PATENT "中并没有因BOM引起的空格

3.mongoimport命令解释

1mongoimport -d TYK -c guest --type csv --file d:\text.csv --headerline
2-d 数据库
3-c 集合
4--type 数据格式
5--file 文件路径
6--headerline 貌似指定这个后以第一行为key,另 -f 可以指定key “-f Name, age”

二、统计分析

1.根据性别统计

由于数据不规范,先查询一下有多少种方式来表示性别的

1db.runCommand({"distinct" : "guestHouse", "key" : "Gender"})
01{
02        "values" : [
03                "M",
04                "F",
05                "0",
06                " ",
07                "1",
08                "",
09                "19790522",
10                "#0449",
11                "#M",
12                "",
13                "N"
14        ],
15        "stats" : {
16                "n" : 20048891,
17                "nscanned" : 20048891,
18                "nscannedObjects" : 20048891,
19                "timems" : 377764,
20                "cursor" : "BasicCursor"
21        },
22        "ok" : 1
23}
一共有11中方式表示性别的...那就以M、F做下统计吧
1#总数据
2db.guestHouse.count()
320048891
1#男 M
2db.guestHouse.count({"Gender":"M"})
312773070
464%
1#女 F
2db.guestHouse.count({"Gender":"F"})
36478745
432%
饼状图

总结:

1.带条件count时速度是非常慢的,猜测在count时可能先进行的查询操作,如果是查询加索引效果会好很多。对Gender加索引,效果明显提高了,但仍然是N秒级别的。显然在实时情况下还是不行的。另外随意加索引也会遇其它方面的问题。在用索引时能达到一个平衡点很重要的啊。

12013-10-24
2查看count的js解释
3> db.guestHouse.count
4function ( x ){
5    return this.find( x ).count();
6}
7>
8果然是先find,后count

2、根据身份证分析性别

从上面数据看,大约有4%的数据性别不详。

15位身份证号码:第7、8位为出生年份(两位数),第9、10位为出生月份,第11、12位代表出生日期,第15位代表性别,奇数为男,偶数为女。 18位身份证号码:第7、8、9、10位为出生年份(四位数),第11、第12位为出生月份,第13、14位代表出生日期,第17位代表性别,奇数为男,偶数为女。
要根据身份证来分析的话,明显不好直接处理分析了。那么就尝试一下编写MapReduce算一下吧,但是单机MapReduce速度会更慢。

先了解一下数据,看看有多少证件类型

01> db.runCommand({"distinct" : "guestHouse", "key" : "CtfTp"})
02{
03        "values" : [
04                "OTH",
05                "GID",
06                "ID",
07                "TBZ",
08                "VSA",
09                "TXZ",
10                "JID",
11                "JZ",
12                "HXZ",
13                "JLZ",
14                "#ID",
15                "hvz",
16                "待定",
17                "11",
18                "",
19                "SBZ",
20                "JGZ",
21                "HKB",
22                "TSZ",
23                "JZ1",
24                " ",
25                "Id",
26                "#GID",
27                "1"
28        ],
29        "stats" : {
30                "n" : 20048891,
31                "nscanned" : 20048891,
32                "nscannedObjects" : 20048891,
33                "timems" : 610036,
34                "cursor" : "BasicCursor"
35        },
36        "ok" : 1
37}
38>

数据依旧的乱,那就暂且以"ID"来统计一下吧

01>map = function() {
02    if (this.CtfTp == "ID") {
03        if (this.CtfId.length == 18){
04            emit(parseInt(this.CtfId.charAt(16)) % 2, {count : 1}) //1为男,0为女
05        }else if (this.CtfId.length == 15) { 
06            emit(parseInt(this.CtfId.charAt(14)) % 2, {count : 1}) //无法解析时为NaN
07        }
08    } else {
09        emit(-1, {count : 1})
10    }
11}
12  
13>reduce = function(key, emits) {
14    total = 0;
15    for (var i in emits) {
16        total += emits[i].count;
17    }
18    return {"count" : total};
19}
20  
21>mr = db.runCommand(
22               {
23                 mapReduce: "guestHouse",
24                 map: map,
25                 reduce: reduce,
26                 out: "TYK.guestHouse.output",
27                 verbose: true
28               }
29             )
30>{
31        "result" : "guestHouse.output",
32        "timeMillis" : 999097,
33        "timing" : {
34                "mapTime" : 777955,
35                "emitLoop" : 995248,
36                "reduceTime" : 111217,
37                "mode" : "mixed",
38                "total" : 999097
39        },
40        "counts" : {
41                "input" : 20048891,
42                "emit" : 19928098,
43                "reduce" : 594610,
44                "output" : 4
45        },
46        "ok" : 1
47}

结果:

1> db.guestHouse.output.find()
2{ "_id" : NaN, "value" : { "count" : 1360 } }
3{ "_id" : -1, "value" : { "count" : 1161164 } }
4{ "_id" : 0, "value" : { "count" : 6831007 } }
5{ "_id" : 1, "value" : { "count" : 11934567 } }
6>
总结:

1.速度比直接count({"Gender" : "M"}),并且资源占用不明显。IO压力不大,CPU压力不大。

2.结果中数据加起来为“19928098”条,比总条数“20048891”少了“120793”条,少在哪了?


3、统计各省、地区的情况

清单1:


01map = function() {
02    //var idCard_reg = /(^\d{15}$)|(^\d{18}$)|(^\d{17}(\d|X|x)$)/; 
03    //var idCard_reg = /(^[1-6]\d{14}$)|(^[1-6]\d{17}$)|(^[1-6]\d{16}(\d|X|x)$)/; 
04    //((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))
05    var idCard_reg = /(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{13}$)|(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{16}$)|(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{15}(\d|X|x)$)/;
06    if (this.CtfTp == "ID" && idCard_reg.test(this.CtfId)) {
07        emit(this.CtfId.substr(0, 2), {count : 1}) //截取前两位 地区,省、直辖市、自治区
08    } else {
09        emit(-1, {count : 1})
10    }
11}
12  
13reduce = function(key, emits) {
14    total = 0;
15    for (var i in emits) {
16        total += emits[i].count;
17    }
18    return {"count" : total};
19}
20  
21mr  =   db.runCommand(
22               {
23                 mapReduce: "guestHouse",
24                 map: map,
25                 reduce: reduce,
26                 out: "guestHouse.provinceOutput",
27                 verbose: true
28               }
29             )
30  
31{
32        "result" : "guestHouse.provinceOutput",
33        "timeMillis" : 1173216,
34        "timing" : {
35                "mapTime" : 900703,
36                "emitLoop" : 1169954,
37                "reduceTime" : 157916,
38                "mode" : "mixed",
39                "total" : 1173216
40        },
41        "counts" : {
42                "input" : 20048891,
43                "emit" : 20048891,
44                "reduce" : 1613854,
45                "output" : 31
46        },
47        "ok" : 1
48}
身份证号码参考: http://baike.baidu.com/view/188003.htm#1_2

结果信息:

01> db.guestHouse.provinceOutput.find().sort({"value.count" : -1})
02{ "_id" : "32", "value" : { "count" : 2398111 } } //江苏
03{ "_id" : -1, "value" : { "count" : 1670289 } } //不详
04{ "_id" : "37", "value" : { "count" : 1523357 } } //山东
05{ "_id" : "33", "value" : { "count" : 1341274 } } //浙江
06{ "_id" : "41", "value" : { "count" : 1120455 } } //河南
07{ "_id" : "34", "value" : { "count" : 981943 } } //安徽
08{ "_id" : "42", "value" : { "count" : 974855 } } //湖北
09{ "_id" : "31", "value" : { "count" : 921046 } } //上海
10{ "_id" : "13", "value" : { "count" : 791432 } } //河北
11{ "_id" : "21", "value" : { "count" : 754645 } } //辽宁
12{ "_id" : "14", "value" : { "count" : 689738 } } //山西
13{ "_id" : "51", "value" : { "count" : 664918 } } //四川(包含重庆)
14{ "_id" : "36", "value" : { "count" : 594849 } } //江西
15{ "_id" : "23", "value" : { "count" : 581882 } } //黑龙江
16{ "_id" : "61", "value" : { "count" : 571792 } } //陕西
17{ "_id" : "35", "value" : { "count" : 571107 } } //福建
18{ "_id" : "43", "value" : { "count" : 562536 } } //湖南
19{ "_id" : "44", "value" : { "count" : 558249 } } //广东
20{ "_id" : "11", "value" : { "count" : 495897 } } //北京
21{ "_id" : "22", "value" : { "count" : 456159 } } //吉林
22{ "_id" : "15", "value" : { "count" : 392787 } } //内蒙
23{ "_id" : "12", "value" : { "count" : 320711 } } //天津
24{ "_id" : "62", "value" : { "count" : 227366 } } //甘肃
25{ "_id" : "45", "value" : { "count" : 192810 } } //广西
26{ "_id" : "52", "value" : { "count" : 187622 } } //贵州
27{ "_id" : "65", "value" : { "count" : 145040 } } //新疆
28{ "_id" : "53", "value" : { "count" : 141652 } } //云南
29{ "_id" : "63", "value" : { "count" : 75509 } } //青海
30{ "_id" : "64", "value" : { "count" : 75105 } } //宁夏
31{ "_id" : "46", "value" : { "count" : 48279 } } //海南
32{ "_id" : "54", "value" : { "count" : 17476 } } //西藏


对结果在此进行分析,根据地区处理。


01db.guestHouse.provinceOutput.group({
02        keyf:function(doc){return {"key" : doc._id.substr(0,1)}},//<SPAN>以省标识的第一位再次分组统计 </SPAN>                initial : {total : 0},
03        reduce :function(curr, result){
04                result.total += curr.value.count;
05            },
06        cond : {"_id" : {"$ne" : -1}},
07        finalize: function(result) {
08                               var areas= [ "华北", "东北", "华东",
09                                                "中南", "西南",
10                                                "西北" ];
11                               result.area = areas[result.key - 1];
12                   }
13    })
14  
15  
16[
17        {
18                "key" : "1",
19                "total" : 2690565,
20                "area" : "华北"
21        },
22        {
23                "key" : "2",
24                "total" : 1792686,
25                "area" : "东北"
26        },
27        {
28                "key" : "3",
29                "total" : 8331687,
30                "area" : "华东"
31        },
32        {
33                "key" : "4",
34                "total" : 3457184,
35                "area" : "中南"
36        },
37        {
38                "key" : "5",
39                "total" : 1011668,
40                "area" : "西南"
41        },
42        {
43                "key" : "6",
44                "total" : 1094812,
45                "area" : "西北"
46        }
47]

group 函数参考:

http://docs.mongodb.org/manual/reference/method/db.collection.group/

http://docs.mongodb.org/manual/reference/command/group/#dbcmd.group


疑问与总结:

a.前面说的MapReduce没有count占用资源是错误的,今天发现任务管理器不会实时更新了⊙﹏⊙b汗

b.group的'keyf'这个配置有时间很有用处(key、 keyf只能二选一)

c.在map时加上query : {"CtfTp" : "ID"}一定会提高速度吗? 

12013-10-29 22:57:00

        详见测试:http://my.oschina.net/tianyongke/blog/172794

d.部分日志

01           22:10:47.001 [conn62]         M/R: (1/3) Emit Progress: 19874400/20048891 99%
02           22:10:50.000 [conn62]        M/R: (1/3) Emit Progress: 19923500/20048891 99%
03           22:10:53.005 [conn62]        M/R: (1/3) Emit Progress: 19974700/20048891 99%
04           22:10:56.603 [conn62]        M/R: (1/3) Emit Progress: 20016000/20048891 99%
05           22:10:59.001 [conn62]        M/R: (1/3) Emit Progress: 20047200/20048891 99%
06           22:11:02.052 [conn62]        M/R: (3/3) Final Reduce Progress: 84500/112318  75%
07           22:11:02.393 [conn62] CMD: drop TYK.guestHouse.provinceOutput
08           22:11:02.531 [conn62] command admin.$cmd command: { renameCollection: "TYK.tmp.mr.guestHouse_9", to: "TYK.guestHouse.provinceOutput", stayTemp: false } ntoreturn:1 keyUpdates:0  reslen:37 136ms
09           22:11:02.561 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9
10           22:11:02.587 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9
11           22:11:02.587 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9_inc
12           22:11:02.674 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9
13           22:11:02.690 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9_inc
14           22:11:02.894 [conn62] command TYK.$cmd command: { mapReduce: "guestHouse", map: function () {
15//var idCard_reg = /(^\d{15}$)|(^\d{18}$)|(^\d{17}(\d|X|..., reduce: function (key, emits) {
16total = 0;
17for (var i in emits) {
18total += emi..., out: "guestHouse.provinceOutput", verbose: true } ntoreturn:1 keyUpdates:0 numYields: 471197 locks(micros) W:233131 r:1922774271 w:20768395 reslen:232 1173522ms
19           22:56:54.820 [conn62] command TYK.$cmd command: { group: { ns: "TYK.guestHouse.provinceOutput", $keyf: function (doc){return {temp_key : doc._id.subStr(0,1)}}, initial: { total: 0.0 }, $reduce: function (doc, prev){
20prev.total += doc.value.count;
21} } } ntoreturn:1 keyUpdates:0 locks(micros) r:542599 reslen:75 542ms
从日志可以看到的是

    1)map

    2)reduce

    3)drop 指定定集合(以前反复执行时先手动做了这一步,现在看来不用了)

    4)变更临时集合为指定集合

    5)删除临时集合(反复删除,还有一个'_inc'的集合。不知道为什么)

    6)最后处理,执行finalize指定函数

表象的是先map后reduce,实际情况是怎么样的呢?是不是并行?并且reduce执行有1613854次之多,而以日志显示时间推算也就1秒左右。这个1613854是怎么来的,与什么有关?







本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Python3使用PyMongo
python必掌握库:pymongo库的心你懂吗?
第78天: Python 操作 MongoDB 数据库介绍
如何使用Java在MongoDB中的嵌套数组中添加元素
Python操作MongoDB看这一篇就够了
Python操作MongoDB基本使用_python读取mongodb数据
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服