快递&外卖
配送平台,我是负责后台开发部分,这里给朋友们介绍下相关开发经验,开发框架和方式有很多,这里给大家介绍一些快捷高效的方法,大家少走弯路!大力推荐
,具体学习可以查看这里,内部代码就不当做demo展示了,哈哈!Flask
框架,Flask是Python编写的轻量级Web应用框架,用过的人都知道简单快捷, 马上花一分钟上手下:sudo pip install flask
from flask import Flaskapp = Flask(__name__)@app.route("/")def hi(): return "hi!"if __name__ == "__main__": app.run()
hi!
,ok,这就可以了, 整个框架我们就搭建起来了,是不是很简单,后面我们来拓展一下!http://127.0.0.1:5000/
教程
可以参考这里http://1.2.3.4:5000/
访问网站了,还缺少一些什么?当然,微信小程序是不允许接口暴露ip地址的,你必须要用自己的域名和自定义的接口名称,比如http://www.happypower.com/这样才行,当我访问这个网站时候,会先到阿里云服务器上做一下映射,确定我们的ip1.2.3.4,然后找到该Ip对应的服务器,之后怎么办? 我需要访问的是http://1.2.3.4:5000/
这个啊,Nginx帮我们做了这个事,下面简单介绍下:/etc/nginx
,每人安装目录可能不同,找到nginx.conf
文件,vim进行编辑,vim的操作如果不熟可以查看这里,不行可以先本地测好再到线上用,下面给出我配置的信息,大家可以仿造,应该是比较精简的:# For more information on configuration, see:# * Official English Documentation: http://nginx.org/en/docs/# * Official Russian Documentation: http://nginx.org/ru/docs/user nginx;worker_processes auto;error_log /var/log/nginx/error.log;pid /run/nginx.pid;# Load dynamic modules. See /usr/share/nginx/README.dynamic.include /usr/share/nginx/modules/*.conf;events { worker_connections 1024;}http { log_format main ''$remote_addr - $remote_user [$time_local] "$request" '' ''$status $body_bytes_sent "$http_referer" '' ''"$http_user_agent" "$http_x_forwarded_for"''; access_log /var/log/nginx/access.log main; sendfile on; tcp_nopush on; tcp_nodelay on; keepalive_timeout 65; types_hash_max_size 2048; include /etc/nginx/mime.types; default_type application/octet-stream; # Load modular configuration files from the /etc/nginx/conf.d directory. # See http://nginx.org/en/docs/ngx_core_module.html#include # for more information. #include /etc/nginx/conf.d/*.conf; server { listen 80 default_server; listen [::]:80 default_server; server_name _; root /usr/share/nginx/html; # Load configuration files for the default server block. include /etc/nginx/default.d/*.conf; location / { return 404; } error_page 404 /404.html; location = /40x.html { } error_page 500 502 503 504 /50x.html; location = /50x.html { } }# Settings for a TLS enabled server.## server {# listen 443 ssl http2 default_server;# listen [::]:443 ssl http2 default_server;# server_name _;# root /usr/share/nginx/html;## ssl_certificate "/etc/pki/nginx/server.crt";# ssl_certificate_key "/etc/pki/nginx/private/server.key";# ssl_session_cache shared:SSL:1m;# ssl_session_timeout 10m;# ssl_ciphers HIGH:!aNULL:!MD5;# ssl_prefer_server_ciphers on;## # Load configuration files for the default server block.# include /etc/nginx/default.d/*.conf;## location / {# }## error_page 404 /404.html;# location = /40x.html {# }## error_page 500 502 503 504 /50x.html;# location = /50x.html {# }# }# 自己主要要写的就下面这个serverserver { # 443是https监听的端口,一般默认就好 listen 443; # 自己接口的主域名,这就是当我们访问http://www.happypower.com时就会映射到这里 server_name www.happypower.com; ssl on; # 这是默认主页,接口没有主页就注释掉了 # root html; # index index.html index.htm; # 下面是阿里云下载的ssl证书,后面会说到ssl的配置,看不懂没关系 ssl_certificate ./xxx.pem; ssl_certificate_key ./xxx.key; ssl_session_timeout 5m; ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4; ssl_protocols TLSv1 TLSv1.1 TLSv1.2; ssl_prefer_server_ciphers on; location / { # 这里的意思是http://www.happypower.com会和http://0.0.0.0:8888做一个映射,这就解决了上面最开始说的那个问题了,这一步很关键! proxy_pass http://0.0.0.0:8888; } # 日志文件路径,有人访问你的网站时都会留下印记,有的话最好,不配置其实也行。 access_log /root/xxx/pjt/log/nginx.log;}}
这两个很简单,其实就是几条命令的事,主要说下作用:
1 gunicorn 可以让你的后台服务多进程方式开启,经过测试可以提升一定的qps(每秒的请求数),简单来说一定程度上防止你的服务器崩掉.
2 supervisor 的作用就是对你的进程进行监控,该框架提供了一个可视化界面,可以通过这个界面去开启,暂停和关闭你的服务进程,即使不动代码的人也能控制后台服务。
3 具体配置没啥好讲的,学会几个命令就行具体推荐看这里
由于小程序需要的接口都是需要https的连接,所以咱们还需要ssl证书
才行,这里我使用的是阿里云
服务器,具体配置可以先参考这里,可能有点难懂,下面简单说下我的配置:
单域名免费型 DV SSL
,其实一般的应用来说,单域名足够了。nginx.conf
文件进行查看!腾讯云
和阿里云
推荐,其他的真的难配,个人遇到了很多坑!有些小程序涉及到微信支付的,就比如校园懒人邦
,这个其实很头疼,做过就知道,难到不是很难,过程很繁琐,官方文档也很多坑,下面简单讲解下:
9
个月的申请时间才行,这个比较坑,当然,退款是不需要等的.python
写了个微信预支付的较为通用的类,大家改下参数
拿去用就行,官方文档一个个试出来的,简直坑:class WeiXinPay(object): """微信支付,返回回客户端需要参数 """ def __init__(self, uu_id, open_id, spbill_create_ip, total_fee, out_trade_no): """ :param total_fee: 订单金额 :param spbill_create_ip: 客户端请求IP地址 """ self.params = { ''appid'': ''小程序的appid'', ''attach'': u''你的应用名称,我这里是(校园懒人邦)'', ''body'': u''校园懒人邦-代取费'', ''mch_id'': ''商户id,你企业的id'', ''nonce_str'': ''给个随机数,一般md5一下就好,时间戳啊或者别的什么'', ''notify_url'': ''http://www.happypower.com/result(这个就是通知地址,会异步返回信息给你),你写一个接口接收就行'', ''openid'': open_id, ''out_trade_no'': out_trade_no, ''spbill_create_ip'': spbill_create_ip, ''total_fee'': str(total_fee), ''trade_type'': ''JSAPI'' } # 官方给的接口 self.url = ''https://api.mch.weixin.qq.com/pay/unifiedorder'' self.error = None def key_value_url(self, value): """将将键值对转为 key1=value1&key2=value2 """ key_az = sorted(value.keys()) pair_array = [] for k in key_az: v = value.get(k, '''').strip() v = v.encode(''utf8'') k = k.encode(''utf8'') pair_array.append(''%s=%s'' % (k, v)) tmp = ''&''.join(pair_array) return tmp def get_sign(self, params): """生成sign """ stringA = self.key_value_url(params) stringSignTemp = stringA + ''&key='' + ''xxx'' # APIKEY, API密钥,需要在商户后台设置 sign = (md5(stringSignTemp).hexdigest()).upper() params[''sign''] = sign def get_req_xml(self): """拼接XML """ self.get_sign(self.params) xml = "<xml>" for k, v in self.params.items(): v = v.encode(''utf8'') k = k.encode(''utf8'') xml += ''<'' + k + ''>'' + v + ''</'' + k + ''>'' xml += "</xml>" print xml return xml def get_prepay_id(self): """ 请求获取prepay_id """ xml = self.get_req_xml() headers = {''Content-Type'': ''application/xml''} r = requests.post(self.url, data=xml, headers=headers) re_xml = ElementTree.fromstring(r.text.encode(''utf8'')) xml_status = re_xml.getiterator(''result_code'')[0].text if xml_status != ''SUCCESS'': self.error = u"连接微信出错啦!" logging.error(u"连接微信出错啦!") return prepay_id = re_xml.getiterator(''prepay_id'')[0].text self.params[''package''] = ''prepay_id=%s'' % prepay_id self.params[''timestamp''] = str(int(time.time())) def re_finall(self): self.get_prepay_id() if self.error: return sign_again_params = { ''appId'': self.params[''appid''], ''timeStamp'': self.params[''timestamp''], ''nonceStr'': self.params[''nonce_str''], ''package'': self.params[''package''], ''signType'': ''MD5'' } self.get_sign(sign_again_params) sign_again_params[''paySign''] = sign_again_params[''sign''] sign_again_params[''total_fee''] = self.params[''total_fee''] sign_again_params[''notify_url''] = self.params[''notify_url''] sign_again_params.pop(''appId'') sign_again_params.pop(''sign'') return json.dumps(sign_again_params)
5 这里最后返回的参数传给前台就能支付了,前台会拿到你的prepay_id,然后就能按照指定金额支付了。
6 退款的话有些不同,首先也是具体先查看文档,下面我也写了一个较为通用的类,大家觉得文档麻烦直接用也行:
class WeiXinReturn(object): def __init__(self, out_trade_no, total_fee, refund_fee): self.params_mach = { # 申请商户号的appid或商户号绑定的appid ''appid'': ''xxx'', ''mch_id'': ''xxx'', ''nonce_str'': ''随机数'', ''out_trade_no'': out_trade_no, ''out_refund_no'': ''微信订单号'', ''total_fee'': str(total_fee), ''refund_fee'': str(refund_fee), ''notify_url'': ''http://www.happypower.com/pay_return/result(和支付一样的意思,就是退款的通知地址,自己开发一个接口就好)'' } def pay_return(self): stringA = self.key_value_url(self.params_mach) stringSignTemp = stringA + ''&key='' + ''xxx'' # APIKEY, API密钥,需要在商户后台设置 sign = (md5(stringSignTemp).hexdigest()).upper() self.params_mach[''sign''] = sign xml = "<xml>" for k, v in self.params_mach.items(): v = v.encode(''utf8'') k = k.encode(''utf8'') xml += ''<'' + k + ''>'' + v + ''</'' + k + ''>'' xml += "</xml>" headers = {''Content-Type'': ''application/xml;charset=UTF-8''} # url = ''https://api.mch.weixin.qq.com/mmpaymkttransfers/promotion/transfers'' url = ''https://api.mch.weixin.qq.com/secapi/pay/refund'' # 请求中需要带有支付的证书,没有证书是无法申请的 r = requests.post(url, data=xml, headers=headers, cert=(''xxx.pem'', ''xxx.pem'')) result = r.text re_xml = ElementTree.fromstring(r.text.encode(''utf8'')) xml_status = re_xml.getiterator(''result_code'')[0].text if xml_status == ''SUCCESS'': return True return False def key_value_url(self, value): """将将键值对转为 key1=value1&key2=value2 """ key_az = sorted(value.keys()) pair_array = [] for k in key_az: v = value.get(k, '''').strip() v = v.encode(''utf8'') k = k.encode(''utf8'') pair_array.append(''%s=%s'' % (k, v)) tmp = ''&''.join(pair_array) return tmp
ps.这里需要商户证书,具体怎么弄看这里
跳转页面
且能够携带参数
,校园懒人邦
中的分享功能就是利用接口A进行开发,主要接口A上线才好测,而接口B开发阶段好测。 def upload_img(self, local_path, upload_name, bucket_name, ttl=7200): """ 上传图片到七牛云 :param local_path: 本地文件路径 :param upload_name: 上传文件名 :param bucket_name: 七牛申请的存储空间名称 :param ttl: 过期时间 :return: 返回图片地址 """ from qiniu import Auth, put_file import re q = Auth(access_key=conf_test.AccessKey, secret_key=conf_test.SecretKey) token = q.upload_token(bucket_name, upload_name, ttl) ret, info = put_file(token, upload_name, local_path) pat_status = ''status_code:(.*?),'' status_code = re.compile(pat_status, re.S).findall(str(info)) if len(status_code) > 0: # 成功返回图片外链名称,失败返回原因以及状态码 if int(status_code[0]) == 200: return {"msg":conf_test.qiniu_domain + upload_name, "status":1} return {"msg":"failed upload img failed", "status":-1}
https
的图片外链地址,而刚开始上传七牛云生成的是http的链接图片,因此这里需要到七牛云上面配置下,具体的可以查看这里三条
消息,当然还是有别的办法可以给用户多发一些消息,具体往下看.form_id
咱们就能对用户无限制的发送消息,前提是用户没有屏蔽你的这个微信小程序,可以参考这里,当然这里是用java
写的,我按照这个封装了一个python版本的,大家可以参考着用: # 刷新用户form_id 存入redis def fresh_formid(self, **kwargs): """ 接收前台传来的form_id存入redis :param kwargs: :return: """ uu_id = kwargs.get("uu_id", "") if not uu_id: return "failed: uu_id cannot be null" open_id = kwargs.get("open_id", "") if not open_id: return "failed: open_id cannot be null" form_id = kwargs.get("form_id", "") if not form_id: return "failed: form_id cannot be null" if "invalid code" in open_id: return "failed: get openid err" msg = {open_id: form_id} self.redis_cli.sadd(uu_id, json.dumps(msg)) return "success" # 发消息模板 def send_template_msg(self, **kwargs): uu_id = kwargs.get("uu_id", "") if not uu_id: return "failed: uu_id cannot be null" data = kwargs.get("data", "") if not data: return "failed: data can not be null" # 这个token可以从文档中去查看怎么得到,内部源码不能提供 token = self.get_refresh_token() if not token: return "failed: cannot find a token" send_msg = {} # 从redis中取出用户对应的form_id redis_msg = self.redis_cli.spop(uu_id) if not redis_msg: return "failed: cannot find a open_id in redis" msg = json.loads(redis_msg.encode("utf8")) touser = msg.keys()[0] form_id = msg[touser] template_id = kwargs.get("template_id", "") if not template_id: return "failed: template_id cannot be null" # page是用户点开消息后跳转到的页面 page = kwargs.get("page", "") # 下面这些参数的含义在文档中都能查到 emphasis_keyword = kwargs.get("emphasis_keyword", "") send_msg["touser"] = touser send_msg["template_id"] = template_id send_msg["page"] = page send_msg["form_id"] = form_id send_msg["data"] = data send_msg["emphasis_keyword"] = emphasis_keyword # 调起发送消息接口 api = "https://api.weixin.qq.com/cgi-bin/message/wxopen/template/send?access_token=%s" % token # post方式发起网络请求 return self.get_html(url=api, data=send_msg)
qps
的测试,因为我们是给用户用的,我们必须估计一下我们的服务器能承载多少用户同时在线
,也就是平均来说能承受多少用户每秒的请求数量, 我们可以对最常用的几个接口做一个极限测试,然后计算出大致的qps, 最简单的做法就是多线程疯狂的call计算平均时间,这里给出一段测试代码:class PressTest(object): def __init__(self): pass def get_html(self, url, headers=conf_test.HEADERS, data=None): if data: data = json.dumps(data) req = urllib2.Request(url=url, headers=headers, data=data) response = urllib2.urlopen(req) html = response.read() return html def get_test(self, api_url): result = self.get_html(url=api_url) if not result: print "url %s result is none" %api_url return return def post_test(self, api_url, **kwargs): import time start_time = time.time() result = self.get_html(url=api_url, data=kwargs) print result if not result: print "url %s result is none" % api_url end_time = time.time() all = end_time - start_time print allif __name__ == ''__main__'': press_test = PressTest() print "requesting........." api_url = ''https://xxx接口1'' api_url2 = ''https://xxx接口2'' start = time.time() times = 400 for i in range(times): t1 = threading.Thread(target=press_test.get_test, args=(api_url,)) t2 = threading.Thread(target=press_test.get_test, args=(api_url2,)) t1.start() t2.start() t1.join() t2.join() end = time.time() ave = (end - start) / 800.0 print "count:%d, start_time :%s, now_time: %s, average_qps: %s" % (800, str(start), str(end), str(ave))
400
个线程并发call
这两个常用接口,最后计算出qps
(每秒的请求数),如果能达到100次/秒
,基本上几千人
同时在线没啥问题,哈哈联系客服