import os
|
import sqlite3
|
import json
|
import time
|
import csv
|
import io
|
import ipaddress
|
import random
|
import re
|
import threading
|
import smtplib
|
import schedule
|
from email.mime.text import MIMEText
|
from email.mime.multipart import MIMEMultipart
|
from email.mime.application import MIMEApplication # 新增:用於附件
|
from functools import wraps
|
from datetime import datetime, timedelta
|
from flask import Flask, render_template, request, jsonify, Response
|
from flask_cors import CORS
|
from openpyxl import Workbook
|
from openpyxl.styles import Alignment, Font, Border, Side, PatternFill
|
from openpyxl.utils import get_column_letter
|
|
# Flask application object; CORS(app) enables flask_cors with its default options.
app = Flask(__name__)
CORS(app)

# SQLite database file; a relative path, so it resolves against the process working directory.
DB_FILE = 'ncr_data.db'
|
|
# ==========================================
|
# 1. 系統設定區 (請在此手動設定)
|
# ==========================================
|
|
# --- Email notification settings ---
# If the mail server is an internal relay that needs no authentication,
# leave 'user' and 'password' as empty strings.
MAIL_CONFIG = {
    'server_ip': '10.192.130.115',      # mail server IP
    'port': 25,                         # SMTP port (usually 25, 465 or 587)
    'sender': 'msdn@hlmt.com.tw',       # sender address
    'receivers': ['ra@hlmt.com.tw'],    # primary recipients (administrators)
    'cc': ['msdn@hlmt.com.tw'],         # CC recipients (managers / backup recipients)
    'user': '',                         # SMTP account (empty when not required)
    'password': '',                     # SMTP password (empty when not required)
    'check_time': '08:00',              # daily check time, 24-hour clock
}

# Backup check threshold: send mail once this many days have passed without a backup.
BACKUP_THRESHOLD_DAYS = 30

# --- Permission settings (role-based IP allow-list) ---
# Each role maps to a list of single addresses and/or CIDR networks.
PERMISSIONS_CONFIG = {
    'ADMIN': [
        '127.0.0.1',
        '::1',
        '10.192.130.100',
        '192.1.0.159',
        '10.192.130.48',
        '10.192.130.79',
        '10.192.130.105',
        '10.192.130.150',
        '10.192.130.153',
        '10.192.130.179',
    ],
    'EDITOR': [
        '10.176.5.0/24',  # ht
        '10.192.130.114',
        '192.1.0.64',
        '192.1.0.206',
    ],
    'VIEWER': [
        '*',
    ],
}
|
|
# ==========================================
|
# 2. 工具函式 (權限與排程)
|
# ==========================================
|
|
def check_ip_in_list(ip, ip_list):
    """Return True when ``ip`` matches any rule in ``ip_list``.

    A rule containing ``/`` is treated as a CIDR network (host bits allowed);
    any other rule is compared as a single address. Rules that cannot be
    parsed (for example ``'*'``) are ignored, and an unparsable ``ip``
    never matches.
    """
    try:
        candidate = ipaddress.ip_address(ip)
    except ValueError:
        return False

    def rule_matches(rule):
        # One malformed rule must not stop the remaining rules from being tried.
        try:
            if '/' in rule:
                return candidate in ipaddress.ip_network(rule, strict=False)
            return candidate == ipaddress.ip_address(rule)
        except ValueError:
            return False

    return any(rule_matches(rule) for rule in ip_list)
|
|
def get_permission_level(ip):
    """Map a client IP to 'ADMIN', 'EDITOR' or 'VIEWER'.

    The ADMIN list is consulted before the EDITOR list; an address matching
    neither falls back to 'VIEWER'.
    """
    for level in ('ADMIN', 'EDITOR'):
        if check_ip_in_list(ip, PERMISSIONS_CONFIG[level]):
            return level
    return 'VIEWER'
|
|
def require_role(min_role):
    """Build a route decorator that enforces a minimum IP-derived role.

    Roles are ordered VIEWER < EDITOR < ADMIN. The caller's role comes from
    ``get_permission_level(request.remote_addr)``. A request whose role
    ranks below ``min_role`` receives a 403 JSON response and the wrapped
    view is not called.

    Args:
        min_role: 'VIEWER', 'EDITOR' or 'ADMIN'. An unrecognised value
            denies every request (fail closed).
    """
    role_rank = {'VIEWER': 0, 'EDITOR': 1, 'ADMIN': 2}

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user_ip = request.remote_addr
            role = get_permission_level(user_ip)

            required_rank = role_rank.get(min_role)
            # Unknown min_role -> required_rank is None -> nobody is allowed.
            allowed = (
                required_rank is not None
                and role_rank.get(role, -1) >= required_rank
            )

            if not allowed:
                return jsonify({
                    'success': False,
                    'msg': f'PERMISSION_DENIED: Your IP ({user_ip}) is [{role}], but [{min_role}] is required.'
                }), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator
|
|
# ==========================================
|
# 3. 資料庫管理 (含 WAL 模式與遷移)
|
# ==========================================
|
|
def get_db():
    """Open and return a new SQLite connection to ``DB_FILE``.

    Rows are returned as ``sqlite3.Row`` (index and column-name access) and
    the journal mode is switched to WAL on every connection. The caller is
    responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_FILE)
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA journal_mode=WAL;")
    return connection
|
|
def init_and_migrate_db():
    """Create the tables if missing and add any columns introduced later.

    Creates ``records``, ``access_logs`` and ``system_info`` when absent,
    then compares the ``records`` columns against the list of later
    additions and issues ``ALTER TABLE ... ADD COLUMN`` for each missing
    one. A failed ALTER is printed and skipped so the remaining columns are
    still attempted. The connection is always closed.
    """
    # Columns added after the original schema, in the order they are added.
    new_cols = [
        'recurring', 'customerCode', 'complaintId', 'model', 'penaltyDate',
        'gracePeriod', 'stopAlert', 'returnQty', 'shippedQty', 'defectThreshold',
        'closingDate',
        'exceptionDesc', 'severity', 'handlingDesc', 'issueType', 'source',
        'issuer', 'respUnitType', 'respUnitName', 'respPerson', 'rootCause', 'correctiveAction',
        'trackingResult', 'fileDate',
        'version',
        'filePath',  # path of the associated file
    ]
    # Every column is TEXT unless listed here.
    type_overrides = {
        'version': 'INTEGER DEFAULT 0',
        'defectThreshold': 'REAL',
        'gracePeriod': 'INTEGER',
        'returnQty': 'INTEGER',
        'shippedQty': 'INTEGER',
    }

    conn = sqlite3.connect(DB_FILE)
    try:
        c = conn.cursor()

        c.execute('''CREATE TABLE IF NOT EXISTS records
                     (uniqueId TEXT PRIMARY KEY, fullFileName TEXT, category TEXT,
                      site TEXT, yearMonth TEXT, sequence TEXT, customName TEXT,
                      status TEXT, remarks TEXT, timestamp TEXT, responsibleUnit TEXT)''')

        c.execute('''CREATE TABLE IF NOT EXISTS access_logs
                     (id INTEGER PRIMARY KEY AUTOINCREMENT, user TEXT, action TEXT, ip_address TEXT, timestamp TEXT)''')

        c.execute('''CREATE TABLE IF NOT EXISTS system_info
                     (key TEXT PRIMARY KEY, value TEXT)''')

        c.execute("PRAGMA table_info(records)")
        existing_columns = {info[1] for info in c.fetchall()}

        for col in new_cols:
            if col in existing_columns:
                continue
            col_type = type_overrides.get(col, 'TEXT')
            print(f"DB Migration: Adding column {col}")
            try:
                # col/col_type come from the constants above, never from user input.
                c.execute(f"ALTER TABLE records ADD COLUMN {col} {col_type}")
            except sqlite3.Error as e:
                print(f"Migration error for {col}: {e}")

        conn.commit()
    finally:
        conn.close()
|
|
def log_activity(user, action, ip_addr=None):
    """Append one row to ``access_logs`` on a best-effort basis.

    Args:
        user: Value stored in the ``user`` column.
        action: Value stored in the ``action`` column.
        ip_addr: Client address; when falsy, the current request's
            ``remote_addr`` is used, or ``'SYSTEM'`` when there is none
            (including calls made outside a request context).

    Failures are printed and never propagated, so logging cannot break the
    caller.
    """
    if not ip_addr:
        try:
            ip_addr = request.remote_addr or 'SYSTEM'
        except RuntimeError:
            # Flask raises RuntimeError when `request` is used outside a request
            # context (e.g. from the scheduler thread).
            ip_addr = 'SYSTEM'

    try:
        conn = get_db()
        try:
            conn.execute(
                "INSERT INTO access_logs (user, action, ip_address, timestamp) VALUES (?, ?, ?, ?)",
                (user, action, ip_addr, datetime.now().isoformat()))
            conn.commit()
        finally:
            conn.close()
    except Exception as e:
        # Deliberately best-effort: report and carry on.
        print(f"Failed to write access log: {e}")
|
|
# Runs at import time: creates the tables and adds any missing columns.
init_and_migrate_db()
|
|
# ==========================================
|
# 4. 自動化檢查與郵件通知 (背景任務)
|
# ==========================================
|
|
def send_alert_email(days_diff, last_backup_time):
    """Send the backup-overdue alert mail to the receivers and the CC list.

    Does nothing when ``MAIL_CONFIG['server_ip']`` is empty. SMTP login is
    attempted only when both ``user`` and ``password`` are set. Send
    failures are printed, not raised. On success an ``AUTO_ALERT_*`` entry
    is written through ``log_activity``.

    Args:
        days_diff: Days since the last backup (a number, or a descriptive
            string when the date is unknown); interpolated into the mail.
        last_backup_time: Text describing the last backup time.
    """
    cfg = MAIL_CONFIG
    if not cfg['server_ip']:
        return

    subject = f"[Alert] NCR System Backup Overdue ({datetime.now().strftime('%Y-%m-%d')})"
    body = f"""
Dear Administrator,

The NCR System database has not been backed up for {days_diff} days.

- Last Backup Time: {last_backup_time}
- Threshold: {BACKUP_THRESHOLD_DAYS} days

Please login and perform a backup immediately.
(A separate backup file has been sent to the CC list.)

NCR Online System Auto-Sender
"""

    msg = MIMEMultipart()
    msg['From'] = cfg['sender']
    msg['To'] = ", ".join(cfg['receivers'])
    msg['Cc'] = ", ".join(cfg['cc'])
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    all_recipients = cfg['receivers'] + cfg['cc']

    try:
        # The context manager closes the connection even when login/send fails;
        # the timeout keeps an unreachable server from blocking the caller forever.
        with smtplib.SMTP(cfg['server_ip'], cfg['port'], timeout=30) as server:
            if cfg['user'] and cfg['password']:
                server.login(cfg['user'], cfg['password'])
            server.sendmail(cfg['sender'], all_recipients, msg.as_string())
    except Exception as e:
        print(f"Failed to send alert email: {str(e)}")
        return

    print(f"Alert email sent to {all_recipients}")
    log_activity("SYSTEM", f"AUTO_ALERT_{days_diff}DAYS", "127.0.0.1")
|
|
def send_backup_email_to_cc():
    """Dump the ``records`` table to JSON and mail it as an attachment to the CC list.

    Skipped when the mail server or the CC list is not configured. Each
    stage (dump, attach, send) prints its own failure message and aborts
    without raising. On success an ``AUTO_BACKUP_SENT_TO_CC`` entry is
    written through ``log_activity``.
    """
    cfg = MAIL_CONFIG
    if not cfg['server_ip'] or not cfg['cc']:
        print("Mail config or CC list empty, skipping backup email.")
        return

    # 1. Prepare the data (dump DB to JSON)
    try:
        conn = get_db()
        try:
            cur = conn.cursor()
            cur.execute("SELECT * FROM records")
            data = [dict(row) for row in cur.fetchall()]
        finally:
            # Close even when the query fails.
            conn.close()

        json_str = json.dumps(data, ensure_ascii=False, indent=2)
        filename = f"NCR_AutoBackup_{datetime.now().strftime('%Y%m%d')}.json"
    except Exception as e:
        print(f"Failed to generate backup JSON: {str(e)}")
        return

    # 2. Prepare the message
    subject = f"[Auto Backup] NCR Database Dump ({datetime.now().strftime('%Y-%m-%d')})"
    body = f"""
Dear Manager,

Attached is the automated JSON backup of the NCR database.
This backup was triggered because the manual backup is overdue.

Record Count: {len(data)}

NCR Online System Auto-Sender
"""

    msg = MIMEMultipart()
    msg['From'] = cfg['sender']
    msg['To'] = ", ".join(cfg['cc'])
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    # 3. Build the attachment
    try:
        attachment = MIMEApplication(json_str.encode('utf-8'), Name=filename)
        attachment['Content-Disposition'] = f'attachment; filename="{filename}"'
        msg.attach(attachment)
    except Exception as e:
        print(f"Failed to attach file: {str(e)}")
        return

    # 4. Send
    try:
        # Context manager closes the SMTP connection on every path; the timeout
        # prevents an unreachable server from blocking the caller indefinitely.
        with smtplib.SMTP(cfg['server_ip'], cfg['port'], timeout=30) as server:
            if cfg['user'] and cfg['password']:
                server.login(cfg['user'], cfg['password'])
            server.sendmail(cfg['sender'], cfg['cc'], msg.as_string())
    except Exception as e:
        print(f"Failed to send backup email: {str(e)}")
        return

    print(f"Backup attachment sent to CC list: {cfg['cc']}")
    log_activity("SYSTEM", "AUTO_BACKUP_SENT_TO_CC", "127.0.0.1")
|
|
def check_backup_status_job():
    """Scheduled job: alert when the last recorded backup is too old.

    Reads ``system_info['last_backup']`` (format ``%Y/%m/%d %H:%M``). An
    alert is triggered when the value is missing, cannot be parsed, or is
    at least ``BACKUP_THRESHOLD_DAYS`` days old; the alert sends both the
    warning mail and the JSON backup mail. Every exception is caught and
    printed so the calling scheduler loop keeps running.
    """
    print(f"Running daily backup check at {datetime.now()}")

    try:
        # Read the value and release the connection before any mail is sent.
        conn = get_db()
        try:
            cur = conn.cursor()
            cur.execute("SELECT value FROM system_info WHERE key='last_backup'")
            row = cur.fetchone()
            stored_value = row['value'] if row else None
        finally:
            conn.close()

        should_alert = False
        days_diff = 0
        last_backup_str = "Never"

        if stored_value:
            last_backup_str = stored_value
            try:
                last_dt = datetime.strptime(last_backup_str, '%Y/%m/%d %H:%M')
                days_diff = (datetime.now() - last_dt).days
                should_alert = days_diff >= BACKUP_THRESHOLD_DAYS
            except ValueError:
                # Unparsable timestamp: treat as overdue rather than silently OK.
                should_alert = True
                days_diff = "Unknown"
        else:
            should_alert = True
            days_diff = "Infinity"

        if should_alert:
            print(f"Backup overdue ({days_diff} days). Triggering alerts...")
            send_alert_email(days_diff, last_backup_str)
            send_backup_email_to_cc()
        else:
            print(f"Backup status OK. Last backup: {days_diff} days ago.")

    except Exception as e:
        print(f"Error in backup check job: {e}")
|
|
def run_scheduler():
    """Register the daily backup check and poll the scheduler forever.

    The run time comes from ``MAIL_CONFIG['check_time']`` (default
    ``'08:00'``). Pending jobs are polled once every 60 seconds; the
    function never returns.
    """
    daily_at = MAIL_CONFIG.get('check_time', '08:00')
    print(f"Scheduler scheduled for {daily_at} daily.")

    daily_job = schedule.every().day.at(daily_at)
    daily_job.do(check_backup_status_job)

    while True:
        schedule.run_pending()
        time.sleep(60)
|
|
# ==========================================
|
# 5. API 路由
|
# ==========================================
|
|
@app.route('/')
def root():
    """Answer the site root with a fixed 403 page."""
    forbidden_html = "<h1>403 Forbidden</h1>"
    return forbidden_html, 403
|
|
@app.route('/login.html')
def login_page():
    """Render the ``login.html`` template."""
    page = render_template('login.html')
    return page
|
|
@app.route('/detail.html')
def detail_page():
    """Render the ``detail.html`` template."""
    page = render_template('detail.html')
    return page
|
|
@app.route('/editor_manual.html')
def editor_manual_page():
    """Render the ``editor_manual.html`` template."""
    page = render_template('editor_manual.html')
    return page
|
|
@app.route('/api/my_role')
def my_role():
    """Report the caller's role and IP as JSON.

    Checks the ADMIN list, then the EDITOR list. Otherwise the caller is a
    VIEWER when the VIEWER list contains ``'*'`` or matches the IP, and a
    GUEST when it does not.
    """
    client_ip = request.remote_addr

    for role_name in ('ADMIN', 'EDITOR'):
        if check_ip_in_list(client_ip, PERMISSIONS_CONFIG[role_name]):
            return jsonify({'role': role_name, 'ip': client_ip})

    viewer_rules = PERMISSIONS_CONFIG.get('VIEWER', [])
    if '*' in viewer_rules or check_ip_in_list(client_ip, viewer_rules):
        return jsonify({'role': 'VIEWER', 'ip': client_ip})

    return jsonify({'role': 'GUEST', 'ip': client_ip})
|
|
@app.route('/api/history')
def get_history():
    """Return every record, newest first, as a JSON array.

    ``responsibleUnit`` is stored as a JSON string; it is decoded for each
    record and replaced with ``[]`` when it is NULL or not valid JSON.
    """
    conn = get_db()
    try:
        rows = conn.execute("SELECT * FROM records ORDER BY timestamp DESC").fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()

    data = [dict(row) for row in rows]
    for item in data:
        try:
            item['responsibleUnit'] = json.loads(item['responsibleUnit'])
        except (TypeError, ValueError):
            # TypeError: NULL column; ValueError: malformed JSON.
            item['responsibleUnit'] = []
    return jsonify(data)
|
|
@app.route('/api/record/<unique_id>')
def get_single_record(unique_id):
    """Return one record by ``uniqueId`` as JSON, or a 404 JSON error.

    ``responsibleUnit`` is decoded from its stored JSON string and replaced
    with ``[]`` when it is NULL or not valid JSON.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM records WHERE uniqueId = ?", (unique_id,)
        ).fetchone()
    finally:
        # Release the connection even if the query fails.
        conn.close()

    if row is None:
        return jsonify({'error': 'Not found'}), 404

    item = dict(row)
    try:
        item['responsibleUnit'] = json.loads(item['responsibleUnit'])
    except (TypeError, ValueError):
        # TypeError: NULL column; ValueError: malformed JSON.
        item['responsibleUnit'] = []
    return jsonify(item)
|
|
@app.route('/api/log_login', methods=['POST'])
def log_login_endpoint():
    """Record a LOGIN entry for the user named in the JSON body.

    The user defaults to ``'Unknown'`` when the body is missing, is not
    valid JSON, is not an object, or has no ``user`` key.
    """
    payload = request.get_json(silent=True)
    user = payload.get('user', 'Unknown') if isinstance(payload, dict) else 'Unknown'
    log_activity(user, "LOGIN")
    return jsonify({'success': True})
|
|
@app.route('/api/logout', methods=['POST'])
def logout():
    """Record a LOGOUT entry for the user named in the JSON body.

    The user defaults to ``'Unknown'`` when the body is missing, is not
    valid JSON, is not an object, or has no ``user`` key.
    """
    payload = request.get_json(silent=True)
    user = payload.get('user', 'Unknown') if isinstance(payload, dict) else 'Unknown'
    log_activity(user, "LOGOUT")
    return jsonify({'success': True})
|
|
@app.route('/api/save', methods=['POST'])
@require_role('ADMIN')
def save_record():
    """Create a new record and allocate its sequential ``uniqueId``.

    The ID is ``{category}-{site}{yearMonth}{seq}`` where ``seq`` is one
    more than the highest numeric sequence already used with that prefix,
    zero-padded to at least three digits. The highest sequence is computed
    numerically over all IDs with the prefix, so allocation keeps working
    past 999 and is not confused by IDs carrying a ``-suffix``.

    On a primary-key collision (concurrent save) the allocation is retried
    up to five times with a short random back-off.

    Returns:
        200 ``{'success': True, 'savedId', 'savedFileName'}`` on success;
        400 when the body is not a JSON object or a required field is absent;
        409 ``SYSTEM_BUSY_ID_CONFLICT`` when every retry collided;
        500 with the error text on any other failure.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'msg': 'INVALID_JSON_BODY'}), 400

    missing = [key for key in ('category', 'site', 'yearMonth', 'customName') if key not in data]
    if missing:
        return jsonify({'success': False, 'msg': f"MISSING_FIELDS: {', '.join(missing)}"}), 400

    cat = data['category']
    site = data['site']
    ym = str(data['yearMonth']).replace('-', '')
    prefix = f"{cat}-{site}{ym}"

    # Initial values for every column not derived from the generated ID.
    base_data = {
        'customName': data['customName'],
        'status': "Open",
        'remarks': "",
        'responsibleUnit': json.dumps([]),
        'recurring': data.get('recurring', 'N'),
        'customerCode': data.get('customerCode', ''),
        'complaintId': "",
        'model': data.get('model', ''),
        'fileDate': data.get('fileDate', ''),
        'penaltyDate': "", 'gracePeriod': 0, 'stopAlert': 'N',
        'returnQty': 0, 'shippedQty': 0, 'defectThreshold': 0.0,
        'closingDate': "", 'timestamp': datetime.now().isoformat(),
        'exceptionDesc': "", 'severity': "", 'handlingDesc': "",
        'issueType': "", 'source': "", 'issuer': "",
        'respUnitType': "", 'respUnitName': "", 'respPerson': "",
        'rootCause': "", 'correctiveAction': "", 'trackingResult': "",
        'version': 0, 'filePath': ""
    }

    max_retries = 5
    for attempt in range(max_retries):
        conn = get_db()
        try:
            cur = conn.cursor()
            cur.execute("SELECT uniqueId FROM records WHERE uniqueId LIKE ?", (prefix + '%',))

            # Highest numeric sequence in use; IDs whose tail is not a number are ignored.
            current_max = 0
            for existing in cur.fetchall():
                tail = existing[0][len(prefix):].split('-')[0]
                try:
                    current_max = max(current_max, int(tail))
                except ValueError:
                    continue

            new_seq_str = str(current_max + 1).zfill(3)
            unique_id = f"{prefix}{new_seq_str}"

            parts = [unique_id, base_data['model'], base_data['customName'],
                     base_data['customerCode'], base_data['fileDate']]
            full_filename = '-'.join(filter(None, parts))

            record = {
                'uniqueId': unique_id,
                'fullFileName': full_filename,
                'category': cat,
                'site': site,
                'yearMonth': ym,
                'sequence': new_seq_str,
                **base_data,
            }
            # Column names are the constant keys above; values are bound parameters.
            columns = ', '.join(record)
            placeholders = ', '.join('?' for _ in record)
            cur.execute(f"INSERT INTO records ({columns}) VALUES ({placeholders})",
                        tuple(record.values()))
            conn.commit()
            return jsonify({'success': True, 'savedId': unique_id, 'savedFileName': full_filename})

        except sqlite3.IntegrityError:
            # Another request took this ID first; retry unless attempts are exhausted.
            if attempt == max_retries - 1:
                return jsonify({'success': False, 'msg': 'SYSTEM_BUSY_ID_CONFLICT'}), 409
        except Exception as e:
            return jsonify({'success': False, 'msg': str(e)}), 500
        finally:
            conn.close()

        # Back off (with the connection already closed) before the next attempt.
        time.sleep(random.uniform(0.1, 0.4))

    return jsonify({'success': False, 'msg': 'SYSTEM_BUSY_ID_CONFLICT'}), 409
|
|
@app.route('/api/admin_update', methods=['POST'])
@require_role('ADMIN')
def admin_update_record():
    """Rename a record and/or edit its file-name metadata (ADMIN only).

    Updates ``uniqueId``, ``model``, ``customName``, ``customerCode``,
    ``fileDate`` and the derived ``fullFileName``, guarded by optimistic
    locking on ``version``.

    Returns:
        200 ``{'success': True, 'newFileName'}`` on success;
        400 when the body is not a JSON object or an ID / custom name is missing;
        409 ``NEW_ID_ALREADY_EXISTS`` when the target ID is taken;
        409 ``DATA_CONFLICT`` when no row matched the ID and version;
        500 with the error text on any other failure.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'msg': 'INVALID_JSON_BODY'}), 400

    def text_field(key):
        # Treat a missing or null field as empty instead of crashing on .strip().
        value = data.get(key)
        return '' if value is None else str(value).strip()

    old_unique_id = data.get('originalUniqueId')
    new_unique_id = data.get('newUniqueId')
    current_version = data.get('version')

    new_model = text_field('model')
    new_custom_name = text_field('customName')
    new_cust_code = text_field('customerCode')
    new_file_date = text_field('fileDate')

    if not old_unique_id or not new_unique_id or not new_custom_name:
        return jsonify({'success': False, 'msg': 'Missing ID or Custom Name'}), 400

    conn = get_db()
    cur = conn.cursor()
    try:
        if old_unique_id != new_unique_id:
            cur.execute("SELECT 1 FROM records WHERE uniqueId = ?", (new_unique_id,))
            if cur.fetchone():
                return jsonify({'success': False, 'msg': 'NEW_ID_ALREADY_EXISTS'}), 409

        parts = [new_unique_id, new_model, new_custom_name, new_cust_code, new_file_date]
        new_full_filename = '-'.join(filter(None, parts))

        cur.execute('''UPDATE records SET
                       uniqueId=?, model=?, customName=?, customerCode=?, fileDate=?, fullFileName=?,
                       version = version + 1
                       WHERE uniqueId=? AND version=?''',
                    (new_unique_id, new_model, new_custom_name, new_cust_code, new_file_date,
                     new_full_filename, old_unique_id, current_version))

        conn.commit()
        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409

        log_activity(f"Admin Update: {old_unique_id} -> {new_unique_id}", "ADMIN_METADATA_UPDATE", request.remote_addr)
        return jsonify({'success': True, 'newFileName': new_full_filename})

    except sqlite3.IntegrityError:
        # The target ID was created between the existence check and the UPDATE.
        return jsonify({'success': False, 'msg': 'NEW_ID_ALREADY_EXISTS'}), 409
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
|
|
@app.route('/api/update', methods=['POST'])
def update_record():
    """Update the editable fields of one record (ADMIN or EDITOR).

    Uses optimistic locking: the row is updated only when both ``uniqueId``
    and ``version`` match, and ``version`` is then incremented.

    Returns:
        200 ``{'success': True}`` on success;
        403 ``PERMISSION_DENIED`` for other roles;
        400 ``MISSING_UNIQUE_ID`` when the body is not a JSON object or has no ``uniqueId``;
        409 ``DATA_CONFLICT`` when no row matched the ID and version;
        500 with the error text on any other failure.
    """
    role = get_permission_level(request.remote_addr)
    if role not in ['ADMIN', 'EDITOR']:
        return jsonify({'success': False, 'msg': 'PERMISSION_DENIED'}), 403

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'uniqueId' not in data:
        return jsonify({'success': False, 'msg': 'MISSING_UNIQUE_ID'}), 400

    # A list is stored as a JSON string; any other value is passed through unchanged.
    units = data.get('responsibleUnit', [])
    units_json = json.dumps(units) if isinstance(units, list) else units

    # (column, default used when the key is absent from the request)
    field_defaults = (
        ('recurring', 'N'), ('complaintId', ''), ('penaltyDate', ''),
        ('gracePeriod', 0), ('stopAlert', 'N'), ('returnQty', 0), ('shippedQty', 0),
        ('defectThreshold', 0.0), ('closingDate', ''),
        ('exceptionDesc', ''), ('severity', ''), ('handlingDesc', ''),
        ('issueType', ''), ('source', ''), ('issuer', ''),
        ('respUnitType', ''), ('respUnitName', ''), ('respPerson', ''),
        ('rootCause', ''), ('correctiveAction', ''), ('trackingResult', ''),
    )
    columns = ['status', 'remarks', 'responsibleUnit'] + [name for name, _ in field_defaults]
    values = [data.get('status'), data.get('remarks'), units_json]
    values += [data.get(name, default) for name, default in field_defaults]

    # Column names are constants from this function; all values are bound parameters.
    set_clause = ', '.join(f"{col}=?" for col in columns)
    sql = f"UPDATE records SET {set_clause}, version = version + 1 WHERE uniqueId=? AND version=?"

    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute(sql, (*values, data['uniqueId'], data.get('version')))
        conn.commit()
        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
|
|
@app.route('/api/update_filepath', methods=['POST'])
def update_filepath():
    """Set the ``filePath`` of one record (ADMIN or EDITOR).

    Uses optimistic locking on ``version``; the stored path is stripped of
    surrounding whitespace, and a missing or null path is stored as ``''``.

    Returns:
        200 ``{'success': True}`` on success;
        403 ``PERMISSION_DENIED`` for other roles;
        400 ``INVALID_JSON_BODY`` when the body is not a JSON object;
        409 ``DATA_CONFLICT`` when no row matched the ID and version;
        500 with the error text on any other failure.
    """
    role = get_permission_level(request.remote_addr)
    if role not in ['ADMIN', 'EDITOR']:
        return jsonify({'success': False, 'msg': 'PERMISSION_DENIED'}), 403

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'msg': 'INVALID_JSON_BODY'}), 400

    unique_id = data.get('uniqueId')
    raw_path = data.get('filePath')
    file_path = '' if raw_path is None else str(raw_path).strip()
    current_version = data.get('version')

    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute('''UPDATE records SET
                       filePath=?,
                       version = version + 1
                       WHERE uniqueId=? AND version=?''',
                    (file_path, unique_id, current_version))
        conn.commit()
        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
|
|
@app.route('/api/reset', methods=['POST'])
@require_role('ADMIN')
def reset_db():
    """Delete every row from ``records`` (ADMIN only).

    Returns ``{'success': True}`` on success, or a 500 JSON error carrying
    the exception text. The connection is closed on every path.
    """
    connection = get_db()
    try:
        connection.execute("DELETE FROM records")
        connection.commit()
    except Exception as exc:
        outcome = jsonify({'success': False, 'msg': str(exc)}), 500
    else:
        outcome = jsonify({'success': True})
    finally:
        connection.close()
    return outcome
|
|
@app.route('/api/backup')
def backup_db():
    """Download all records as a JSON attachment and record the backup time.

    Stores the current time (``%Y/%m/%d %H:%M``) under
    ``system_info['last_backup']`` before returning the dump. The
    connection is closed even if a query fails.
    """
    # NOTE(review): this route has no role check although it returns the whole
    # table and writes to system_info - confirm that is intended.
    now = datetime.now()
    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM records")
        data = [dict(row) for row in cur.fetchall()]

        cur.execute("INSERT OR REPLACE INTO system_info (key, value) VALUES ('last_backup', ?)",
                    (now.strftime('%Y/%m/%d %H:%M'),))
        conn.commit()
    finally:
        conn.close()

    return Response(json.dumps(data, ensure_ascii=False, indent=2), mimetype='application/json',
                    headers={'Content-Disposition': f'attachment;filename=NCR_Backup_{now.strftime("%Y%m%d")}.json'})
|
|
@app.route('/api/last_backup')
def get_last_backup():
    """Return ``{'time': <last backup timestamp>}``, or ``"Never"`` when none is stored."""
    conn = get_db()
    try:
        row = conn.execute("SELECT value FROM system_info WHERE key='last_backup'").fetchone()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    return jsonify({'time': row['value'] if row else "Never"})
|
|
@app.route('/api/restore', methods=['POST'])
@require_role('ADMIN')
def restore_db():
    """Import records from an uploaded JSON backup (ADMIN only).

    Expects a multipart upload named ``file`` containing a JSON array of
    record objects. Records whose ``uniqueId`` already exists are skipped.
    When ``fileDate`` is absent it is recovered from the last ``-``
    separated part of ``fullFileName`` if that part is eight digits. The
    import is one transaction: any failure rolls back every insert.

    Returns:
        200 ``{'success': True, 'msg': 'RESTORE_OK', 's': inserted, 'k': skipped}``;
        400 when no file is uploaded or it is not a JSON array;
        500 with the error text on any other failure.
    """
    if 'file' not in request.files:
        return jsonify({'success': False, 'msg': 'No file'}), 400

    try:
        data = json.load(request.files['file'])
    except ValueError as e:
        # Covers malformed JSON and undecodable bytes.
        return jsonify({'success': False, 'msg': str(e)}), 400
    if not isinstance(data, list):
        return jsonify({'success': False, 'msg': 'INVALID_BACKUP_FORMAT'}), 400

    # Keys that must be present in every record (KeyError -> 500 otherwise).
    required = ('uniqueId', 'fullFileName', 'category', 'site', 'yearMonth', 'sequence',
                'customName', 'status', 'remarks', 'timestamp', 'responsibleUnit')
    # Keys that may be absent from older backups, with their defaults.
    optional = (
        ('recurring', 'N'), ('customerCode', ''), ('complaintId', ''),
        ('model', ''), ('penaltyDate', ''), ('gracePeriod', 0),
        ('stopAlert', 'N'), ('returnQty', 0), ('shippedQty', 0),
        ('defectThreshold', 0.0), ('closingDate', ''),
        ('exceptionDesc', ''), ('severity', ''), ('handlingDesc', ''),
        ('issueType', ''), ('source', ''), ('issuer', ''),
        ('respUnitType', ''), ('respUnitName', ''), ('respPerson', ''),
        ('rootCause', ''), ('correctiveAction', ''), ('trackingResult', ''),
    )
    columns = list(required) + [name for name, _ in optional] + ['fileDate', 'version', 'filePath']
    # Column names are constants from this function; values are bound parameters.
    insert_sql = (f"INSERT INTO records ({', '.join(columns)}) "
                  f"VALUES ({', '.join('?' for _ in columns)})")
    date_pattern = re.compile(r'^\d{8}$')

    conn = None
    try:
        conn = get_db()
        cur = conn.cursor()
        success = 0
        skip = 0

        for row in data:
            if cur.execute("SELECT 1 FROM records WHERE uniqueId = ?", (row['uniqueId'],)).fetchone():
                skip += 1
                continue

            f_date = row.get('fileDate', '')
            if not f_date and row.get('fullFileName'):
                last_part = row['fullFileName'].split('-')[-1].strip()
                if date_pattern.match(last_part):
                    f_date = last_part

            vals = [row[key] for key in required]
            vals += [row.get(name, default) for name, default in optional]
            vals += [f_date, row.get('version', 0), row.get('filePath', '')]

            cur.execute(insert_sql, vals)
            success += 1

        conn.commit()
        return jsonify({'success': True, 'msg': 'RESTORE_OK', 's': success, 'k': skip})
    except Exception as e:
        if conn is not None:
            conn.rollback()
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        if conn is not None:
            conn.close()
|
|
@app.route('/api/export_logs')
def export_logs():
    """Download the access log, newest first, as a CSV attachment.

    The CSV starts with a UTF-8 byte-order mark followed by a header row.
    The connection is closed even if the query fails.
    """
    conn = get_db()
    try:
        rows = conn.execute("SELECT * FROM access_logs ORDER BY timestamp DESC").fetchall()
    finally:
        conn.close()

    si = io.StringIO()
    si.write('\ufeff')  # BOM written ahead of the header row
    writer = csv.writer(si)
    writer.writerow(['ID', 'User', 'Action', 'IP Address', 'Time'])
    writer.writerows([row[0], row[1], row[2], row[3], row[4]] for row in rows)
    return Response(si.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=Logs.csv"})
|
|
# Export header texts per language code ('en', 'tw', 'cn', 'vi').
# 'cols' must list the 24 column titles in the order export_csv writes each
# data row: file name, status, responsible unit, model, customer code,
# complaint id, defect rate, penalty date, recurring, closing date, issue type,
# source, severity, issuer, exception description, handling description,
# responsible unit type/name/person, root cause, corrective action,
# tracking result, created time, remarks.
CSV_HEADERS_MAP = {
    'en': {
        'stats_title': 'Statistics Summary', 'stats_total': 'Total Records',
        'stats_status': '--- Status Statistics ---', 'stats_unit': '--- Responsible Unit (Dept) ---',
        'stats_supplier': '--- Supplier ---', 'stats_model': '--- Model ---', 'stats_cust': '--- Customer Code ---',
        # 'Created Time' and 'Remarks' belong at the end, matching the data rows
        # and the other languages.
        'cols': ['Filename', 'Status', 'Resp. Unit', 'Model', 'Cust. Code', 'Complaint ID', 'Defect Rate(%)', 'Penalty Date', 'Recurring', 'Closing Date', 'Issue Type', 'Source', 'MA/MI', 'Issuer', 'Exception Description', 'Handling Description', 'Resp. Unit Type', 'Resp. Unit Name', 'Resp. Person', 'Root Cause Analysis', 'Corrective Action', 'Response / Tracking', 'Created Time', 'Remarks']
    },
    'tw': {
        'stats_title': '統計摘要', 'stats_total': '總筆數',
        'stats_status': '--- 狀態統計 (開立/結案/作廢) ---', 'stats_unit': '--- 責任單位 (部門) ---',
        'stats_supplier': '--- 供應商 ---', 'stats_model': '--- 型號 ---', 'stats_cust': '--- 客戶代號 ---',
        'cols': ['完整檔名', '狀態', '責任單位', '型號', '客戶代號', '客訴單號', '不良率(%)', '扣款起算日', '再發', '結案日期', '異常類型', '來源', '嚴重度(MA/MI)', '開單人', '異常說明', '處置說明', '責任單位類別', '責任單位名稱', '負責人', '原因分析', '改善對策', '回覆日期/追蹤結果', '建立時間', '備註']
    },
    'cn': {
        'stats_title': '统计摘要', 'stats_total': '总笔数',
        'stats_status': '--- 状态统计 (开立/结案/作废) ---', 'stats_unit': '--- 责任单位 (部门) ---',
        'stats_supplier': '--- 供应商 ---', 'stats_model': '--- 型号 ---', 'stats_cust': '--- 客户代号 ---',
        'cols': ['完整文件名', '状态', '责任单位', '型号', '客户代号', '客诉单号', '不良率(%)', '扣款起算日', '再发', '结案日期', '异常类型', '来源', '严重度(MA/MI)', '开单人', '异常说明', '处置说明', '责任单位类别', '责任单位名称', '负责人', '原因分析', '改善对策', '回复日期/追踪结果', '建立时间', '备注']
    },
    'vi': {
        'stats_title': 'Tóm tắt thống kê', 'stats_total': 'Tổng số',
        'stats_status': '--- Trạng thái (Open/Closed/Void) ---', 'stats_unit': '--- Đơn vị chịu trách nhiệm ---',
        'stats_supplier': '--- Nhà cung cấp ---', 'stats_model': '--- Mô hình ---', 'stats_cust': '--- Khách hàng ---',
        'cols': ['Tên tệp', 'Trạng thái', 'Đơn vị', 'Mô hình', 'Mã KH', 'Mã khiếu nại', 'Tỷ lệ lỗi(%)', 'Ngày phạt', 'Lặp lại', 'Ngày đóng', 'Loại', 'Nguồn', 'Độ nghiêm trọng', 'Người phát hành', 'Mô tả ngoại lệ', 'Xử lý sản phẩm', 'Loại đơn vị', 'Tên đơn vị', 'Người phụ trách', 'Nguyên nhân gốc rễ', 'Đối sách', 'Ngày phản hồi / Kết quả', 'Thời gian tạo', 'Ghi chú']
    }
}
|
|
# Display texts per language code for stored values: 'status_<value>' for
# record statuses, 'unit_<code>' for department codes, and 'prefix_sup' for the
# label placed in front of supplier names.
VALUE_TRANS_MAP = {
    'en': {
        'status_Open': 'Open', 'status_Closed': 'Closed', 'status_Void': 'Void',
        'unit_MGMT': 'MGMT', 'unit_ENG': 'ENG', 'unit_PMC': 'PMC', 'unit_QA': 'QA',
        'unit_PC': 'PC', 'unit_MFG': 'MFG', 'unit_PUR': 'PUR', 'unit_RD': 'RD',
        'unit_SALES': 'SALES', 'unit_RA': 'Regulatory (RA)',
        'prefix_sup': 'Supplier: ',
    },
    'tw': {
        'status_Open': '開立', 'status_Closed': '結案', 'status_Void': '作廢',
        'unit_MGMT': '管理', 'unit_ENG': '工程', 'unit_PMC': '物管', 'unit_QA': '品保',
        'unit_PC': '採管', 'unit_MFG': '製造', 'unit_PUR': '採購', 'unit_RD': '研發',
        'unit_SALES': '業務', 'unit_RA': '法規',
        'prefix_sup': '供應商: ',
    },
    'cn': {
        'status_Open': '开立', 'status_Closed': '结案', 'status_Void': '作废',
        'unit_MGMT': '管理', 'unit_ENG': '工程', 'unit_PMC': '物管', 'unit_QA': '品保',
        'unit_PC': '采管', 'unit_MFG': '制造', 'unit_PUR': '采购', 'unit_RD': '研发',
        'unit_SALES': '业务', 'unit_RA': '法规',
        'prefix_sup': '供应商: ',
    },
    'vi': {
        'status_Open': 'Mở', 'status_Closed': 'Đóng', 'status_Void': 'Vô hiệu',
        'unit_MGMT': 'Quản lý', 'unit_ENG': 'Kỹ thuật', 'unit_PMC': 'QL Vật liệu', 'unit_QA': 'QA',
        'unit_PC': 'KS Mua', 'unit_MFG': 'Sản xuất', 'unit_PUR': 'Mua hàng', 'unit_RD': 'R&D',
        'unit_SALES': 'Kinh doanh', 'unit_RA': 'Pháp quy',
        'prefix_sup': 'NCC: ',
    },
}
|
|
def _build_export_csv_query(args):
    """Turn the export filter arguments into a parameterised SELECT on ``records``.

    Returns ``(sql, params)``. User-supplied values only ever appear as
    bound parameters; the SQL text is assembled from constants.
    """
    query = "SELECT * FROM records WHERE 1=1"
    params = []

    # Comma-separated multi-value filters -> IN (...)
    for arg_name, column in (('categories', 'category'), ('sites', 'site'), ('status', 'status')):
        raw = args.get(arg_name)
        if raw:
            values = raw.split(',')
            query += f" AND {column} IN ({','.join(['?'] * len(values))})"
            params.extend(values)

    # Date range, compared against the first 10 characters of the timestamp.
    for arg_name, operator in (('start', '>='), ('end', '<=')):
        bound = args.get(arg_name)
        if bound:
            query += f" AND substr(timestamp, 1, 10) {operator} ?"
            params.append(bound)

    search = args.get('search')
    if search:
        search_pattern = f"%{search}%"
        query += " AND (fullFileName LIKE ? OR customName LIKE ? OR remarks LIKE ? OR complaintId LIKE ?)"
        params.extend([search_pattern] * 4)

    # Substring filters
    for arg_name, column in (('model', 'model'), ('cust', 'customerCode')):
        needle = args.get(arg_name)
        if needle:
            query += f" AND {column} LIKE ?"
            params.append(f"%{needle}%")

    query += " ORDER BY timestamp DESC"
    return query, params


@app.route('/api/export_csv')
def export_csv():
    """Download the filtered record list as a CSV attachment with a statistics header.

    Query arguments: ``lang`` (header language, default ``en``),
    ``categories`` / ``sites`` / ``status`` (comma-separated), ``start`` /
    ``end`` (date bounds), ``search``, ``model``, ``cust``.

    The CSV contains a UTF-8 BOM, a summary section (total, per status, per
    department, per supplier, per model, per customer code), a blank row,
    the column header row and one row per record. Any failure yields a 500
    response whose body is the error text; the connection is always closed.
    """
    conn = None
    try:
        conn = get_db()
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()

        lang = request.args.get('lang', 'en')
        headers_map = CSV_HEADERS_MAP.get(lang, CSV_HEADERS_MAP['en'])

        query, params = _build_export_csv_query(request.args)
        cur.execute(query, params)
        rows = cur.fetchall()

        total_count = len(rows)
        stats_status = {}
        stats_dept = {}
        stats_sup = {}
        stats_model = {}
        stats_cust = {}

        for row in rows:
            st = row['status'] if row['status'] else 'Open'
            stats_status[st] = stats_status.get(st, 0) + 1

            try:
                units = json.loads(row['responsibleUnit'])
            except (TypeError, ValueError):
                units = []
            if not isinstance(units, list):
                units = []

            for u in units:
                if not isinstance(u, str):
                    # A malformed entry must not abort the whole export.
                    u = str(u)
                if u.startswith('SUP:'):
                    sup_name = u[len('SUP:'):]
                    stats_sup[sup_name] = stats_sup.get(sup_name, 0) + 1
                else:
                    stats_dept[u] = stats_dept.get(u, 0) + 1

            md = row['model']
            if md:
                stats_model[md] = stats_model.get(md, 0) + 1

            cc = row['customerCode']
            if cc:
                stats_cust[cc] = stats_cust.get(cc, 0) + 1

        si = io.StringIO()
        si.write('\ufeff')  # BOM written ahead of the first row
        writer = csv.writer(si)

        writer.writerow([headers_map['stats_title']])
        writer.writerow([headers_map['stats_total'], total_count])
        writer.writerow([])

        # Status counts are listed in first-seen order.
        writer.writerow([headers_map['stats_status']])
        for k, v in stats_status.items():
            writer.writerow(["", k, v])

        # The remaining sections are listed by descending count and omitted when empty.
        for title_key, counts in (('stats_unit', stats_dept), ('stats_supplier', stats_sup),
                                  ('stats_model', stats_model), ('stats_cust', stats_cust)):
            if counts:
                writer.writerow([headers_map[title_key]])
                for k, v in sorted(counts.items(), key=lambda item: item[1], reverse=True):
                    writer.writerow(["", k, v])

        writer.writerow([])
        writer.writerow(headers_map['cols'])

        for row in rows:
            try:
                units = json.loads(row['responsibleUnit'])
                unit_str = ", ".join(units) if isinstance(units, list) else str(units)
            except (TypeError, ValueError):
                unit_str = str(row['responsibleUnit'])

            # Quantities that are not numeric are treated as 0 instead of failing the export.
            try:
                r_qty = float(row['returnQty'] or 0)
                s_qty = float(row['shippedQty'] or 0)
            except (TypeError, ValueError):
                r_qty = s_qty = 0.0
            rate = (r_qty / s_qty * 100) if s_qty > 0 else 0.0

            writer.writerow([
                row['fullFileName'],
                row['status'],
                unit_str,
                row['model'],
                row['customerCode'],
                row['complaintId'],
                f"{rate:.2f}%",
                row['penaltyDate'],
                row['recurring'],
                row['closingDate'],
                row['issueType'],
                row['source'],
                row['severity'],
                row['issuer'],
                row['exceptionDesc'],
                row['handlingDesc'],
                row['respUnitType'],
                row['respUnitName'],
                row['respPerson'],
                row['rootCause'],
                row['correctiveAction'],
                row['trackingResult'],
                row['timestamp'],
                row['remarks'],
            ])

        filename = f"NCR_List_{datetime.now().strftime('%Y%m%d')}.csv"
        return Response(si.getvalue(), mimetype="text/csv", headers={"Content-Disposition": f"attachment;filename={filename}"})

    except Exception as e:
        return str(e), 500
    finally:
        if conn is not None:
            conn.close()
|
|
def _excel_parse_units(raw_value):
    """Decode the JSON-encoded ``responsibleUnit`` column into a list of strings.

    Returns an empty list when the value is missing, is not valid JSON, or
    does not decode to a list. ``None`` entries are dropped and the remaining
    entries are coerced to ``str`` so callers can safely use string methods.
    """
    try:
        units = json.loads(raw_value)
    except (TypeError, ValueError):
        # TypeError: value is None / not str-like; ValueError: malformed JSON.
        return []
    if not isinstance(units, list):
        return []
    return [str(u) for u in units if u is not None]


def _excel_visual_width(value):
    """Estimate the display width of ``value`` in Excel column-width units.

    Non-ASCII characters count as 1.8 units, ASCII characters as 1.0.
    """
    return sum(1.8 if ord(ch) > 127 else 1.0 for ch in str(value))


@app.route('/api/export_excel')
def export_excel():
    """Export the filtered NCR records as a formatted ``.xlsx`` download.

    Query-string parameters (all optional):
        lang:        key into CSV_HEADERS_MAP / VALUE_TRANS_MAP (falls back to 'en').
        categories:  comma-separated values matched against ``category``.
        sites:       comma-separated values matched against ``site``.
        status:      comma-separated values matched against ``status``.
        start, end:  inclusive bounds compared with the first 10 characters
                     of ``timestamp``.
        search:      substring matched against fullFileName, customName,
                     remarks and complaintId.
        model:       substring matched against ``model``.
        cust:        substring matched against ``customerCode``.

    Returns:
        A Response carrying the workbook bytes as an attachment, or a
        ``(message, 500)`` tuple when any exception is raised.
    """
    try:
        lang = request.args.get('lang', 'en')
        headers_map = CSV_HEADERS_MAP.get(lang, CSV_HEADERS_MAP['en'])
        trans_map = VALUE_TRANS_MAP.get(lang, VALUE_TRANS_MAP['en'])
        # One shared default so the statistics block and the data rows agree.
        sup_prefix = trans_map.get('prefix_sup', 'Supplier: ')

        # ---- Build the parameterized query --------------------------------
        query = "SELECT * FROM records WHERE 1=1"
        params = []

        # Column names below are fixed literals; only the values are bound.
        in_filters = (
            ('categories', 'category'),
            ('sites', 'site'),
            ('status', 'status'),
        )
        for arg_name, column in in_filters:
            raw = request.args.get(arg_name)
            if raw:
                values = raw.split(',')
                placeholders = ','.join(['?'] * len(values))
                query += f" AND {column} IN ({placeholders})"
                params.extend(values)

        start_date = request.args.get('start')
        if start_date:
            query += " AND substr(timestamp, 1, 10) >= ?"
            params.append(start_date)

        end_date = request.args.get('end')
        if end_date:
            query += " AND substr(timestamp, 1, 10) <= ?"
            params.append(end_date)

        search = request.args.get('search')
        if search:
            search_pattern = f"%{search}%"
            query += " AND (fullFileName LIKE ? OR customName LIKE ? OR remarks LIKE ? OR complaintId LIKE ?)"
            params.extend([search_pattern] * 4)

        model = request.args.get('model')
        if model:
            query += " AND model LIKE ?"
            params.append(f"%{model}%")

        cust = request.args.get('cust')
        if cust:
            query += " AND customerCode LIKE ?"
            params.append(f"%{cust}%")

        query += " ORDER BY timestamp DESC"

        # ---- Fetch rows; always release the connection --------------------
        conn = get_db()
        try:
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()
            cur.execute(query, params)
            rows = cur.fetchall()
        finally:
            conn.close()

        # ---- Aggregate statistics -----------------------------------------
        total_count = len(rows)
        stats_status = {}
        stats_dept = {}
        stats_sup = {}
        # Parsed once here and reused when writing the data rows.
        units_per_row = []

        for row in rows:
            raw_st = row['status'] or 'Open'
            st_key = trans_map.get(f'status_{raw_st}', raw_st)
            stats_status[st_key] = stats_status.get(st_key, 0) + 1

            units = _excel_parse_units(row['responsibleUnit'])
            units_per_row.append(units)
            for u in units:
                if u.startswith('SUP:'):
                    sup_name = u.split(':', 1)[1]
                    stats_sup[sup_name] = stats_sup.get(sup_name, 0) + 1
                else:
                    dept_name = trans_map.get(f'unit_{u}', u)
                    stats_dept[dept_name] = stats_dept.get(dept_name, 0) + 1

        # ---- Workbook and shared styles -----------------------------------
        wb = Workbook()
        ws = wb.active
        ws.title = "NCR List"

        style_center = Alignment(horizontal='center', vertical='center')
        style_wrap = Alignment(wrap_text=True, vertical='top')
        style_top = Alignment(vertical='top')
        style_stat = Alignment(horizontal='left', vertical='center', wrap_text=True)
        style_header = Alignment(horizontal='center', vertical='center', wrap_text=True)
        font_title = Font(size=16, bold=True)
        font_bold = Font(bold=True)
        thin = Side(style='thin')
        border_thin = Border(left=thin, right=thin, top=thin, bottom=thin)
        fill_header = PatternFill(start_color="D9EAD3", end_color="D9EAD3", fill_type="solid")

        col_count = len(headers_map['cols'])
        last_col_char = get_column_letter(col_count)

        # Title banner spanning rows 1-3 across every table column.
        ws.merge_cells(f'A1:{last_col_char}3')
        cell_title = ws['A1']
        cell_title.value = f"NCR Analysis Table ({datetime.now().strftime('%Y-%m-%d')})"
        cell_title.alignment = style_center
        cell_title.font = font_title

        current_row = 5
        stat_base_height = 18

        def set_stat_row(label, text_lines, row_idx):
            """Write one statistics row: bold label in A, merged B:F with one entry per line."""
            ws.cell(row=row_idx, column=1, value=label).font = font_bold
            content = "\n".join(text_lines)
            ws.merge_cells(start_row=row_idx, start_column=2, end_row=row_idx, end_column=6)
            cell_content = ws.cell(row=row_idx, column=2, value=content)
            cell_content.alignment = style_stat
            # Grow the row so every wrapped line stays visible.
            line_count = len(text_lines) if text_lines else 1
            ws.row_dimensions[row_idx].height = max(stat_base_height, line_count * 16) + 4

        ws.cell(row=current_row, column=1, value=f"{headers_map['stats_total']}: {total_count}").font = font_bold
        current_row += 1

        lines = [f"{k}: {v}" for k, v in stats_status.items()]
        set_stat_row(headers_map['stats_status'], lines, current_row)
        current_row += 1

        if stats_dept:
            lines = [f"{k}: {v}" for k, v in sorted(stats_dept.items(), key=lambda x: x[1], reverse=True)]
            set_stat_row(headers_map['stats_unit'], lines, current_row)
            current_row += 1

        if stats_sup:
            lines = [f"{sup_prefix}{k}: {v}" for k, v in sorted(stats_sup.items(), key=lambda x: x[1], reverse=True)]
            set_stat_row(headers_map['stats_supplier'], lines, current_row)
            current_row += 1

        # Blank spacer row between the statistics and the table header.
        current_row += 1

        # ---- Table header -------------------------------------------------
        header_row_num = current_row
        for idx, col_name in enumerate(headers_map['cols'], 1):
            cell = ws.cell(row=header_row_num, column=idx, value=col_name)
            cell.font = font_bold
            cell.alignment = style_header
            cell.fill = fill_header
            cell.border = border_thin

        # ---- Data rows ----------------------------------------------------
        data_start_row = header_row_num + 1
        # Zero-based indices of the long free-text columns in row_data
        # (exceptionDesc, handlingDesc, rootCause, correctiveAction,
        # trackingResult, remarks): wrapped and given a fixed width.
        wrap_indices = {14, 15, 19, 20, 21, 23}

        # NOTE(review): openpyxl appears to treat string values starting with
        # '=' as formulas; if record text is user-supplied, consider
        # neutralizing such values before writing - confirm.
        for excel_row, (row, units) in enumerate(zip(rows, units_per_row), data_start_row):
            display_units = []
            for u in units:
                if u.startswith('SUP:'):
                    display_units.append(f"{sup_prefix}{u.split(':', 1)[1]}")
                else:
                    display_units.append(trans_map.get(f'unit_{u}', u))
            unit_str = "\n".join(display_units)

            raw_st = row['status'] or 'Open'
            display_status = trans_map.get(f'status_{raw_st}', raw_st)

            r_qty = row['returnQty'] or 0
            s_qty = row['shippedQty'] or 0
            rate = (r_qty / s_qty * 100) if s_qty > 0 else 0.0

            row_data = [
                row['fullFileName'],
                display_status,
                unit_str,
                row['model'],
                row['customerCode'],
                row['complaintId'],
                f"{rate:.2f}%",
                row['penaltyDate'],
                row['recurring'],
                row['closingDate'],
                row['issueType'],
                row['source'],
                row['severity'],
                row['issuer'],
                row['exceptionDesc'],
                row['handlingDesc'],
                row['respUnitType'],
                row['respUnitName'],
                row['respPerson'],
                row['rootCause'],
                row['correctiveAction'],
                row['trackingResult'],
                row['timestamp'],
                row['remarks'],
            ]

            for c_idx, val in enumerate(row_data, 1):
                cell = ws.cell(row=excel_row, column=c_idx, value=val)
                cell.border = border_thin
                # Column 3 (responsible units) holds one unit per line.
                if (c_idx - 1) in wrap_indices or c_idx == 3:
                    cell.alignment = style_wrap
                else:
                    cell.alignment = style_top

        # ---- Column widths ------------------------------------------------
        for col_idx in range(1, col_count + 1):
            col_letter = get_column_letter(col_idx)

            if (col_idx - 1) in wrap_indices:
                ws.column_dimensions[col_letter].width = 50
                continue

            max_width = 0
            header_val = ws.cell(row=header_row_num, column=col_idx).value
            if header_val:
                max_width = _excel_visual_width(header_val)

            for r_off in range(len(rows)):
                val = ws.cell(row=data_start_row + r_off, column=col_idx).value
                if not val:
                    continue
                text = str(val)
                if col_idx == 3:
                    # Multi-line cell: the widest single line decides.
                    width = max((_excel_visual_width(line) for line in text.split('\n')), default=0)
                else:
                    width = _excel_visual_width(text)
                if width > max_width:
                    max_width = width

            final_width = max_width + 2
            if col_idx == 3:      # responsible units
                final_width = min(50, max(15, final_width))
            elif col_idx == 2:    # status: does not need to be wide
                final_width = min(20, max(10, final_width))
            else:
                final_width = min(60, max(10, final_width))

            ws.column_dimensions[col_letter].width = final_width

        # ---- Serialize and respond ----------------------------------------
        output = io.BytesIO()
        wb.save(output)

        filename = f"NCR_Analysis_{datetime.now().strftime('%Y%m%d')}.xlsx"
        # Send the bytes, not the BytesIO object: a file-like body would be
        # iterated "line by line" over binary data.
        return Response(
            output.getvalue(),
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            headers={"Content-Disposition": f"attachment;filename={filename}"}
        )

    except Exception as e:
        return str(e), 500
|
|
if __name__ == '__main__':
    # Start the scheduler on a daemon thread so it runs alongside the
    # web server and does not keep the process alive on shutdown.
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()

    # NOTE(review): the message hard-codes 08:00 while MAIL_CONFIG defines a
    # 'check_time' entry - confirm which one the scheduler actually uses.
    print("Scheduler started. Daily check at 08:00.")
    app.run(host='0.0.0.0', port=5000, debug=False)
|