Access 作为一款文件型数据库,与 SQLite 类似,可以作为简单应用解决方案的数据存储。根据 DB-Engines Ranking 的数据库排名数据,MS Access 在 2020年12月的排名为 11 位,不算太小众。
Python 没有专门针对 Access 数据库的驱动,但微软提供了 ODBC 方式,可以访问 Access ,python 有支持 odbc 访问数据库的第三方模块 pyodbc,所以可以通过 pyodbc 作为桥接的工具。
MS Office 有 32 位版本和 64 位版,如果操作系统版本与 Office 版本不一致,使用 pyodbc 的时候可能会遇到错误。假设操作系统是 Windows 64 位,但Office 版本是 32 位的,此时安装 ODBC for MS Access,应该是 32 位的,将出现 pyodbc 找不到 32 位驱动的情况,如果尝试安装 64 位 odbc 驱动,Windows 将提示已经安装 32 位驱动,不允许再安装 64 位驱动。
解决办法是使用微软提供的 Orca 工具(本文的后面的源代码提供了 Orca 工具),修改 64 位 odbc 安装文件的数据表,删除里面的 blockinstallation 限制。先通过下面的网址下载 64 位驱动 (https://www.microsoft.com/en-us/download/details.aspx?id=13255),将下载的 AccessDatabaseEngine_X64.exe 文件解压缩,用 Orca 工具打开里面的文件 AceRedist.msi,找到 launchcondition:
pyodbc 符合 python DB-API 2.0 规范。下面是 pyodbc 访问 MS Access 数据库的示例,包括 CRUD 操作,以及有参数 的 sql 语句操作方法。本篇的目的并不是详细讲解 pyodbc 的用法,只演示一个大概的模式。
import pyodbc import unittest conn = pyodbc.connect(DSN="msaccess_employees") cursor = conn.cursor() class TestPyOdbc(unittest.TestCase): def test_select(self): cursor.execute("select * from employees") result = cursor.fetchall() # result为list类型 for item in result: print(item) # item为pyodbc.Row类型 def test_insert(self): sql = """ INSERT INTO employees ( EMP_ID, FIRST_NAME, LAST_NAME, GENDER, AGE, EMAIL, PHONE_NR, EDUCATION, MARITAL_STAT,NR_OF_CHILDREN ) VALUES ('9001', 'Stone', 'Wang', 'Male', 18, 'stone@126.com', '138xxx', 'Bachelor', 'Married', 2 ); """ cursor.execute(sql) conn.commit() def test_update(self): sql = "update employees set AGE=20 where EMP_ID=9001" cursor.execute(sql) conn.commit() def test_delete(self): cursor.execute("delete from employees where EMP_ID=9001") def test_sql_with_parameter(self): sql = "select * from employees where EMP_ID=?" cursor.execute(sql, ['1001']) print(cursor.fetchone()) if __name__ == "__main__": unittest.main()
提一下连接字符串 (connection string),如果 Windows 版本与 Office 版本一致,连接字符串可以这样写:
conn = pyodbc.connect("Driver={{Microsoft Access Driver (*.mdb, *.accdb)}};DBQ=<ms_access_db_path>")
版本不一致则只能通过 DSN 来访问,打开 ODBC 数据源 (odbcad32.exe),创建一个新的数据源,选择 MS Access 驱动:
adodbapi 库对 ADO 进行了封装,符合 DB-API 2.0 规范。项目在 pypi 的地址:https://pypi.org/project/adodbapi/,源代码托管在 SourceForge 上面。对 adodbapi 网络上没有太多文档,大家可以参考的主要是该项目提供的 quick reference,本文的源代码中附了 quick reference 文档。
以下是 adodbapi 的使用示例:
import adodbapi import os import unittest def get_current_dir(): """ 获取当前文件夹 """ return os.path.dirname(os.path.abspath(__file__)) db_file_path = get_current_dir() + r'\db\Employees.accdb' conn_str = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=%s;" % db_file_path conn = adodbapi.connect(conn_str) cursor = conn.cursor() class TestAdodbapi(unittest.TestCase): def test_select(self): cursor.execute("select * from employees;") result = cursor.fetchall() # result为adodbpai.apibase.SQLRows类型 for item in result: print(item) # itemadodbpai.apibase.SQLRow类型 def test_insert(self): sql = """ INSERT INTO employees ( EMP_ID, FIRST_NAME, LAST_NAME, GENDER, AGE, EMAIL, PHONE_NR, EDUCATION, MARITAL_STAT, NR_OF_CHILDREN) VALUES ('9001', 'Stone', 'Wang', 'Male', 18, 'stone@126.com', '138xxx', 'Bachelor', 'Married', 2); """ cursor.execute(sql) conn.commit() def test_update(self): sql = "update employees set AGE=20 where EMP_ID=9001" cursor.execute(sql) conn.commit() def test_delete(self): cursor.execute("delete from employees where EMP_ID=9001") def test_sql_with_paramter(self): sql = "select * from employees where EMP_ID=?" cursor.execute(sql, (1001,)) print(cursor.fetchone()) if __name__ == "__main__": unittest.main()
ADO 是微软提供的 数据访问技术,我们也可以通过 pywin32 模块操作 ADO 组件,实现对 MS Access 数据库的访问。与 DB-API 2.0 规范相比,ADO 访问数据库的代码相对繁琐,但如果已经比较熟悉 ADO 编程模型,编写数据访问的代码也比较简单,而且我们可以利用 Python 面向对象的方法,对原生 ADO 按自己的需要进行封装,从而简化代码。
ADO 对象模型中,核心的是 Connection, Command 和 RecordSet 三个。Connection 代表与数据库的连接,Command 对象用于执行 SQL 语句,比如插入数据,修改数据等。RecordSet 对象代表从数据库获取的数据,可以用遍历的方式查看数据。RecordSet 对象本身也可以进行数据的插入、修改和删除等操作。
from win32com.client import Dispatch
class ConnectionWrapper(object):
def __init__(self, conn_str):
self.connection_string = conn_str
def get_connection(self):
conn = Dispatch("ADODB.Connection")
conn.ConnectionString = self.connection_string
return conn
from win32com.client import Dispatch from . import error_handler from . import adoconstants class CommandWrapper(object): @staticmethod def execute(conn, sql): cmd = Dispatch("ADODB.Command") try: conn.Open() cmd.ActiveConnection = conn cmd.CommandText = sql cmd.execute() except Exception as ex: print(ex) for err in conn.Errors: error_handler.print_error(err) finally: if conn.State == adoconstants.adStateOpen: conn.Close()
RecordSetWrapper 对象主要通过 query()
方法获取数据,参数可以是 sql 语句或者 table name。rst_to_list()
方法用于以 list 格式输出结果集,to_excel()
方法用于将 RecordSet 对象导出到 Excel 的工作表。
from win32com.client import Dispatch from ADOWrapper.adoconstants import * class RecordSetWrapper(object): def __init__(self): pass @staticmethod def rst_to_list(recordset): """ Convert recordset to list """ result = [] if not (recordset.BOF and recordset.EOF): # header line header = [] for idx in range(recordset.Fields.Count): header.append(recordset.Fields(idx).Name) result.append(header) # line items # Python对于数据库的NULL值自动转换成None recordset.MoveFirst() while not recordset.EOF: item = [] for idx in range(recordset.Fields.Count): item.append(str(recordset.Fields(idx))) result.append(item) recordset.MoveNext() return result @staticmethod def query(conn, sql): rst = Dispatch("ADODB.Recordset") result = [] try: if conn.state != adStateOpen: conn.Open() rst.Open(sql, conn, adOpenKeyset, adLockReadOnly) result = RecordSetWrapper.rst_to_list(rst) except Exception as ex: print(ex) for err in conn.Errors: print(err.Description) finally: rst.Close() conn.Close() return result @staticmethod def get_recordset(conn, sql): rst = Dispatch("ADODB.Recordset") conn.Open() rst.Open(sql, conn, adOpenKeyset, adLockReadOnly) return rst @staticmethod def to_excel(recordset, excel_file, replace = False): # create excel file excel_app = Dispatch("Excel.Application") excel_app.Visible = True try: work_book = excel_app.Workbooks.Add() target_sheet = work_book.ActiveSheet # copy recordset header for idx in range(0, recordset.Fields.Count): target_sheet.cells(1, idx+1).Value = recordset.Fields(idx).Name # copy recordset lines target_sheet.Range("A2").CopyFromRecordSet(recordset) print("导出成功!") finally: recordset.Close()
以下是单元测试代码:
from ADOWrapper.ado_command import CommandWrapper from ADOWrapper.ado_connection import ConnectionWrapper from ADOWrapper.ado_recordset import RecordSetWrapper from msaccess_db_file_path import get_access_db_file import unittest # 连接MS Access数据库 conn_str = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=%s;" % get_access_db_file() conn = ConnectionWrapper(conn_str).get_connection() class TestAdoWrapper(unittest.TestCase): def test_insert(self): sql = """ INSERT INTO employees ( EMP_ID, FIRST_NAME, LAST_NAME, GENDER, AGE, EMAIL, PHONE_NR, EDUCATION, MARITAL_STAT, NR_OF_CHILDREN) VALUES ('9001', 'Stone', 'Wang', 'Male', 18, 'stone@126.com', '138xxx', 'Bachelor', 'Married', 2 ); """ CommandWrapper.execute(conn, sql) def test_update(self): sql = "UPDATE employees SET AGE=20 WHERE EMP_ID=9001" CommandWrapper.execute(conn, sql) def test_delete(self): CommandWrapper.execute(conn, "DELETE FROM employees WHERE EMP_ID=9001") def test_query(self): result = RecordSetWrapper.query(conn, "SELECT * FROM employees") for record in result: print(record) def test_query_table(self): result = RecordSetWrapper.query(conn, "employees") for record in result: print(record) def test_export_to_excel(self): rst = RecordSetWrapper.get_recordset(conn, "select * from employees where EMP_ID<1020;") RecordSetWrapper.to_excel(rst, "D:/employee_output.xlsx") if __name__ == "__main__": unittest.main()
原生RecordSet 对象支持数据库的 CRUD 操作,以下代码演示了 Python 使用原生 RecordSet 的方法:
""" 原生的|RecordSet具备CRUD功能,本示例说明其用法 """ from ADOWrapper.ado_connection import ConnectionWrapper from ADOWrapper.ado_recordset import RecordSetWrapper from ADOWrapper.adoconstants import * from win32com.client import Dispatch from msaccess_db_file_path import get_access_db_file import unittest # 连接MS Access数据库 conn_str = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=%s;" % get_access_db_file() conn = ConnectionWrapper(conn_str).get_connection() class TestRecordSet(unittest.TestCase): def test_list_employees(self): rst = RecordSetWrapper.get_recordset(conn, "select * from employees") if rst.BOF and rst.EOF: return rst.MoveFirst() while not rst.EOF: for idx in range(0, rst.Fields.Count): print(rst.Fields(idx), end=",") print() rst.MoveNext() rst.Close() conn.Close() def test_create_employee(self): rst = Dispatch("ADODB.Recordset") conn.Open() rst.Open('employees', conn, adOpenKeyset, adLockOptimistic) try: rst.AddNew() rst.Fields("EMP_ID").Value = '9002' rst.Fields("FIRST_NAME").Value = 'Stone' rst.Fields("LAST_NAME").Value = "Wang" rst.Update() print('新增记录成功!') except Exception as ex: print(ex) for err in conn.Errors: print(err) finally: rst.Close() conn.Close() def test_modify_employee(self): rst = Dispatch("ADODB.Recordset") try: conn.Open() sql = "SELECT * FROM employees WHERE EMP_ID=9002" rst.Open(sql, conn, adOpenKeyset, adLockOptimistic) if not rst.EOF: rst.Fields('AGE').Value = 18 rst.Update() print('修改成功!') except Exception as ex: print(ex) for err in conn.Errors: print(err) finally: rst.Close() conn.Close() def test_delete_employee(self): rst = Dispatch("ADODB.Recordset") conn.Open() sql = "SELECT * FROM employees WHERE EMP_ID=9002" try: # IMPORTANT: client cursor should be used for deletion rst.CursorLocation = adUseClient rst.Open(sql, conn, adOpenKeyset, adLockOptimistic) if not rst.EOF: rst.Delete(1) # deleter first row print('删除成功!') else: print('没有找到记录!') except Exception as ex: print(ex) for err in conn.Errors: print(err) finally: rst.Close() conn.Close() if __name__ == '__main__': unittest.main()
sqlalchemy 并不直接支持 MS Access 数据库,但可以通过安装 sqlalchemy-access 模块来提供支持。sqlalchemy-access 在 pypi 的地址:https://pypi.org/project/sqlalchemy-access/。安装的方法:
pip install sqlalchemy-access
如果小伙伴对 sqlalchemy 的使用感兴趣,请自行寻找资源学习。本文也只是演示基本的使用。sqlalchemy 的优点是基于 ORM,不需要手工编写 sql 语句,但需要用代码定义 model,这个 model 可以用 sqlacodegen 基于数据库表来自动生成。以下是 model 定义的代码:
from sqlalchemy import Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Employee(Base):
__tablename__ = "employees"
emp_id = Column("EMP_ID", String(255), primary_key=True)
first_name = Column("FIRST_NAME", String(255))
last_name = Column("LAST_NAME", String(255))
gender = Column("GENDER", String(255))
age = Column("AGE", Integer)
email = Column("EMAIL", String(255))
phone = Column("PHONE_NR", String(255))
education = Column("EDUCATION", String(255))
marital_stat = Column("MARITAL_STAT", String(255))
children = Column("NR_OF_CHILDREN", Integer)
def __repr__(self):
return "Employee <{emp_id},{first_name},{last_name},{gender},{age},{email},{phone},{education},{marital_stat},{children}>".format(
emp_id=self.emp_id,
first_name=self.first_name,
last_name=self.last_name,
gender=self.gender,
age=self.age,
email=self.email,
phone=self.phone,
education=self.education,
marital_stat=self.marital_stat,
children=self.children
)
以下是基于 sqlalchemy 增删改查的代码示例:
import unittest from employee_sqlalchemy.models import Employee from sqlalchemy import create_engine, MetaData from sqlalchemy.orm import sessionmaker engine = create_engine("access+pyodbc://EMP_MSACCESS") Session = sessionmaker(bind=engine) session = Session() class TestSqlalchemy(unittest.TestCase): def test_query(self): employees = session.query(Employee).all() for emp in employees: print(emp) def test_query_one(self): employees = session.query(Employee).filter(Employee.first_name=="Ted").all() for emp in employees: print(emp) def test_insert(self): new_emp = Employee( emp_id =9001, first_name = "Alice", last_name = "Brown" ) session.add(new_emp) session.commit() session.close() def test_update(self): emp = session.query(Employee).get(9001) if emp is not None: emp.age = 20 session.commit() session.close() def test_delete(self): emp = session.query(Employee).get(9001) if emp is not None: session.delete(emp) session.commit() session.close() if __name__ == "__main__": unittest.main()
github - python-using-msaccess-db
联系客服