今天学习过程中写了一个基于 Flask 的搜索引擎,包含爬虫和搜索页面。功能有全文搜索、分页、搜索建议等。
安装依赖(爬虫部分还需要 requests 和 beautifulsoup4):
pip install flask jieba requests beautifulsoup4
搜索引擎后端:
from flask import Flask, render_template, request, session, jsonify
import sqlite3
import jieba
import math
import string
import re
app = Flask(__name__)
DATABASE = 'data.db'
def create_database(db_path=None):
    """Create the FTS5 ``contents`` virtual table if it does not exist.

    Args:
        db_path: Optional path to the SQLite database file. Defaults to
            the module-level ``DATABASE`` constant, so existing callers
            are unaffected.
    """
    conn = sqlite3.connect(db_path if db_path is not None else DATABASE)
    try:
        # fts5 virtual table: every listed column is full-text indexed.
        conn.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents USING fts5(
            title, url , favicon , description , content , keywords , date,img )''')
        conn.commit()
    finally:
        # Close even if table creation raises (the original leaked the
        # connection on error).
        conn.close()
def tokenize(title):
    """Segment ``title`` with jieba and return a space-separated string of
    de-duplicated keywords longer than one character, preserving the order
    of their first appearance. Only ASCII punctuation (string.punctuation)
    is stripped; full-width Chinese punctuation passes through.
    """
    segments = [w for w in jieba.cut(title) if w not in string.punctuation]
    # Keep multi-character tokens, dropping repeats while preserving the
    # order in which they first occurred.
    ordered = []
    for token in segments:
        if len(token) > 1 and token not in ordered:
            ordered.append(token)
    joined = ' '.join(ordered)
    # Final sweep: strip any remaining ASCII punctuation characters.
    return ''.join(ch for ch in joined if ch not in string.punctuation)
def search_contents(query, offset, per_page, db_path=None):
    """Run a paged FTS5 search over the ``keywords`` column.

    Args:
        query: FTS5 match expression (space-separated terms).
        offset: Row offset for paging; clamped to the last page if past
            the end of the result set.
        per_page: Maximum rows per page.
        db_path: Optional database path; defaults to ``DATABASE``.

    Returns:
        dict with ``results`` (list of row dicts), ``total_results`` and
        ``total_pages``.
    """
    conn = sqlite3.connect(db_path if db_path is not None else DATABASE)
    conn.row_factory = sqlite3.Row
    try:
        c = conn.cursor()
        c.execute('SELECT COUNT(*) FROM contents WHERE keywords MATCH :query',
                  {'query': query})
        total_results = c.fetchone()[0]
        total_pages = calculate_total_pages(total_results, per_page)
        if offset >= total_results:
            # Clamp to the last page. max() fixes the zero-result case,
            # where (total_pages - 1) * per_page used to go negative.
            offset = max(0, (total_pages - 1) * per_page)
        c.execute('SELECT title, url, favicon, description, keywords, date FROM contents WHERE keywords MATCH :query LIMIT :per_page OFFSET :offset',
                  {'query': query, 'per_page': per_page, 'offset': offset})
        rows = c.fetchall()
    finally:
        # Close even on error (the original leaked the connection).
        conn.close()
    return {'results': [dict(row) for row in rows],
            'total_results': total_results,
            'total_pages': total_pages}

def calculate_total_pages(total_results, per_page):
    """Number of pages needed to show ``total_results`` at ``per_page`` each."""
    return math.ceil(total_results / per_page)
@app.before_request
def session_online():
    """Before every request, bump the per-session 'Online' counter when a
    'session_id' cookie is present.

    NOTE(review): Flask's own session cookie is named 'session', not
    'session_id' — confirm something else sets that cookie, otherwise the
    counter never increments.
    """
    counter = session.get('Online', 0)
    if request.cookies.get('session_id') is not None:
        counter += 1
    # Written back unconditionally, matching the original behavior of
    # touching the session on every request.
    session['Online'] = counter
@app.route('/get_suggestions')
def get_suggestions():
    """Return up to five titles containing the query string, as JSON.

    Fix: default the 'q' parameter to '' — ``request.args.get('q')``
    returns None when the parameter is absent, and ``'%' + None`` raised
    a TypeError (HTTP 500).
    """
    query = request.args.get('q', '')
    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()
        # Parameterized LIKE keeps user input out of the SQL string.
        c.execute('SELECT title FROM contents WHERE title LIKE ? LIMIT 5',
                  ('%' + query + '%',))
        suggestions = [row[0] for row in c.fetchall()]
    finally:
        conn.close()
    return jsonify(suggestions=suggestions)
@app.route('/', methods=['GET'])
def index():
    """Render the search page; with ?q= perform a paged full-text search.

    Fix: a non-numeric ?page= value used to raise ValueError (HTTP 500);
    it now falls back to page 1.
    """
    query = request.args.get('q', '')  # search terms, '' when absent
    try:
        page = int(request.args.get('page', '1'))
    except ValueError:
        page = 1
    per_page = 10                      # results shown per page
    offset = (page - 1) * per_page     # row offset for this page
    online = session.get('Online', 0)
    if query:
        # Tokenize the raw query into FTS5 terms, then search.
        content_result = search_contents(tokenize(query), offset, per_page)
        return render_template('index.html',
                               query=query,
                               content_result=content_result['results'],
                               total_results=content_result['total_results'],
                               total_pages=content_result['total_pages'],
                               current_page=page,
                               online=online)
    return render_template('index.html', online=online)
if __name__ == '__main__':
    # Ensure the fts5 table exists before serving requests.
    create_database()
    # NOTE(review): hard-coded secret key — load from an environment
    # variable or config file for anything beyond local development.
    app.secret_key = 'pyxueba'
    # debug=True enables the werkzeug debugger/reloader — dev only.
    app.run(debug=True)
搜索引擎前端:
<!DOCTYPE html>
<html>
<head>
<!-- Search page template (Jinja2) for the Flask full-text search backend. -->
<meta charset='UTF-8'>
<title>Python学霸搜索引擎</title>
<link rel='icon' type='image/svg+xml' href='favicon.svg'>
<!-- NOTE(review): the Google-hosted jQuery CDN is unreachable from mainland
     China — presumably the target audience; consider a local copy. -->
<script src='https://ajax.googleapis.com/ajax/libs/jquery/3.6.0/jquery.min.js'></script>
<style>
/* --- Page chrome --- */
body {
font-family: Arial, sans-serif;
margin: 50px;
}
h1 {
font-size: 24px;
margin-bottom: 20px;
text-align: center;
}
/* --- Search form --- */
.search-box {
margin-bottom: 20px;
text-align: center;
}
.search-box input[type='text'] {
padding: 6px 2px;
font-size: 16px;
border-radius: 4px;
border: 1px solid #999;
width: 40%;
max-width: 100%;
}
.search-box button[type='submit'] {
padding: 6px 12px;
font-size: 16px;
border-radius: 4px;
background-color: #006621;
color: #fff;
border: none;
cursor: pointer;
}
.search-box button[type='submit']:hover {
background-color: #00511a;
}
/* --- Result cards --- */
.result-item {
margin-bottom: 20px;
border: 1px solid #ddd;
border-radius: 4px;
padding: 10px;
}
a {
text-decoration: none;
}
.result-title {
font-size: 20px;
font-weight: bold;
text-align: left; /* changed: left-align result titles */
}
.result-title a {
color: #008000;
}
.result-url {
color: #000000;
font-size: 14px;
margin-bottom: 5px;
}
.result-time {
font-size: 14px;
color: #999;
}
.result-description {
margin-top: 10px;
}
/* --- Pagination --- */
.pagination {
margin-top: 20px;
text-align: center;
}
.pagination-link {
display: inline-block;
padding: 6px 12px;
margin-right: 5px;
color: #333;
border-radius: 4px;
background-color: #f5f5f5;
text-decoration: none;
}
.pagination-link:hover {
background-color: #ddd;
}
/* Current page marker and in-title query highlight share this class. */
.highlight {
background-color: #FFD700;
}
/* --- Footer / misc --- */
.footer {
margin-top: 50px;
text-align: center;
color: #999;
font-size: 12px;
}
.visitor-count {
margin-top: 10px;
}
.visitor-count span {
margin-left: 5px;
}
.favicon {
width: 16px;
height: 16px;
margin-right:3px;
}
</style>
</head>
<body>
<h1>python学霸全文搜索</h1>
<div class='search-box'>
    <form action='/' method='get'>
        <input type='text' name='q' id='search-input' list='suggestion-list' placeholder='你负责搜,我负责找···'>
        <!-- Fix: the datalist id must equal the input's list attribute
             ('suggestion-list'); the trailing dashes in the original id
             meant the browser never associated the suggestions. -->
        <datalist id='suggestion-list' class='suggestion-list'></datalist>
        <button type='submit'>搜索</button>
    </form>
</div>
{% if content_result %}
<p>共找到 {{ total_results }} 条结果。</p>
{% for result in content_result %}
<div class='search-summary'>
</div>
<div class='result-item'>
    <h2 class='result-title'><img src='{{ result.favicon }}' alt='Favicon' class='favicon'
        style='border: 1px solid #ccc; border-radius: 5px;' /><a class='result-link' href='{{ result.url }}'
        target='_blank'>{{ result.title }}</a></h2>
    <p class='result-url'><span class='time'>{{ result.date }}</span> {{ result.description }}</p>
</div>
{% endfor %}
<div class='pagination'>
    {% if total_pages > 1 %}
    {% for page in range(1, total_pages + 1) %}
    {# Fix: urlencode the query so terms with spaces/&/# survive paging. #}
    {% if page == current_page %}
    <a class='pagination-link highlight' href='/?q={{ query|urlencode }}&page={{ page }}'>{{ page }}</a>
    {% else %}
    <a class='pagination-link' href='/?q={{ query|urlencode }}&page={{ page }}'>{{ page }}</a>
    {% endif %}
    {% endfor %}
    {% endif %}
</div>
{% endif %}
<div class='footer'>
    @2023 Python学霸.
    <div class='visitor-count'>
        <p>总访问: {{ online }}</p>
    </div>
</div>
<script>
// Highlight every occurrence of the search query inside result titles.
// NOTE(review): '{{ query }}' interpolated into a JS string can break out
// of the quotes — prefer {{ query|tojson }} once confirmed safe here.
window.onload = function () {
    var query = '{{ query }}';
    if (!query) { return; }  // nothing to highlight on the empty page
    // Fix 1: the replacement string previously nested unescaped single
    // quotes inside a single-quoted literal — a JS syntax error.
    // Fix 2: escape regex metacharacters so queries like "c++" do not
    // throw inside the RegExp constructor.
    var safe = query.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    var titles = document.getElementsByClassName('result-title');
    for (var i = 0; i < titles.length; i++) {
        titles[i].innerHTML = titles[i].innerHTML.replace(
            new RegExp(safe, 'gi'), "<span class='highlight'>$&</span>");
    }
};
</script>
<script type='text/javascript'>
// Live search suggestions: query the backend on every keystroke and fill
// the datalist. Fix: a <datalist> only recognizes <option> children — the
// original appended <li> items (and called .show()/.hide()/click handlers
// on the datalist), none of which the browser honors; the input's native
// dropdown now does the display and fill-on-select automatically.
$(document).ready(function () {
    $('#search-input').on('input', function () {
        var query = $(this).val();
        var suggestionList = $('#suggestion-list');
        if (query.trim().length > 0) {
            $.ajax({
                url: '/get_suggestions',
                data: { q: query },
                success: function (response) {
                    suggestionList.empty();  // drop stale suggestions
                    $.each(response.suggestions, function (_, s) {
                        suggestionList.append($('<option>').attr('value', s));
                    });
                }
            });
        } else {
            suggestionList.empty();  // no input, no suggestions
        }
    });
});
</script>
</body>
</html>
爬虫:
import requests
from bs4 import BeautifulSoup
import sqlite3
import jieba
import threading
import time
import random
import string
import re
from datetime import date
import base64
class Crawler:
    """Multi-threaded crawler that walks pages from a seed URL and stores
    title/description/keyword metadata into the ``contents`` fts5 table
    of data.db.
    """

    def __init__(self, max_depth=3, num_workers=10):
        """Set up state, seed the queue, and start crawling.

        Args:
            max_depth: Maximum link depth to follow.
            num_workers: Number of crawler threads.

        NOTE(review): the constructor blocks until the crawl finishes
        because it calls self.run() — confirm that is intended.
        """
        self.max_depth = max_depth
        self.num_workers = num_workers
        # check_same_thread=False: a single connection shared by all
        # worker threads; every use is serialized through self.lock.
        self.conn = sqlite3.connect('data.db', check_same_thread=False)
        self.lock = threading.Lock()
        self.url_queue = []        # pending URLs (FIFO)
        self.crawled_urls = set()  # URLs already stored this run
        self.create_tables()
        self.add_urls(['https://www.hao123.com/'])
        self.run()

    def get_image_data_uri(self, image_url):
        """Download an image and return it embedded as a base64 data URI.

        NOTE(review): network errors here propagate and kill the calling
        worker thread — consider wrapping in try/except.
        """
        response = requests.get(image_url)
        encoded = base64.b64encode(response.content).decode('utf-8')
        return f'data:image/x-icon;base64,{encoded}'

    def create_tables(self):
        """Create the fts5 ``contents`` table if it does not exist."""
        c = self.conn.cursor()
        c.execute('''CREATE VIRTUAL TABLE IF NOT EXISTS contents USING fts5 (
            title ,
            url ,
            favicon ,
            description ,
            keywords ,
            date ,
            img )''')
        self.conn.commit()

    def add_urls(self, urls):
        """Append URLs to the shared work queue (thread-safe)."""
        with self.lock:
            self.url_queue.extend(urls)

    def crawl_and_save(self, url, depth=0):
        """Fetch one page, store its metadata, and enqueue its links.

        NOTE(review): queued links are always crawled with depth=0, so
        the max_depth check never actually limits the crawl — confirm.
        """
        from urllib.parse import urljoin  # local import: relative-link fix
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            # Skip images, fragment links and javascript pseudo-URLs.
            # ('javascript' subsumes the original's separate checks for
            # 'javascript:;' and 'javascript:void(0)'.)
            skip_tokens = ('.ico', '.jpg', '.png', 'javascript', '#')
            if url and not any(tok in url for tok in skip_tokens):
                response = requests.get(url, headers=headers, timeout=2.5)
                response.raise_for_status()
            else:
                print(f'无效:{url} ')
                return
        except (requests.exceptions.RequestException, requests.exceptions.HTTPError) as e:
            print(f'无法获取链接 {url}:{e}')
            return
        content_type = response.headers.get('content-type')
        if not content_type or not content_type.startswith('text/html'):
            return  # only index HTML pages
        html_text = response.text
        soup = BeautifulSoup(response.content, 'html.parser')
        title_tag = soup.title
        if title_tag is None:
            print(f'链接 {url} 未找到标题,跳过...')
            return
        # Fix: title_tag.string is None when <title> contains nested
        # markup, which used to raise AttributeError.
        title = (title_tag.string or '').strip()
        if not title:
            print(f'链接 {url} 标题为空,跳过...')
            return
        # Strip ASCII punctuation from the title; extract_keywords() does
        # its own jieba cut, so no segmentation is needed here (the
        # original's jieba.cut result was immediately overwritten).
        title2 = ''.join(ch for ch in title if ch not in string.punctuation)
        with self.lock:
            if url in self.crawled_urls:
                return
        # Prefer a YYYY-MM-DD date found anywhere in the page body.
        date_regex = re.compile(r'\d{4}-\d{2}-\d{2}')
        date_match = date_regex.search(html_text)
        if date_match:
            shijian = date_match.group()
        else:
            # Fall back to meta tags. Fix: the attribute selectors
            # previously nested unescaped single quotes — a syntax error.
            date_tag = soup.select_one('meta[name="date"], meta[name="pubdate"]')
            shijian = date_tag.get('content') if date_tag else None
        # Last resort: stamp with today's date.
        if not shijian or shijian.strip() == '':
            shijian = str(date.today())
        print(shijian)
        try:
            keywords = self.extract_keywords(title2)
            description, favicon, img_urls = self.extract_page_info(soup)
            if favicon:
                favicon = self.get_image_data_uri(favicon)
            c = self.conn.cursor()
            c.execute(
                'INSERT INTO contents(title, url, favicon, description, keywords, date, img) VALUES (?, ?, ?, ?, ?, ?, ?)',
                (title, url, favicon, description, ','.join(keywords), shijian, '\n'.join(img_urls)))
            self.conn.commit()
            self.crawled_urls.add(url)
            # Fix: the original f-string nested unescaped single quotes.
            print(f"正在爬取 '{url}' 并保存到数据库...")
        except sqlite3.IntegrityError:
            pass
        if depth < self.max_depth:
            for link in soup.find_all('a', href=True):
                # Fix: resolve relative links per RFC 3986 instead of the
                # original naive concatenation (url + href).
                self.add_urls([urljoin(url, link['href'])])

    @staticmethod
    def extract_keywords(title):
        """Segment ``title`` with jieba and return keywords ordered by
        first appearance, duplicates removed.

        NOTE(review): len(word) > 0 keeps single-character tokens even
        though the original comment said they should be dropped — confirm
        which behavior is intended.
        """
        words = [w for w in jieba.cut(title) if w not in string.punctuation]
        keywords = [w for w in words if len(w) > 0]
        keywords = list(set(keywords))
        keywords.sort(key=words.index)  # restore first-appearance order
        return keywords

    @staticmethod
    def extract_page_info(soup):
        """Return (meta description, favicon href, list of <img> srcs)."""
        description = ''
        favicon = ''
        meta_description = soup.find('meta', attrs={'name': 'description'})
        if meta_description and meta_description.has_attr('content'):
            description = meta_description['content']
        link_favicon = soup.find('link', attrs={'rel': 'icon'})
        if link_favicon and link_favicon.has_attr('href'):
            favicon = link_favicon['href']
        img_urls = [img.get('src') for img in soup.find_all('img')]
        img_urls = [src for src in img_urls if src is not None]
        return description, favicon, img_urls

    def worker(self):
        """Consume URLs from the queue until it is observed empty.

        NOTE(review): a worker exits as soon as it sees an empty queue,
        even while other workers may still be adding URLs — this can end
        the crawl early. Kept as-is to preserve behavior.
        """
        while True:
            next_url = None
            with self.lock:
                if self.url_queue:
                    next_url = self.url_queue.pop(0)
            if next_url is None:
                break
            # Random delay to be gentle on target servers.
            time.sleep(random.uniform(1, 3))
            self.crawl_and_save(next_url)

    def run(self):
        """Start the worker threads, wait for them, then close the DB."""
        threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        self.conn.close()
if __name__ == '__main__':
    # Blocks until the crawl finishes: Crawler.__init__ starts and joins
    # the worker threads itself.
    crawler = Crawler(max_depth=5, num_workers=5)
可能还有一些 bug。搜索建议(提示词)功能的后端已经实现,但需要把前端 HTML 中 datalist 的 id 改为与输入框 list 属性一致的 'suggestion-list' 才能生效。
联系客服