from sqlalchemy import func from .user import Users from main import db class Article(db.Model): __tablename__ = 'article' # 加入数据库中的所有列 articleid = db.Column(db.String(64), primary_key=True) userid = db.Column(db.String(64), db.ForeignKey('user.userid')) headline = db.Column(db.String(255), nullable=False) content = db.Column(db.String(255), nullable=False) type = db.Column(db.Integer, nullable=False) # 通常使用 db.Integer 而不是 db.int(255) readcount = db.Column(db.Integer, nullable=False) recommended = db.Column(db.Integer, nullable=False) credit = db.Column(db.Integer, nullable=False) hidden = db.Column(db.Integer, nullable=False) drafted = db.Column(db.Integer, nullable=False) checked = db.Column(db.Integer, nullable=False) replycount = db.Column(db.Integer, nullable=False) # 评论数量 createtime = db.Column(db.DateTime, nullable=False) # 使用 DateTime 类型,并设置默认值为当前 UTC 时间 updatetime = db.Column(db.DateTime, nullable=False) # 可以添加其他字段,如 title, content 等 # 关联 Users 表 user = db.relationship('Users', backref=db.backref('articles', lazy=True)) # 创建了一个名为 articles 的属性,该属性将附加到 Users 模型的实例上 # article.user可以直接用 # 查询所有文章 @classmethod def find_all(cls): options = db.joinedload(cls.user) return cls.query.options(options).all() # 根据id查询文章 @classmethod def find_by_id(cls, articleid): # 因为每一篇文章要使用到用户的内容信息,所以要关联到用户信息表user options = db.joinedload(cls.user) return cls.query.options(options).get(articleid) # Ai 给的东西 #---------------------------------------------------------- @classmethod def find_max_less_than_id(cls, articleid): # 首先找到小于给定articleid的最大articleid max_id = cls.query.filter(cls.articleid < articleid).order_by(cls.articleid.desc()).first() if max_id is None: return None # 如果没有找到任何小于给定articleid的文章,则返回None # 然后使用这个最大的articleid来查询文章,并关联用户信息 options = db.joinedload(cls.user) max_article = cls.query.options(options).get(max_id.articleid) return max_article @classmethod def find_min_greater_than_id(cls, articleid): # 首先找到大于给定articleid的最小articleid min_id_query = cls.query.filter(cls.articleid > articleid).order_by(cls.articleid.asc()).limit(1) min_id_result = min_id_query.first() if min_id_result is None: return None # 如果没有找到任何大于给定articleid的文章,则返回None # 提取出最小的articleid min_id = min_id_result.articleid # 然后使用这个最小的articleid来查询文章,并关联用户信息 options = db.joinedload(cls.user) min_article = cls.query.options(options).get(min_id) return min_article #------------------------------------------------------- # 分页查询 @classmethod def find_limit_with_user(cls, offset, limit): ''' 自定义每一页的数量 :param offset: 开始的位置 :param limit: 结束的位置 :return: 查询到的每一页的类 ''' # 加载options options = db.joinedload(cls.user) return cls.query.options(options).filter_by(drafted=0).order_by(Article.articleid.desc()).limit(limit).offset( offset).all() # 统计当前文章的数量 @classmethod def get_total_count(cls): return cls.query.count() # 根据文章类型获得文章 @classmethod def find_by_type(cls, type, offset, limit): return cls.query.filter_by(type=type).order_by(Article.articleid.desc()).limit(limit).offset(offset).all() # 根据文章类型获取总数量 @classmethod def get_total_count_by_type(cls, type): return cls.query.filter_by(type=type).count() # 根据文章标题进行模糊搜索 @classmethod def find_by_headline(cls, headline, offset, limit): return cls.query.filter(Article.headline.like('%' + headline + '%')).limit(limit).offset(offset).all() # 统计模糊搜索的总数量 @classmethod def get_total_count_by_like_headline(cls, headline): return cls.query.filter(Article.headline.like('%' + headline + '%')).count() # 最新文章 @classmethod def side_new(cls): return cls.query.filter().order_by(Article.articleid.desc()).limit(9).all() # 特别推荐 @classmethod def side_recommend(cls): return cls.query.filter(Article.recommended == 1).order_by(func.rand()).limit(9).all() # func.rand()随机抽取 # 最多阅读 @classmethod def side_most(cls): return cls.query.filter().order_by(Article.readcount.desc()).limit(9).all() # 一次性返回三个数据 @classmethod def side_method(cls): return cls.side_new(), cls.side_most(), cls.side_recommend() # 每阅读一次,阅读次数加一 @classmethod def update_readcount(cls, articleid): # 使用SQLAlchemy的update方法直接在数据库层面更新,避免竞态条件 cls.query.filter_by(articleid=articleid).update({cls.readcount: cls.readcount + 1}) db.session.commit() # 评论数量加一 @classmethod def replaycount_add(cls,articleid): cls.query.filter_by(articleid=articleid).update({cls.replycount: cls.replycount + 1}) db.session.commit() # 后台管理部分 # 查询article表中除草稿外的所有数据并返回结果集 @classmethod def find_all_except_draft(cls, start, count): result = cls.query.filter(Article.drafted == 0).order_by( Article.articleid.desc()).limit(count).offset(start).all() return result @classmethod def get_count_except_draft(cls): count = cls.query.filter(Article.drafted == 0).count() return count @classmethod def find_by_type_except_draft(cls, start, count, type): if type == 0: result = cls.find_all_except_draft(start, count) total = cls.get_count_except_draft() else: result = cls.query.filter(Article.drafted == 0, Article.type == type).order_by(Article.articleid.desc())\ .limit(count).offset(start).all() total = cls.query.filter(Article.drafted == 0, Article.type == type).count() return result, total # 返回分页结果集和不分页的总数量 @classmethod def find_by_headline_except_draft(cls, headline): result = cls.query.filter(Article.headline.like('%' + headline + '%'))\ .order_by(Article.articleid.desc()).all() return result @classmethod def switch_hidden(cls, articleid): print("articleid=", articleid) row = cls.query.filter_by(articleid=articleid).first() if row.hidden == 1: row.hidden = 0 else: row.hidden = 1 db.session.commit() print("hidden=", row.hidden) return row.hidden # 将当前最新状态返回给控制层 @classmethod def replaycount_add(cls, articleid): cls.query.filter_by(articleid=articleid).update({cls.replycount: cls.replycount + 1}) db.session.commit() @classmethod def switch_recommended(cls, articleid): row = cls.query.filter_by(articleid=articleid).first() if row.recommended == 1: cls.query.filter_by(articleid=articleid).update({cls.recommended: cls.recommended - 1}) else: cls.query.filter_by(articleid=articleid).update({cls.recommended: cls.recommended + 1}) db.session.commit() return row.recommended @classmethod def switch_checked(cls, articleid): print("数据库中articleid=",articleid) row = cls.query.filter_by(articleid=articleid).first() print("数据库中row.checked=",row.checked) if row.checked == 1: row.checked = 0 else: row.checked = 1 db.session.commit() return row.checked # 删除文章信息 @classmethod def do_deletesign(cls, articleid): result = cls.query.filter_by(articleid=articleid).first() if result: db.session.delete(result) db.session.commit()