# Files
# wixon_blog/app.py
# 2026-01-16 16:12:48 +09:00
#
# 575 lines
# 21 KiB
# Python

from flask import Flask, render_template, request, redirect, url_for, flash, session
import pymysql
from flask import session, g, jsonify
import bcrypt
from bs4 import BeautifulSoup
from werkzeug.utils import secure_filename
import os
import uuid
import re
from markupsafe import Markup
from jinja2 import filters
from functools import wraps
import math
# Relative directory where uploaded images are written. The original note says
# the path is meant to be relative to the Flask app root.
UPLOAD_FOLDER = 'static/upload/img'
# Lower-case file extensions accepted by the image upload endpoint.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}

app = Flask(__name__)
# The session-signing key can be supplied through the SECRET_KEY environment
# variable. The previous hard-coded value is kept only as a fallback so that
# existing deployments keep working unchanged.
app.secret_key = os.environ.get('SECRET_KEY', 'your secret key')
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
def remove_html_tags(text):
    """Return *text* with every ``<...>`` span removed.

    A span runs from ``<`` to the next ``>`` and may cross line breaks, so
    tags whose attributes are wrapped over several lines are stripped too.
    Empty or missing input (``''`` / ``None``) yields ``''``.
    """
    if not text:
        return ''
    # ``[^>]`` also matches newlines, unlike ``.`` without re.DOTALL.
    return re.sub(r'<[^>]*>', '', text)
@app.before_request
def load_user():
    """Expose the login state on ``g`` before every request.

    ``g.is_login`` is True when the session holds a ``user_info`` entry, and
    ``g.user_info`` is that entry (``None`` otherwise).
    """
    logged_in = 'user_info' in session
    g.is_login = logged_in
    g.user_info = session['user_info'] if logged_in else None
def allowed_file(filename):
    """Tell whether *filename* ends in one of ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot, compared case-insensitively.
    Names without a dot are rejected.
    """
    if '.' not in filename:
        return False
    _, extension = filename.rsplit('.', 1)
    return extension.lower() in ALLOWED_EXTENSIONS
@app.route('/upload_image', methods=['POST'])
def upload_image():
    """Store an uploaded image under a random name and return its static URL.

    Expects a multipart form field named ``file``. Responds with JSON:
    ``{success: True, fileUrl: ...}`` and 200 on success, or
    ``{success: False, message: ...}`` and 400 when the part is missing, no
    file was selected, or the extension is not allowed.
    """
    # NOTE(review): this endpoint performs no login check - confirm whether
    # anonymous uploads are intended.
    if 'file' not in request.files:
        return jsonify(success=False, message='No file part'), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify(success=False, message='No selected file'), 400

    if not file or not allowed_file(file.filename):
        return jsonify(success=False, message='File not allowed'), 400

    # Take the extension from the original name, which allowed_file() has just
    # validated against the whitelist. The previous code read it from
    # secure_filename()'s output, which drops non-ASCII characters: a name
    # such as "사진.png" collapsed to "png" and the split raised IndexError.
    ext = file.filename.rsplit('.', 1)[1].lower()

    # The stored name is a random hex id plus the whitelisted extension, so
    # nothing from the client-supplied name reaches the filesystem path.
    random_filename = f'{uuid.uuid4().hex}.{ext}'
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], random_filename)

    # Create the upload directory on first use.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    file.save(file_path)

    file_url = url_for('static', filename='upload/img/' + random_filename)
    return jsonify(success=True, fileUrl=file_url), 200
# MySQL database connection settings
def connnect_db():
    """Open and return a new PyMySQL connection to the blog database.

    Host, user, password and database name can be overridden through the
    ``BLOG_DB_HOST``, ``BLOG_DB_USER``, ``BLOG_DB_PASSWORD`` and
    ``BLOG_DB_NAME`` environment variables. The previous hard-coded values
    remain as defaults, so behavior is unchanged when none are set.
    """
    # NOTE(review): the fallback credentials are still in source control;
    # consider rotating them and removing the defaults.
    return pymysql.connect(
        host=os.environ.get('BLOG_DB_HOST', 'wxnasso.synology.me'),
        user=os.environ.get('BLOG_DB_USER', 'wixon5'),
        password=os.environ.get('BLOG_DB_PASSWORD', 'Wixon2022@!'),
        database=os.environ.get('BLOG_DB_NAME', 'test'),
        charset="utf8mb4",
        # DictCursor makes every fetched row a dict keyed by column name.
        cursorclass=pymysql.cursors.DictCursor,
        init_command='SET SQL_SAFE_UPDATES = 0;',
    )
def sql_execute(q, d, is_data=False, is_last_id=False):
    """Run one SQL statement on a fresh connection and commit it.

    Args:
        q: SQL text with ``%s`` placeholders.
        d: Parameters bound to the placeholders.
        is_data: When true, fetch all rows and return them.
        is_last_id: When true, return the cursor's last inserted row id
            (takes precedence over ``is_data`` for the second tuple item).

    Returns:
        ``res`` alone when neither flag is set, otherwise ``(res, payload)``.
        ``res`` is whatever ``cursor.execute`` returned, or ``False`` when the
        statement raised (the error is printed and the transaction rolled
        back). ``payload`` is the row id, the fetched rows, or ``None``.
    """
    data = None
    last_id = None
    connection = connnect_db()
    try:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            try:
                res = cursor.execute(q, d)
                if is_data:
                    data = cursor.fetchall()
                # A row id of 0 (or None) means nothing was inserted.
                last_id = cursor.lastrowid or None
                connection.commit()
            except Exception as e:
                print(e)
                connection.rollback()
                res = False
    finally:
        # Leaving the cursor's ``with`` block closes only the cursor; the
        # connection must be closed explicitly or every call leaks one.
        if connection.open:
            connection.close()

    if is_data or is_last_id:
        return res, (last_id if is_last_id else data)
    return res
@app.route('/')
def index():
    """Render the home page with the six newest posts plus one random post.

    Logged-in users see every active post (``use_yn = 'Y'``); anonymous
    visitors only see those that are also public (``public_yn = 'Y'``).
    Each post's contents are reduced to a 150-character plain-text preview.
    """
    columns = "`id`, `title`, `category`, `thumbnail_img`, `contents`, `add_date`"
    visibility = "`use_yn` = 'Y'"
    if 'user_info' not in session:  # anonymous visitor
        visibility += " and `public_yn` = 'Y'"

    query = f"SELECT {columns} FROM `blog` WHERE {visibility} ORDER BY `add_date` DESC limit 6;"
    r_query = f"SELECT {columns} FROM `blog` WHERE {visibility} ORDER BY RAND() DESC limit 1;"

    _, latest = sql_execute(query, (), is_data=True)
    _, random_post = sql_execute(r_query, (), is_data=True)

    # sql_execute hands back None when the query failed, so normalise to a
    # real list before appending. The builtin ``list`` is shadowed by the
    # ``list`` view in this module, hence the unpacking form.
    posts = [*latest] if latest else []
    if random_post:
        posts.append(random_post[0])

    # Strip tags, then cap the preview at 150 characters.
    for post in posts:
        post['contents'] = remove_html_tags(post['contents'] or '')[:150]
    return render_template('index.html', posts=posts)
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the posted credentials (POST).

    On success the username and the member row (minus the password hash) are
    stored in the session and the user is redirected to the index page. On
    failure a message is flashed and the form is rendered again.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        # A missing field becomes an empty password instead of raising
        # AttributeError on None.
        password = request.form.get('password', '').encode('utf-8')

        # Load the member row for this id.
        res, user = sql_execute("SELECT * FROM member WHERE mb_id=%s", (username,), is_data=True)

        authenticated = False
        if res and user:
            stored_hash = user[0].get('mb_passwd') or ''
            try:
                authenticated = bcrypt.checkpw(password, stored_hash.encode('utf-8'))
            except ValueError:
                # Malformed or empty stored hash: treat as a failed login.
                authenticated = False

        if authenticated:
            session['username'] = username
            # Flask's default session is a signed cookie the client can read,
            # so the bcrypt hash must not be copied into it.
            session['user_info'] = {
                key: value for key, value in user[0].items() if key != 'mb_passwd'
            }
            return redirect(url_for('index'))
        flash('Username or password is incorrect')
    return render_template('login.html')
@app.route('/logout')
def logout():
    """Drop the whole session, reset the per-request login state, go home."""
    session.clear()
    g.user_info, g.is_login = None, False
    return redirect(url_for('index'))
@app.route('/post/<int:post_id>')
def post(post_id):
    """Render one post together with links to its previous and next posts.

    Anonymous visitors can only open posts that are public and not deleted,
    which is the same rule the listing pages apply. Logged-in users can open
    any post by id, as before. Neighbours are the closest active posts by
    ``add_date`` under the same visibility rule.

    Returns the rendered page, 404 when no visible post matches, or 500 when
    the lookup itself failed.
    """
    logged_in = 'user_info' in session
    select_from = (
        "SELECT blog.*, member.mb_name, member.mb_id FROM blog "
        "INNER JOIN member ON blog.user_id = member.mb_idx WHERE "
    )
    if logged_in:
        # Logged-in users also see posts that are not public.
        neighbour_filter = "blog.use_yn = 'Y'"
        detail_filter = "blog.id = %s"
    else:
        # Anonymous visitors: public, non-deleted posts only. Without this
        # filter a private post could be read by guessing its id.
        neighbour_filter = "blog.public_yn = 'Y' AND blog.use_yn = 'Y'"
        detail_filter = "blog.id = %s AND " + neighbour_filter

    result, fetched_data = sql_execute(select_from + detail_filter, (post_id,), is_data=True)
    if result is False:
        # The query raised inside sql_execute.
        return "Failed to fetch the post", 500
    if not fetched_data:
        # No visible post has this id.
        return "Failed to fetch the post", 404

    post = fetched_data[0]

    prev_query = (
        select_from + "blog.add_date < %s AND " + neighbour_filter
        + " ORDER BY blog.add_date DESC LIMIT 1"
    )
    next_query = (
        select_from + "blog.add_date > %s AND " + neighbour_filter
        + " ORDER BY blog.add_date ASC LIMIT 1"
    )
    prev_result, prev_fetched_data = sql_execute(prev_query, (post['add_date'],), is_data=True)
    next_result, next_fetched_data = sql_execute(next_query, (post['add_date'],), is_data=True)

    prev_post = prev_fetched_data[0] if prev_result and prev_fetched_data else None
    next_post = next_fetched_data[0] if next_result and next_fetched_data else None
    return render_template('post.html', post=post, prev_post=prev_post, next_post=next_post)
@app.route('/edit_post/<int:post_id>', methods=['GET', 'POST'])
def edit_post(post_id):
    """Show the edit form for a post (GET) or save the submitted changes (POST).

    Only the ``admin`` and ``wixon`` accounts may edit; everyone else is sent
    back to the index page with a flash message.
    """
    if session.get('username') not in ['admin', 'wixon']:
        flash('You are not allowed to edit this post.')
        return redirect(url_for('index'))

    if request.method == 'POST':
        # Update the post with the submitted fields.
        title = request.form.get('title')
        category = request.form.get('category')
        public_yn = 'Y' if request.form.get('public') == 'on' else 'N'
        contents = request.form.get('contents')
        query = "UPDATE blog SET title = %s, category = %s, public_yn = %s, contents = %s WHERE id = %s"
        data = (title, category, public_yn, contents, post_id)
        res = sql_execute(query, data)
        # sql_execute returns False only when the statement raised. Saving a
        # form without changes affects 0 rows, which is falsy but is not a
        # failure, so compare against False explicitly.
        if res is not False:
            flash('Post updated successfully.')
            return redirect(url_for('post', post_id=post_id))
        flash('Failed to update the post.')
        return redirect(url_for('edit_post', post_id=post_id))

    # GET: load the post into the edit form.
    query = "SELECT * FROM blog WHERE id = %s"
    res, fetched_data = sql_execute(query, (post_id,), is_data=True)
    if res and fetched_data:
        return render_template('edit_post.html', post=fetched_data[0])
    flash('Failed to fetch the post.')
    return redirect(url_for('index'))
@app.route('/write', methods=['GET', 'POST'])
def write():
    """Show the post editor (GET) or insert a new post (POST).

    Requires a logged-in session. The ``src`` of the first ``<img>`` in the
    submitted contents, if any, is stored as the post's thumbnail.
    """
    if 'user_info' not in session:
        flash("You need to login first.")
        return redirect(url_for('login'))

    if request.method != 'POST':
        return render_template('write.html')

    form = request.form
    user_id = session['user_info']['mb_idx']
    title = form['title']
    category = form['category']
    contents = form['contents']
    is_public = 'Y' if form.get('public') == 'on' else 'N'

    # The first image in the body doubles as the thumbnail.
    first_image = BeautifulSoup(contents, 'html.parser').find('img')
    if first_image:
        thumbnail_img = first_image['src']
    else:
        thumbnail_img = None

    query = "INSERT INTO blog (user_id, title, category, contents, thumbnail_img, public_yn) VALUES (%s, %s, %s, %s, %s, %s)"
    row = (user_id, title, category, contents, thumbnail_img, is_public)
    result, _ = sql_execute(query, row, is_last_id=True)
    if not result:
        return "Failed to write post", 500
    return redirect(url_for('index'))
@app.route('/blog/<int:id>', methods=['DELETE'])
def delete_post(id):
    """Soft-delete a post by setting its ``use_yn`` flag to ``'N'``.

    Responds with JSON: 200 on success, 401 when the caller is not logged in,
    500 when the update did not succeed.
    """
    # The endpoint previously had no authentication at all, so any visitor
    # could hide any post. Require a logged-in session at minimum.
    # NOTE(review): consider restricting this further (author or admin only).
    if 'user_info' not in session:
        return jsonify(success=False, message='Login required'), 401

    query = "UPDATE `blog` SET `use_yn` = 'N' WHERE `id` = %s;"
    res = sql_execute(query, (id,))
    if res:
        return jsonify(success=True, message='Post deleted successfully'), 200
    return jsonify(success=False, message='Could not delete the post'), 500
@app.route('/list')
@app.route('/list/<string:cate>')
def list(cate=None):
    """Render the first ten posts, optionally limited to category *cate*.

    Logged-in users see every active post; anonymous visitors only see public
    ones. Contents are reduced to a 150-character plain-text preview. When the
    query fails the page is rendered with no posts instead of crashing.
    """
    conditions = ["`use_yn` = 'Y'"]
    params = []
    if 'user_info' not in session:  # anonymous visitor
        conditions.append("`public_yn` = 'Y'")
    if cate:
        conditions.append("`category` = %s")
        params.append(cate)

    query = (
        "SELECT `id`, `title`, `category`, `thumbnail_img`, `contents`, `add_date` "
        "FROM `blog` "
        f"WHERE {' AND '.join(conditions)} "
        "ORDER BY `add_date` DESC LIMIT 10;"
    )
    _, rows = sql_execute(query, tuple(params), is_data=True)

    # sql_execute returns None for the rows when the statement raised.
    posts = rows or []
    # Strip tags, then cap the preview at 150 characters.
    for post in posts:
        post['contents'] = remove_html_tags(post['contents'] or '')[:150]
    return render_template('list.html', posts=posts)
@app.route('/list_more')
def list_more():
    """Return one page (10 posts) of the post list as JSON.

    Query parameters: ``page`` (1-based, defaults to 1) and an optional
    ``cate`` category filter. Anonymous visitors only receive public posts.
    Each post carries a 150-character plain-text preview and a formatted
    ``add_date_str``. Responds with an empty list and 500 when the query fails.
    """
    limit = 10
    # ``type=int`` falls back to the default on non-numeric input instead of
    # raising ValueError; clamping avoids a negative OFFSET for page <= 0.
    page = max(request.args.get('page', 1, type=int), 1)
    offset = (page - 1) * limit

    conditions = ["`use_yn` = 'Y'"]
    params = []
    if 'user_info' not in session:
        conditions.append("`public_yn` = 'Y'")
    cate = request.args.get('cate')
    if cate:
        conditions.append("`category` = %s")
        params.append(cate)

    query = (
        "SELECT `id`, `title`, `category`, `thumbnail_img`, `contents`, `add_date` "
        "FROM `blog` "
        f"WHERE {' AND '.join(conditions)} "
        "ORDER BY `add_date` DESC "
        "LIMIT %s OFFSET %s"
    )
    params.extend([limit, offset])
    r, rows = sql_execute(query, tuple(params), is_data=True)
    if r is False:
        # The statement raised inside sql_execute; rows is None.
        return jsonify([]), 500

    posts = rows or []
    for post in posts:
        post['contents'] = remove_html_tags(post['contents'] or '')[:150]
        post['add_date_str'] = post['add_date'].strftime("%b %d / %Y")
    return jsonify(posts)
# ============================================
# Admin pages
# ============================================
def admin_required(f):
    """Decorator that lets only the hard-coded admin accounts reach a view.

    Any other caller gets a flash message and a redirect to the login page.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        is_admin = session.get('username') in ['admin', 'wixon', 'javamon']
        if is_admin:
            return f(*args, **kwargs)
        flash('관리자 권한이 필요합니다.')
        return redirect(url_for('login'))
    return wrapper
@app.route('/admin/')
@admin_required
def admin_dashboard():
    """Render the admin dashboard with post and member counters.

    Every counter comes from its own COUNT(*) query and stays 0 when that
    query fails or returns nothing.
    """
    stat_queries = (
        ('total_posts', "SELECT COUNT(*) as cnt FROM blog WHERE use_yn='Y'"),
        ('public_posts', "SELECT COUNT(*) as cnt FROM blog WHERE use_yn='Y' AND public_yn='Y'"),
        ('private_posts', "SELECT COUNT(*) as cnt FROM blog WHERE use_yn='Y' AND public_yn='N'"),
        ('deleted_posts', "SELECT COUNT(*) as cnt FROM blog WHERE use_yn='N'"),
        ('total_members', "SELECT COUNT(*) as cnt FROM member"),
    )
    stats = {}
    for stat_name, sql in stat_queries:
        ok, rows = sql_execute(sql, (), is_data=True)
        stats[stat_name] = rows[0]['cnt'] if ok and rows else 0
    return render_template('admin/dashboard.html', stats=stats)
@app.route('/admin/posts')
@admin_required
def admin_posts():
    """Render the paginated, filterable admin post list (20 per page).

    Query parameters: ``page`` (1-based), ``category``, ``public_yn``,
    ``use_yn`` (defaults to ``'Y'``) and ``search`` (substring of the title).
    Empty filter values are ignored.
    """
    per_page = 20
    # Clamp so that page <= 0 cannot produce a negative OFFSET.
    page = max(request.args.get('page', 1, type=int), 1)
    offset = (page - 1) * per_page

    exact_filters = (
        ("blog.category = %s", request.args.get('category', '')),
        ("blog.public_yn = %s", request.args.get('public_yn', '')),
        ("blog.use_yn = %s", request.args.get('use_yn', 'Y')),
    )
    where_clauses = []
    params = []
    for clause, value in exact_filters:
        if value:
            where_clauses.append(clause)
            params.append(value)
    search = request.args.get('search', '')
    if search:
        where_clauses.append("blog.title LIKE %s")
        params.append(f"%{search}%")
    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

    # Total number of matching rows, used for the page count.
    count_query = f"SELECT COUNT(*) as cnt FROM blog INNER JOIN member ON blog.user_id = member.mb_idx WHERE {where_sql}"
    r, count_data = sql_execute(count_query, tuple(params), is_data=True)
    total = count_data[0]['cnt'] if r and count_data else 0
    total_pages = math.ceil(total / per_page) if total > 0 else 1

    query = (
        "SELECT blog.*, member.mb_name, member.mb_id "
        "FROM blog "
        "INNER JOIN member ON blog.user_id = member.mb_idx "
        f"WHERE {where_sql} "
        "ORDER BY blog.add_date DESC "
        "LIMIT %s OFFSET %s"
    )
    r, posts = sql_execute(query, (*params, per_page, offset), is_data=True)

    pagination = {
        'page': page,
        'per_page': per_page,
        'total': total,
        'total_pages': total_pages,
        'has_prev': page > 1,
        'has_next': page < total_pages,
        'prev_num': page - 1,
        'next_num': page + 1,
    }
    return render_template('admin/posts.html', posts=posts if posts else [], pagination=pagination)
@app.route('/admin/posts/<int:post_id>', methods=['GET', 'POST'])
@admin_required
def admin_post_detail(post_id):
    """Show one post in the admin editor (GET) or save edits to it (POST).

    A successful save redirects to the admin post list. A failed save flashes
    an error and re-renders the editor with the stored post.
    """
    if request.method == 'POST':
        title = request.form.get('title')
        category = request.form.get('category')
        public_yn = 'Y' if request.form.get('public') == 'on' else 'N'
        contents = request.form.get('contents')

        # The first image in the body becomes the thumbnail. A missing
        # ``contents`` field is parsed as an empty document instead of
        # crashing the parser.
        soup = BeautifulSoup(contents or '', 'html.parser')
        first_image = soup.find('img')
        thumbnail_img = first_image['src'] if first_image else None

        query = "UPDATE blog SET title=%s, category=%s, public_yn=%s, contents=%s, thumbnail_img=%s WHERE id=%s"
        res = sql_execute(query, (title, category, public_yn, contents, thumbnail_img, post_id))
        # sql_execute returns False only when the statement raised. Saving an
        # unchanged form affects 0 rows, which is falsy but not a failure.
        if res is not False:
            flash('포스트가 수정되었습니다.')
            return redirect(url_for('admin_posts'))
        flash('포스트 수정에 실패했습니다.')

    query = "SELECT blog.*, member.mb_name, member.mb_id FROM blog INNER JOIN member ON blog.user_id = member.mb_idx WHERE blog.id = %s"
    r, data = sql_execute(query, (post_id,), is_data=True)
    if r and data:
        return render_template('admin/post_detail.html', post=data[0])
    flash('포스트를 찾을 수 없습니다.')
    return redirect(url_for('admin_posts'))
@app.route('/admin/posts/<int:post_id>/delete', methods=['DELETE'])
@admin_required
def admin_post_delete(post_id):
    """Soft-delete a post (``use_yn='N'``) and report the outcome as JSON."""
    updated = sql_execute("UPDATE blog SET use_yn='N' WHERE id=%s", (post_id,))
    if not updated:
        return jsonify(success=False, message='삭제 실패'), 500
    return jsonify(success=True, message='포스트가 삭제되었습니다.')
@app.route('/admin/posts/<int:post_id>/restore', methods=['POST'])
@admin_required
def admin_post_restore(post_id):
    """Restore a soft-deleted post (``use_yn='Y'``) and report the outcome as JSON."""
    updated = sql_execute("UPDATE blog SET use_yn='Y' WHERE id=%s", (post_id,))
    if not updated:
        return jsonify(success=False, message='복구 실패'), 500
    return jsonify(success=True, message='포스트가 복구되었습니다.')
@app.route('/admin/members')
@admin_required
def admin_members():
    """Render the member list, newest ``mb_idx`` first (empty when the query fails)."""
    _, rows = sql_execute(
        "SELECT mb_idx, mb_id, mb_name FROM member ORDER BY mb_idx DESC",
        (),
        is_data=True,
    )
    return render_template('admin/members.html', members=rows or [])
@app.route('/admin/members/add', methods=['GET', 'POST'])
@admin_required
def admin_member_add():
    """Show the new-member form (GET) or create a member (POST).

    The id must be non-empty and unused. The password must be at least four
    characters, the same rule the reset-password endpoint enforces. The
    password is stored as a bcrypt hash.
    """
    if request.method == 'POST':
        mb_id = request.form.get('mb_id')
        mb_name = request.form.get('mb_name')
        # A missing field becomes '' instead of raising AttributeError on None.
        raw_password = request.form.get('password') or ''

        if not mb_id:
            flash('아이디를 입력해주세요.')
            return render_template('admin/member_form.html', member=None)
        if len(raw_password) < 4:
            flash('비밀번호는 4자 이상이어야 합니다.')
            return render_template('admin/member_form.html', member=None)

        r, existing = sql_execute("SELECT mb_id FROM member WHERE mb_id=%s", (mb_id,), is_data=True)
        if r and existing:
            flash('이미 존재하는 아이디입니다.')
            return render_template('admin/member_form.html', member=None)

        # Hash only once the request is known to be valid.
        hashed_pw = bcrypt.hashpw(raw_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
        query = "INSERT INTO member (mb_id, mb_passwd, mb_name) VALUES (%s, %s, %s)"
        res = sql_execute(query, (mb_id, hashed_pw, mb_name))
        if res:
            flash('회원이 추가되었습니다.')
            return redirect(url_for('admin_members'))
        flash('회원 추가에 실패했습니다.')
    return render_template('admin/member_form.html', member=None)
@app.route('/admin/members/<int:mb_idx>', methods=['GET', 'POST'])
@admin_required
def admin_member_detail(mb_idx):
    """Show a member's edit form (GET) or update the member's name (POST).

    A successful save redirects to the member list. A failed save flashes an
    error and re-renders the form with the stored member.
    """
    if request.method == 'POST':
        mb_name = request.form.get('mb_name')
        query = "UPDATE member SET mb_name=%s WHERE mb_idx=%s"
        res = sql_execute(query, (mb_name, mb_idx))
        # sql_execute returns False only when the statement raised. Saving the
        # same name again affects 0 rows, which is falsy but not a failure.
        if res is not False:
            flash('회원 정보가 수정되었습니다.')
            return redirect(url_for('admin_members'))
        flash('회원 정보 수정에 실패했습니다.')

    query = "SELECT mb_idx, mb_id, mb_name FROM member WHERE mb_idx=%s"
    r, member = sql_execute(query, (mb_idx,), is_data=True)
    if r and member:
        return render_template('admin/member_form.html', member=member[0])
    flash('회원을 찾을 수 없습니다.')
    return redirect(url_for('admin_members'))
@app.route('/admin/members/<int:mb_idx>/delete', methods=['DELETE'])
@admin_required
def admin_member_delete(mb_idx):
    """Delete a member row and report the outcome as JSON.

    Responds with 400 when the target is the account stored in the current
    session, and 500 when the delete did not succeed.
    """
    current_idx = session.get('user_info', {}).get('mb_idx')
    if current_idx == mb_idx:
        return jsonify(success=False, message='자기 자신은 삭제할 수 없습니다.'), 400

    deleted = sql_execute("DELETE FROM member WHERE mb_idx=%s", (mb_idx,))
    if not deleted:
        return jsonify(success=False, message='삭제 실패'), 500
    return jsonify(success=True, message='회원이 삭제되었습니다.')
@app.route('/admin/members/<int:mb_idx>/reset-password', methods=['POST'])
@admin_required
def admin_member_reset_password(mb_idx):
    """Replace a member's password with a bcrypt hash of the submitted one.

    Expects a JSON object with a ``password`` string of at least four
    characters. Responds with JSON: success, 400 for a missing or too-short
    password, 500 when the update did not succeed.
    """
    # ``silent=True`` yields None for a missing or malformed JSON body rather
    # than aborting. Anything that is not an object with a string password is
    # rejected by the length check below.
    payload = request.get_json(silent=True)
    new_password = payload.get('password') if isinstance(payload, dict) else None

    # Measure characters, not UTF-8 bytes: the previous check ran on the
    # encoded value, so a two-character Korean password (six bytes) passed
    # the "at least 4 characters" rule.
    if not isinstance(new_password, str) or len(new_password) < 4:
        return jsonify(success=False, message='비밀번호는 4자 이상이어야 합니다.'), 400

    hashed_pw = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
    query = "UPDATE member SET mb_passwd=%s WHERE mb_idx=%s"
    res = sql_execute(query, (hashed_pw, mb_idx))
    if res:
        return jsonify(success=True, message='비밀번호가 재설정되었습니다.')
    return jsonify(success=False, message='비밀번호 재설정 실패'), 500
if __name__ == '__main__':
    # Start Flask's built-in server on every interface, port 8899, when the
    # file is executed directly.
    run_options = {'host': '0.0.0.0', 'port': 8899}
    app.run(**run_options)