from datetime import UTC, datetime from app.common.repository.base_repository import BaseRepository from app.common.security.password_hasher import hash_password from app.constants.model.admin_user.admin_user_status_code import AdminUserStatusCode from app.model.admin_user import AdminUser class AdminUserRepository(BaseRepository): async def find_by_username(self, username: str) -> AdminUser | None: row = await self.database.fetchone( """ SELECT id, username, password, user_type, nickname, phone, email, status, login_ip, login_time, remark FROM admin_user WHERE username = ? LIMIT 1 """, (username,), ) return AdminUser.from_row(row) if row else None async def find_by_id(self, admin_id: int) -> AdminUser | None: row = await self.database.fetchone( """ SELECT id, username, password, user_type, nickname, phone, email, status, login_ip, login_time, remark FROM admin_user WHERE id = ? LIMIT 1 """, (admin_id,), ) return AdminUser.from_row(row) if row else None async def record_login(self, admin_id: int, login_ip: str) -> None: now = self._now() await self.database.execute( """ UPDATE admin_user SET login_ip = ?, login_time = ?, updated_at = ? WHERE id = ? """, (login_ip, now, now, admin_id), ) async def ensure_seed_admin(self, username: str, password: str) -> None: exists = await self.find_by_username(username) if exists is not None: return now = self._now() await self.database.execute( """ INSERT INTO admin_user ( username, password, user_type, nickname, phone, email, status, login_ip, login_time, created_at, updated_at, remark ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( username, hash_password(password), "SuperAdmin", "Super Admin", "", "", int(AdminUserStatusCode.NORMAL), "", "", now, now, "seeded by application startup", ), ) @staticmethod def _now() -> str: return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")