前段时间网上出来了一批千万级别的数据,本着学习的态度O(∩_∩)O下载下来处理了一下。尊重隐私,所有试验都是只针对所有数据进行统计分析,不针对某条数据的分析。
一、导入
清单1:
读取CSV文件,存储到数据库中
11 | from pymongo.connection import Connection |
16 | rootdir = "2000W/" # 指明被遍历的文件夹 |
21 | conn = Connection( 'localhost' , 27017 ) #获取一个连接 |
22 | ##conn.drop_database('guestHouse') |
28 | for parent, dirnames, filenames in os.walk(rootdir): #三个参数:分别返回1.父目录 2.所有文件夹名字(不含路径) 3.所有文件名字 |
29 | for filename in filenames: |
32 | fullname = os.path.join(parent,filename) |
34 | #with codecs.open(fullname, encoding='utf_8') as file: |
35 | with codecs. open (fullname, encoding = 'utf_8_sig' ) as file : #忽略UTF-8文件前面的BOM |
36 | keys = file .readline().split( ',' ) #先读掉第一行的注释 |
37 | key_length = len (keys) |
38 | spamreader = csv.reader( file ) #以CSV格式读取,返回的不再是str,而是list |
39 | for line in spamreader: |
40 | if key_length ! = len (line): #部分数据不完整,记录下来 |
41 | ErrorLine.append(line) |
44 | for i in range ( 1 , len (keys)): #过滤第一个字段Name,姓名将不再存到数据库中 |
45 | each_info[keys[i]] = line[i] |
47 | guest_info.append(each_info) |
48 | if len (guest_info) = = 10000 : #每10000条进行一次存储操作 |
49 | guest.insert(guest_info) |
53 | print filename + "\t" + str (e) |
56 | with open ( 'ERR/' + os.path.splitext(filename)[ 0 ] + '-ERR.csv' , 'w' ) as log_file: |
57 | spamwriter = csv.writer(log_file) |
58 | for line in ErrorLine: |
59 | spamwriter.writerow(line) |
61 | guest.insert(guest_info) |
63 | if __name__ = = '__main__' : |
67 | print ( str (stop - start) + "秒" ) |
后来睡着了、关机了,耗时多久也不得而知了⊙﹏⊙b汗
总结:
1.文件编码为UTF-8,不能直接open()打开读取。
2.文件已CSV格式进行存储,读取时用CSV模块处理来读取。这是读出来的数据每行为一个list。注意,不能简单的以","拆分后进行读取。对于这种形状"a,b,c", d的数据是无法正确解析的。
3.对于UTF-8文件,如果有BOM的形式去读是要以'utf_8_sig'编码读取,这样会跳过开头的BOM。如果不处理掉BOM,BOM会随数据一同存到数据库中,造成类似" XXX"的现象(有一个空格的假象)。
参考:http://docs.python.org/2/library/codecs.html#module-encodings.utf_8_sig
http://www.cnblogs.com/DDark/archive/2011/11/28/2266085.html
如果真的已经存到库中了,那只有改key了
1 | db.guestHouse.update({}, { "$rename" : { " Name" : "Name" }}, false, true) |
另外,网上还有一种方法(尝试失败了,具体原因应该是把字符串转换成字节码然后再去比较。怎么转这个我还不会...)
1 | #with codecs.open(fullname, encoding='utf-8') as file: |
2 | with codecs. open (fullname, encoding = 'utf_8_sig' ) as file : |
3 | keys = file .readline().split( ',' ) |
4 | if keys[ 0 ][: 3 ] = = codecs.BOM_UTF8: #将keys[0]转化为字节码再去比较 |
扩展:
今天发现MongoDB本身就带有导入功能mongoimport,可以直接导入CSV文件...
小试一把
1.不做错误数据过滤,直接导入。用专利引用数据做一下实验(《Hadoop权威指南》一书中的实验数据)
实验数据:
01 | "PATENT" , "GYEAR" , "GDATE" , "APPYEAR" , "COUNTRY" , "POSTATE" , "ASSIGNEE" , "ASSCODE" , "CLAIMS" , "NCLASS" , "CAT" , "SUBCAT" , "CMADE" , "CRECEIVE" , "RATIOCIT" , "GENERAL" , "ORIGINAL" , "FWDAPLAG" , "BCKGTLAG" , "SELFCTUB" , "SELFCTLB" , "SECDUPBD" , "SECDLWBD" |
02 | 3070801 , 1963 , 1096 ,, "BE" ,"",, 1 ,, 269 , 6 , 69 ,, 1 ,, 0 ,,,,,,, |
03 | 3070802 , 1963 , 1096 ,, "US" , "TX" ,, 1 ,, 2 , 6 , 63 ,, 0 ,,,,,,,,, |
04 | 3070803 , 1963 , 1096 ,, "US" , |
05 | "IL" ,, 1 ,, 2 , 6 , 63 ,, 9 ,, 0.3704 ,,,,,,, |
06 | 3070804 , 1963 , 1096 ,, "US" , "OH" ,, 1 ,, 2 , 6 , 63 ,, 3 ,, 0.6667 ,,,,,,, |
07 | 3070805 , 1963 , 1096 ,, "US" , "CA" ,, 1 ,, 2 , 6 , 63 ,, 1 ,, 0 ,,,,,,, |
08 | 3070806 , 1963 , 1096 ,, "US" , "PA" ,, 1 ,, 2 , 6 , 63 ,, 0 ,,,,,,,,, |
09 | 3070807 , 1963 , 1096 ,, "US" , "OH" ,, 1 ,, 623 , 3 , 39 ,, 3 ,, 0.4444 ,,,,,,, |
10 | 3070808 , 1963 , 1096 ,, "US" , "IA" ,, 1 ,, 623 , 3 , 39 ,, 4 ,, 0.375 ,,,,,,, |
11 | 3070809 , 1963 , 1096 ,,,, 1 ,, 4 , 6 , 65 ,, 0 ,,,,,,,,, |
1 | mongoimport - d TYK - c guest - - type csv - - file d:\text.csv - - headerline |
一共11行。第一行注释,9条数据。第3条中间截断,第9条取出中间两个数值"US","AZ"。按照csv规定现在应该是10条数据
结果:
01 | > db.guest.find({}, { "PATENT" : 1 , "_id" : 1 }) |
02 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d86" ), "PATENT" : 3070801 } |
03 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d87" ), "PATENT" : 3070802 } |
04 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d88" ), "PATENT" : 3070803 } |
05 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d89" ), "PATENT" : "IL" } |
06 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d8a" ), "PATENT" : 3070804 } |
07 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d8b" ), "PATENT" : 3070805 } |
08 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d8c" ), "PATENT" : 3070806 } |
09 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d8d" ), "PATENT" : 3070807 } |
10 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d8e" ), "PATENT" : 3070808 } |
11 | { "_id" : ObjectId( "52692c2a0b082a1bbb727d8f" ), "PATENT" : 3070809 } |
刚好10条,可见此命令导入是不会过滤异常数据。
2.以UTF-8有BOM格式再试一次。实验数据同上
01 | > db.guest.find({}, { "PATENT" : 1 , "_id" : 1 }) |
02 | { "_id" : ObjectId( "52692d730b082a1bbb727d90" ), "PATENT" : 3070801 } |
03 | { "_id" : ObjectId( "52692d730b082a1bbb727d91" ), "PATENT" : 3070802 } |
04 | { "_id" : ObjectId( "52692d730b082a1bbb727d92" ), "PATENT" : 3070803 } |
05 | { "_id" : ObjectId( "52692d730b082a1bbb727d93" ), "PATENT" : "IL" } |
06 | { "_id" : ObjectId( "52692d730b082a1bbb727d94" ), "PATENT" : 3070804 } |
07 | { "_id" : ObjectId( "52692d730b082a1bbb727d95" ), "PATENT" : 3070805 } |
08 | { "_id" : ObjectId( "52692d730b082a1bbb727d96" ), "PATENT" : 3070806 } |
09 | { "_id" : ObjectId( "52692d730b082a1bbb727d97" ), "PATENT" : 3070807 } |
10 | { "_id" : ObjectId( "52692d730b082a1bbb727d98" ), "PATENT" : 3070808 } |
11 | { "_id" : ObjectId( "52692d730b082a1bbb727d99" ), "PATENT" : 3070809 } |
结果同上面一样,key"PATENT
"中并没有因BOM引起的空格
3.mongoimport命令解释
1 | mongoimport - d TYK - c guest - - type csv - - file d:\text.csv - - headerline |
6 | - - headerline 貌似指定这个后以第一行为key,另 - f 可以指定key “ - f Name, age” |
二、统计分析
1.根据性别统计
由于数据不规范,先查询一下有多少种方式来表示性别的
1 | db.runCommand({ "distinct" : "guestHouse" , "key" : "Gender" }) |
17 | "nscanned" : 20048891 , |
18 | "nscannedObjects" : 20048891 , |
20 | "cursor" : "BasicCursor" |
一共有11中方式表示性别的...那就以M、F做下统计吧
2 | db.guestHouse.count({ "Gender" : "M" }) |
2 | db.guestHouse.count({ "Gender" : "F" }) |
饼状图
总结:
1.带条件count时速度是非常慢的,猜测在count时可能先进行的查询操作,如果是查询加索引效果会好很多。对Gender加索引,效果明显提高了,但仍然是N秒级别的。显然在实时情况下还是不行的。另外随意加索引也会遇其它方面的问题。在用索引时能达到一个平衡点很重要的啊。
5 | return this.find( x ).count(); |
2、根据身份证分析性别
从上面数据看,大约有4%的数据性别不详。
15位身份证号码:第7、8位为出生年份(两位数),第9、10位为出生月份,第11、12位代表出生日期,第15位代表性别,奇数为男,偶数为女。 18位身份证号码:第7、8、9、10位为出生年份(四位数),第11、第12位为出生月份,第13、14位代表出生日期,第17位代表性别,奇数为男,偶数为女。
要根据身份证来分析的话,明显不好直接处理分析了。那么就尝试一下编写MapReduce算一下吧,但是单机MapReduce速度会更慢。
先了解一下数据,看看有多少证件类型
01 | > db.runCommand({ "distinct" : "guestHouse" , "key" : "CtfTp" }) |
31 | "nscanned" : 20048891 , |
32 | "nscannedObjects" : 20048891 , |
34 | "cursor" : "BasicCursor" |
数据依旧的乱,那就暂且以"ID"来统计一下吧
02 | if ( this .CtfTp == "ID" ) { |
03 | if ( this .CtfId.length == 18){ |
04 | emit(parseInt( this .CtfId.charAt(16)) % 2, {count : 1}) //1为男,0为女 |
05 | } else if ( this .CtfId.length == 15) { |
06 | emit(parseInt( this .CtfId.charAt(14)) % 2, {count : 1}) //无法解析时为NaN |
13 | >reduce = function (key, emits) { |
15 | for ( var i in emits) { |
16 | total += emits[i].count; |
18 | return { "count" : total}; |
23 | mapReduce: "guestHouse" , |
26 | out: "TYK.guestHouse.output" , |
31 | "result" : "guestHouse.output" , |
32 | "timeMillis" : 999097, |
36 | "reduceTime" : 111217, |
结果:
1 | > db.guestHouse.output.find() |
2 | { "_id" : NaN, "value" : { "count" : 1360 } } |
3 | { "_id" : - 1 , "value" : { "count" : 1161164 } } |
4 | { "_id" : 0 , "value" : { "count" : 6831007 } } |
5 | { "_id" : 1 , "value" : { "count" : 11934567 } } |
总结:
1.速度比直接count({"Gender" : "M"}),并且资源占用不明显。IO压力不大,CPU压力不大。
2.结果中数据加起来为“19928098”条,比总条数“20048891”少了“120793”条,少在哪了?
3、统计各省、地区的情况
清单1:
02 | //var idCard_reg = /(^\d{15}$)|(^\d{18}$)|(^\d{17}(\d|X|x)$)/; |
03 | //var idCard_reg = /(^[1-6]\d{14}$)|(^[1-6]\d{17}$)|(^[1-6]\d{16}(\d|X|x)$)/; |
04 | //((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5])) |
05 | var idCard_reg = /(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{13}$)|(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{16}$)|(^((1[1-5])|(2[1-3])|(3[1-7])|(4[1-6])|(5[1-4])|(6[1-5]))\d{15}(\d|X|x)$)/; |
06 | if ( this .CtfTp == "ID" && idCard_reg.test( this .CtfId)) { |
07 | emit( this .CtfId.substr(0, 2), {count : 1}) //截取前两位 地区,省、直辖市、自治区 |
13 | reduce = function (key, emits) { |
15 | for ( var i in emits) { |
16 | total += emits[i].count; |
18 | return { "count" : total}; |
23 | mapReduce: "guestHouse" , |
26 | out: "guestHouse.provinceOutput" , |
32 | "result" : "guestHouse.provinceOutput" , |
33 | "timeMillis" : 1173216, |
37 | "reduceTime" : 157916, |
身份证号码参考:
http://baike.baidu.com/view/188003.htm#1_2 结果信息:
01 | > db.guestHouse.provinceOutput.find().sort({ "value.count" : -1}) |
02 | { "_id" : "32" , "value" : { "count" : 2398111 } } //江苏 |
03 | { "_id" : -1, "value" : { "count" : 1670289 } } //不详 |
04 | { "_id" : "37" , "value" : { "count" : 1523357 } } //山东 |
05 | { "_id" : "33" , "value" : { "count" : 1341274 } } //浙江 |
06 | { "_id" : "41" , "value" : { "count" : 1120455 } } //河南 |
07 | { "_id" : "34" , "value" : { "count" : 981943 } } //安徽 |
08 | { "_id" : "42" , "value" : { "count" : 974855 } } //湖北 |
09 | { "_id" : "31" , "value" : { "count" : 921046 } } //上海 |
10 | { "_id" : "13" , "value" : { "count" : 791432 } } //河北 |
11 | { "_id" : "21" , "value" : { "count" : 754645 } } //辽宁 |
12 | { "_id" : "14" , "value" : { "count" : 689738 } } //山西 |
13 | { "_id" : "51" , "value" : { "count" : 664918 } } //四川(包含重庆) |
14 | { "_id" : "36" , "value" : { "count" : 594849 } } //江西 |
15 | { "_id" : "23" , "value" : { "count" : 581882 } } //黑龙江 |
16 | { "_id" : "61" , "value" : { "count" : 571792 } } //陕西 |
17 | { "_id" : "35" , "value" : { "count" : 571107 } } //福建 |
18 | { "_id" : "43" , "value" : { "count" : 562536 } } //湖南 |
19 | { "_id" : "44" , "value" : { "count" : 558249 } } //广东 |
20 | { "_id" : "11" , "value" : { "count" : 495897 } } //北京 |
21 | { "_id" : "22" , "value" : { "count" : 456159 } } //吉林 |
22 | { "_id" : "15" , "value" : { "count" : 392787 } } //内蒙 |
23 | { "_id" : "12" , "value" : { "count" : 320711 } } //天津 |
24 | { "_id" : "62" , "value" : { "count" : 227366 } } //甘肃 |
25 | { "_id" : "45" , "value" : { "count" : 192810 } } //广西 |
26 | { "_id" : "52" , "value" : { "count" : 187622 } } //贵州 |
27 | { "_id" : "65" , "value" : { "count" : 145040 } } //新疆 |
28 | { "_id" : "53" , "value" : { "count" : 141652 } } //云南 |
29 | { "_id" : "63" , "value" : { "count" : 75509 } } //青海 |
30 | { "_id" : "64" , "value" : { "count" : 75105 } } //宁夏 |
31 | { "_id" : "46" , "value" : { "count" : 48279 } } //海南 |
32 | { "_id" : "54" , "value" : { "count" : 17476 } } //西藏 |
对结果在此进行分析,根据地区处理。
01 | db.guestHouse.provinceOutput.group({ |
02 | keyf: function (doc){ return { "key" : doc._id.substr(0,1)}}, //<SPAN>以省标识的第一位再次分组统计 </SPAN> initial : {total : 0}, |
03 | reduce : function (curr, result){ |
04 | result.total += curr.value.count; |
06 | cond : { "_id" : { "$ne" : -1}}, |
07 | finalize: function (result) { |
08 | var areas= [ "华北" , "东北" , "华东" , |
11 | result.area = areas[result.key - 1]; |
group 函数参考:
http://docs.mongodb.org/manual/reference/method/db.collection.group/
http://docs.mongodb.org/manual/reference/command/group/#dbcmd.group
疑问与总结:
a.前面说的MapReduce没有count占用资源是错误的,今天发现任务管理器不会实时更新了⊙﹏⊙b汗
b.group的'keyf'这个配置有时间很有用处(key、 keyf只能二选一)
c.在map时加上query : {"CtfTp" : "ID"}一定会提高速度吗?
详见测试:http://my.oschina.net/tianyongke/blog/172794
d.部分日志
01 | 22 : 10 : 47.001 [conn62] M / R: ( 1 / 3 ) Emit Progress: 19874400 / 20048891 99 % |
02 | 22 : 10 : 50.000 [conn62] M / R: ( 1 / 3 ) Emit Progress: 19923500 / 20048891 99 % |
03 | 22 : 10 : 53.005 [conn62] M / R: ( 1 / 3 ) Emit Progress: 19974700 / 20048891 99 % |
04 | 22 : 10 : 56.603 [conn62] M / R: ( 1 / 3 ) Emit Progress: 20016000 / 20048891 99 % |
05 | 22 : 10 : 59.001 [conn62] M / R: ( 1 / 3 ) Emit Progress: 20047200 / 20048891 99 % |
06 | 22 : 11 : 02.052 [conn62] M / R: ( 3 / 3 ) Final Reduce Progress: 84500 / 112318 75 % |
07 | 22 : 11 : 02.393 [conn62] CMD: drop TYK.guestHouse.provinceOutput |
08 | 22 : 11 : 02.531 [conn62] command admin.$cmd command: { renameCollection: "TYK.tmp.mr.guestHouse_9" , to: "TYK.guestHouse.provinceOutput" , stayTemp: false } ntoreturn: 1 keyUpdates: 0 reslen: 37 136ms |
09 | 22 : 11 : 02.561 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9 |
10 | 22 : 11 : 02.587 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9 |
11 | 22 : 11 : 02.587 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9_inc |
12 | 22 : 11 : 02.674 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9 |
13 | 22 : 11 : 02.690 [conn62] CMD: drop TYK.tmp.mr.guestHouse_9_inc |
14 | 22 : 11 : 02.894 [conn62] command TYK.$cmd command: { mapReduce: "guestHouse" , map : function () { |
15 | / / var idCard_reg = / (^\d{ 15 }$)|(^\d{ 18 }$)|(^\d{ 17 }(\d|X|..., reduce : function (key, emits) { |
18 | total + = emi..., out: "guestHouse.provinceOutput" , verbose: true } ntoreturn: 1 keyUpdates: 0 numYields: 471197 locks(micros) W: 233131 r: 1922774271 w: 20768395 reslen: 232 1173522ms |
19 | 22 : 56 : 54.820 [conn62] command TYK.$cmd command: { group: { ns: "TYK.guestHouse.provinceOutput" , $keyf: function (doc){ return {temp_key : doc._id.subStr( 0 , 1 )}}, initial: { total: 0.0 }, $ reduce : function (doc, prev){ |
20 | prev.total + = doc.value.count; |
21 | } } } ntoreturn: 1 keyUpdates: 0 locks(micros) r: 542599 reslen: 75 542ms |
从日志可以看到的是
1)map
2)reduce
3)drop 指定定集合(以前反复执行时先手动做了这一步,现在看来不用了)
4)变更临时集合为指定集合
5)删除临时集合(反复删除,还有一个'_inc'的集合。不知道为什么)
6)最后处理,执行finalize指定函数
表象的是先map后reduce,实际情况是怎么样的呢?是不是并行?并且reduce执行有1613854次之多,而以日志显示时间推算也就1秒左右。这个1613854是怎么来的,与什么有关?
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请
点击举报。