Files
fastapi_server/app/core/database.py

88 lines
2.8 KiB
Python

import asyncio
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Any
class Database:
def __init__(self, path: Path) -> None:
self.path = path
async def initialize(self) -> None:
await asyncio.to_thread(self._initialize)
async def execute(self, sql: str, params: tuple[Any, ...] = ()) -> int:
return await asyncio.to_thread(self._execute, sql, params)
async def fetchone(
self,
sql: str,
params: tuple[Any, ...] = (),
) -> dict[str, Any] | None:
return await asyncio.to_thread(self._fetchone, sql, params)
async def fetchall(
self,
sql: str,
params: tuple[Any, ...] = (),
) -> list[dict[str, Any]]:
return await asyncio.to_thread(self._fetchall, sql, params)
def _connect(self) -> sqlite3.Connection:
self.path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(self.path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _initialize(self) -> None:
conn = self._connect()
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS admin_user (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
user_type TEXT NOT NULL DEFAULT 'admin',
nickname TEXT NOT NULL DEFAULT '',
phone TEXT NOT NULL DEFAULT '',
email TEXT NOT NULL DEFAULT '',
status INTEGER NOT NULL DEFAULT 1,
login_ip TEXT NOT NULL DEFAULT '',
login_time TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
remark TEXT NOT NULL DEFAULT ''
)
"""
)
conn.commit()
finally:
conn.close()
def _execute(self, sql: str, params: tuple[Any, ...]) -> int:
conn = self._connect()
try:
cursor = conn.execute(sql, params)
conn.commit()
return int(cursor.lastrowid or 0)
finally:
conn.close()
def _fetchone(self, sql: str, params: tuple[Any, ...]) -> dict[str, Any] | None:
conn = self._connect()
try:
row = conn.execute(sql, params).fetchone()
return dict(row) if row else None
finally:
conn.close()
def _fetchall(self, sql: str, params: tuple[Any, ...]) -> list[dict[str, Any]]:
conn = self._connect()
try:
rows = conn.execute(sql, params).fetchall()
return [dict(row) for row in rows]
finally:
conn.close()