import os
import sqlite3
import json
import time
import csv
import io
import ipaddress
import random
import re
import threading
import smtplib
import schedule
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication # 新增:用於附件
from functools import wraps
from datetime import datetime, timedelta
from flask import Flask, render_template, request, jsonify, Response
from flask_cors import CORS
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, Border, Side, PatternFill
from openpyxl.utils import get_column_letter
# Flask application object; every @app.route below registers on it.
app = Flask(__name__)
# Enable cross-origin requests for the whole app (flask_cors defaults).
CORS(app)
# SQLite database file name passed to sqlite3.connect().
DB_FILE = 'ncr_data.db'
# ==========================================
# 1. 系統設定區 (請在此手動設定)
# ==========================================
# --- Email notification settings ---
# If the mail server is an internal relay that needs no authentication,
# leave user/password as empty strings "".
MAIL_CONFIG = {
    'server_ip': '10.192.130.115',  # Mail server IP
    'port': 25,  # Port (usually 25, 465 or 587)
    'sender': 'msdn@hlmt.com.tw',  # Sender email address
    'receivers': ['ra@hlmt.com.tw'],  # Primary recipients (administrators)
    'cc': ['msdn@hlmt.com.tw'],  # CC recipients (managers / backup recipients)
    'user': '',  # Account (leave empty if none)
    'password': '',  # Password (leave empty if none)
    'check_time': '08:00'  # Daily check time (24-hour clock)
}
# Backup check threshold: mail is sent once the last backup is this many days old.
BACKUP_THRESHOLD_DAYS = 30
# --- Permission settings (RBAC IP allowlist) ---
# Each role maps to a list of rules: a single IP address, or a CIDR network
# when the rule contains '/'. The '*' entry is only understood by my_role();
# check_ip_in_list() skips it because it is not a parseable address.
PERMISSIONS_CONFIG = {
    'ADMIN': [
        '127.0.0.1',
        '::1',
        '10.192.130.100',
        '192.1.0.159',
        '10.192.130.48',
        '10.192.130.79',
        '10.192.130.105',
        '10.192.130.150',
        '10.192.130.153',
        '10.192.130.179',
    ],
    'EDITOR': [
        '10.176.5.0/24',  # ht
        '10.192.130.114',
        '192.1.0.64',
        '192.1.0.206',
    ],
    'VIEWER': [
        '*'
    ]
}
# ==========================================
# 2. 工具函式 (權限與排程)
# ==========================================
def check_ip_in_list(ip, ip_list):
    """Return True when *ip* matches at least one rule in *ip_list*.

    A rule containing '/' is parsed as a network (non-strict, so host bits
    are allowed); any other rule is parsed as a single address. Rules that
    cannot be parsed are ignored, and an unparseable *ip* never matches.
    """
    try:
        candidate = ipaddress.ip_address(ip)
    except ValueError:
        return False

    def _rule_matches(rule):
        # A malformed rule counts as "no match" instead of aborting the scan.
        try:
            if '/' in rule:
                return candidate in ipaddress.ip_network(rule, strict=False)
            return candidate == ipaddress.ip_address(rule)
        except ValueError:
            return False

    return any(_rule_matches(rule) for rule in ip_list)
def get_permission_level(ip):
    """Map a client IP to 'ADMIN', 'EDITOR' or (by default) 'VIEWER'.

    The ADMIN list is consulted before the EDITOR list; an IP found in
    neither is reported as 'VIEWER'.
    """
    for level in ('ADMIN', 'EDITOR'):
        if check_ip_in_list(ip, PERMISSIONS_CONFIG[level]):
            return level
    return 'VIEWER'
def require_role(min_role):
    """Decorator factory restricting a view to callers with at least *min_role*.

    The caller's role is derived from ``request.remote_addr`` through
    ``get_permission_level``. Roles are ordered VIEWER < EDITOR < ADMIN.
    When the caller's role is too low, the view is not invoked and a JSON
    403 response is returned instead. An unrecognised *min_role* denies
    every caller (fail closed).
    """
    # Privilege ranks; a higher rank implies all lower ones.
    role_rank = {'VIEWER': 0, 'EDITOR': 1, 'ADMIN': 2}

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user_ip = request.remote_addr
            role = get_permission_level(user_ip)
            required = role_rank.get(min_role)
            allowed = required is not None and role_rank.get(role, -1) >= required
            if not allowed:
                return jsonify({
                    'success': False,
                    'msg': f'PERMISSION_DENIED: Your IP ({user_ip}) is [{role}], but [{min_role}] is required.'
                }), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator
# ==========================================
# 3. 資料庫管理 (含 WAL 模式與遷移)
# ==========================================
def get_db():
    """Open and return a new SQLite connection to DB_FILE.

    Rows are returned as ``sqlite3.Row`` (name-addressable), and the
    connection is switched to WAL journal mode. The caller is responsible
    for closing the connection.
    """
    connection = sqlite3.connect(DB_FILE)
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA journal_mode=WAL;")
    return connection
def _migration_column_type(col):
    """Return the SQLite column declaration used when adding *col* to records."""
    if col == 'version':
        return 'INTEGER DEFAULT 0'
    if col == 'defectThreshold':
        return 'REAL'
    if 'Qty' in col or 'Period' in col:
        return 'INTEGER'
    return 'TEXT'


def init_and_migrate_db():
    """Create the tables if missing and add any columns not yet on ``records``.

    A failure to add one column is printed and does not stop the remaining
    migrations. The connection is always closed, even if a statement raises.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS records
                     (uniqueId TEXT PRIMARY KEY, fullFileName TEXT, category TEXT,
                      site TEXT, yearMonth TEXT, sequence TEXT, customName TEXT,
                      status TEXT, remarks TEXT, timestamp TEXT, responsibleUnit TEXT)''')
        c.execute('''CREATE TABLE IF NOT EXISTS access_logs
                     (id INTEGER PRIMARY KEY AUTOINCREMENT, user TEXT, action TEXT, ip_address TEXT, timestamp TEXT)''')
        c.execute('''CREATE TABLE IF NOT EXISTS system_info
                     (key TEXT PRIMARY KEY, value TEXT)''')
        c.execute("PRAGMA table_info(records)")
        # Index 1 of each table_info row is the column name.
        existing_columns = {info[1] for info in c.fetchall()}
        new_cols = [
            'recurring', 'customerCode', 'complaintId', 'model', 'penaltyDate',
            'gracePeriod', 'stopAlert', 'returnQty', 'shippedQty', 'defectThreshold',
            'closingDate',
            'exceptionDesc', 'severity', 'handlingDesc', 'issueType', 'source',
            'issuer', 'respUnitType', 'respUnitName', 'respPerson', 'rootCause', 'correctiveAction',
            'trackingResult', 'fileDate',
            'version',
            'filePath',  # path of the associated file
        ]
        for col in new_cols:
            if col in existing_columns:
                continue
            print(f"DB Migration: Adding column {col}")
            try:
                # Column names come from the fixed list above, never from user input.
                c.execute(f"ALTER TABLE records ADD COLUMN {col} {_migration_column_type(col)}")
            except sqlite3.Error as e:
                print(f"Migration error for {col}: {e}")
        conn.commit()
    finally:
        conn.close()
def log_activity(user, action, ip_addr=None):
    """Append one row to ``access_logs`` (best effort; never raises).

    Args:
        user: Value stored in the ``user`` column.
        action: Value stored in the ``action`` column.
        ip_addr: Caller IP. When falsy, the current request's remote address
            is used, falling back to 'SYSTEM'.
    """
    if not ip_addr:
        try:
            ip_addr = request.remote_addr or 'SYSTEM'
        except RuntimeError:
            # Flask raises RuntimeError when there is no active request
            # context (e.g. a call from a background thread).
            ip_addr = 'SYSTEM'
    conn = None
    try:
        conn = get_db()
        conn.execute("INSERT INTO access_logs (user, action, ip_address, timestamp) VALUES (?, ?, ?, ?)",
                     (user, action, ip_addr, datetime.now().isoformat()))
        conn.commit()
    except Exception as e:
        # Logging must not break the caller, but the failure should be visible.
        print(f"Failed to write access log: {e}")
    finally:
        if conn is not None:
            conn.close()
# Runs at import time so the schema exists before any route or job uses the DB.
init_and_migrate_db()
# ==========================================
# 4. 自動化檢查與郵件通知 (背景任務)
# ==========================================
def send_alert_email(days_diff, last_backup_time):
    """Send the backup-overdue alert mail to the receivers and the CC list.

    Args:
        days_diff: Days since the last backup (an int, or a descriptive
            string supplied by the caller); interpolated into the body.
        last_backup_time: Text of the last backup time, shown in the body.

    Does nothing when no mail server is configured. Delivery failures are
    printed, not raised. The SMTP session is always closed, and a socket
    timeout keeps an unreachable server from blocking the caller forever.
    """
    cfg = MAIL_CONFIG
    if not cfg['server_ip']:
        return
    subject = f"[Alert] NCR System Backup Overdue ({datetime.now().strftime('%Y-%m-%d')})"
    body = f"""
Dear Administrator,
The NCR System database has not been backed up for {days_diff} days.
- Last Backup Time: {last_backup_time}
- Threshold: {BACKUP_THRESHOLD_DAYS} days
Please login and perform a backup immediately.
(A separate backup file has been sent to the CC list.)
NCR Online System Auto-Sender
"""
    msg = MIMEMultipart()
    msg['From'] = cfg['sender']
    msg['To'] = ", ".join(cfg['receivers'])
    msg['Cc'] = ", ".join(cfg['cc'])
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    all_recipients = cfg['receivers'] + cfg['cc']
    try:
        # The context manager issues QUIT and closes the socket on error too.
        with smtplib.SMTP(cfg['server_ip'], cfg['port'], timeout=30) as server:
            if cfg['user'] and cfg['password']:
                server.login(cfg['user'], cfg['password'])
            server.sendmail(cfg['sender'], all_recipients, msg.as_string())
        print(f"Alert email sent to {all_recipients}")
        log_activity("SYSTEM", f"AUTO_ALERT_{days_diff}DAYS", "127.0.0.1")
    except Exception as e:
        print(f"Failed to send alert email: {str(e)}")
def send_backup_email_to_cc():
    """Dump the ``records`` table to JSON and mail it as an attachment to the CC list.

    Skipped when the mail server or the CC list is empty. Each stage (dump,
    attach, send) prints its failure and returns instead of raising. The DB
    connection and the SMTP session are closed on every path.
    """
    cfg = MAIL_CONFIG
    if not cfg['server_ip'] or not cfg['cc']:
        print("Mail config or CC list empty, skipping backup email.")
        return
    # 1. Prepare the data (dump DB to JSON).
    try:
        conn = get_db()
        try:
            data = [dict(row) for row in conn.execute("SELECT * FROM records")]
        finally:
            conn.close()
        json_str = json.dumps(data, ensure_ascii=False, indent=2)
        filename = f"NCR_AutoBackup_{datetime.now().strftime('%Y%m%d')}.json"
    except Exception as e:
        print(f"Failed to generate backup JSON: {str(e)}")
        return
    # 2. Prepare the message.
    subject = f"[Auto Backup] NCR Database Dump ({datetime.now().strftime('%Y-%m-%d')})"
    body = f"""
Dear Manager,
Attached is the automated JSON backup of the NCR database.
This backup was triggered because the manual backup is overdue.
Record Count: {len(data)}
NCR Online System Auto-Sender
"""
    msg = MIMEMultipart()
    msg['From'] = cfg['sender']
    msg['To'] = ", ".join(cfg['cc'])
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    # 3. Build the attachment.
    try:
        attachment = MIMEApplication(json_str.encode('utf-8'), Name=filename)
        attachment['Content-Disposition'] = f'attachment; filename="{filename}"'
        msg.attach(attachment)
    except Exception as e:
        print(f"Failed to attach file: {str(e)}")
        return
    # 4. Send.
    try:
        # The context manager issues QUIT and closes the socket on error too.
        with smtplib.SMTP(cfg['server_ip'], cfg['port'], timeout=30) as server:
            if cfg['user'] and cfg['password']:
                server.login(cfg['user'], cfg['password'])
            server.sendmail(cfg['sender'], cfg['cc'], msg.as_string())
        print(f"Backup attachment sent to CC list: {cfg['cc']}")
        log_activity("SYSTEM", "AUTO_BACKUP_SENT_TO_CC", "127.0.0.1")
    except Exception as e:
        print(f"Failed to send backup email: {str(e)}")
def check_backup_status_job():
    """Scheduled job: alert when the recorded last backup is too old.

    Reads ``last_backup`` from ``system_info``. An alert (plus the CC backup
    mail) is triggered when the value is missing, cannot be parsed as
    ``%Y/%m/%d %H:%M``, or is at least BACKUP_THRESHOLD_DAYS days old.
    Errors are printed; the connection is closed in all cases.
    """
    print(f"Running daily backup check at {datetime.now()}")
    conn = get_db()
    cur = conn.cursor()
    try:
        cur.execute("SELECT value FROM system_info WHERE key='last_backup'")
        record = cur.fetchone()
        if record and record['value']:
            last_backup_str = record['value']
            try:
                elapsed = datetime.now() - datetime.strptime(last_backup_str, '%Y/%m/%d %H:%M')
            except ValueError:
                # Stored value is not in the expected format: treat as overdue.
                overdue, days_diff = True, "Unknown"
            else:
                days_diff = elapsed.days
                overdue = days_diff >= BACKUP_THRESHOLD_DAYS
        else:
            # No backup has ever been recorded.
            last_backup_str, overdue, days_diff = "Never", True, "Infinity"

        if not overdue:
            print(f"Backup status OK. Last backup: {days_diff} days ago.")
        else:
            print(f"Backup overdue ({days_diff} days). Triggering alerts...")
            send_alert_email(days_diff, last_backup_str)
            send_backup_email_to_cc()
    except Exception as e:
        print(f"Error in backup check job: {e}")
    finally:
        conn.close()
def run_scheduler():
    """Register the daily backup check and poll the scheduler forever.

    The run time comes from ``MAIL_CONFIG['check_time']`` (default '08:00').
    This function never returns; pending jobs are polled once a minute.
    """
    daily_at = MAIL_CONFIG.get('check_time', '08:00')
    print(f"Scheduler scheduled for {daily_at} daily.")
    job = schedule.every().day.at(daily_at)
    job.do(check_backup_status_job)
    while True:
        schedule.run_pending()
        time.sleep(60)
# ==========================================
# 5. API 路由
# ==========================================
@app.route('/')
def root():
    """Reject requests for the site root with HTTP 403."""
    # NOTE(review): the original literal was split across physical lines,
    # which is a syntax error for a single-quoted string. Any HTML markup
    # that may have wrapped this text is not recoverable here; confirm the
    # intended body.
    return "403 Forbidden", 403
@app.route('/login.html')
def login_page():
    """Serve the rendered ``login.html`` template."""
    page = render_template('login.html')
    return page
@app.route('/detail.html')
def detail_page():
    """Serve the rendered ``detail.html`` template."""
    page = render_template('detail.html')
    return page
@app.route('/editor_manual.html')
def editor_manual_page():
    """Serve the rendered ``editor_manual.html`` template."""
    page = render_template('editor_manual.html')
    return page
@app.route('/api/my_role')
def my_role():
    """Report the caller's role and IP as JSON.

    ADMIN is checked first, then EDITOR. Otherwise the caller is a VIEWER
    when the VIEWER rule list contains '*' or matches the IP, else a GUEST.
    """
    user_ip = request.remote_addr
    for level in ('ADMIN', 'EDITOR'):
        if check_ip_in_list(user_ip, PERMISSIONS_CONFIG[level]):
            return jsonify({'role': level, 'ip': user_ip})

    viewer_rules = PERMISSIONS_CONFIG.get('VIEWER', [])
    if '*' in viewer_rules or check_ip_in_list(user_ip, viewer_rules):
        role = 'VIEWER'
    else:
        role = 'GUEST'
    return jsonify({'role': role, 'ip': user_ip})
@app.route('/api/history')
def get_history():
    """Return every record as JSON, newest ``timestamp`` first.

    ``responsibleUnit`` is decoded from its stored JSON text; when it is
    missing or not valid JSON it is reported as an empty list.
    """
    conn = get_db()
    try:
        rows = conn.execute("SELECT * FROM records ORDER BY timestamp DESC").fetchall()
    finally:
        # Close even when the query raises, so the connection never leaks.
        conn.close()
    data = [dict(row) for row in rows]
    for item in data:
        try:
            item['responsibleUnit'] = json.loads(item['responsibleUnit'])
        except (TypeError, ValueError):
            # TypeError: NULL column; ValueError: malformed JSON text.
            item['responsibleUnit'] = []
    return jsonify(data)
@app.route('/api/record/<unique_id>')
def get_single_record(unique_id):
    """Return the record whose ``uniqueId`` equals *unique_id*, or a JSON 404.

    The URL rule must carry the ``<unique_id>`` variable: without it Flask
    calls the view with no argument and the request fails.
    ``responsibleUnit`` is decoded from JSON; undecodable values become ``[]``.
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM records WHERE uniqueId = ?", (unique_id,)).fetchone()
    finally:
        conn.close()
    if not row:
        return jsonify({'error': 'Not found'}), 404
    item = dict(row)
    try:
        item['responsibleUnit'] = json.loads(item['responsibleUnit'])
    except (TypeError, ValueError):
        item['responsibleUnit'] = []
    return jsonify(item)
@app.route('/api/log_login', methods=['POST'])
def log_login_endpoint():
    """Record a LOGIN entry in the access log for the posted ``user``.

    A missing, malformed or non-object JSON body is logged as user 'Unknown'
    rather than failing the request.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    log_activity(payload.get('user', 'Unknown'), "LOGIN")
    return jsonify({'success': True})
@app.route('/api/logout', methods=['POST'])
def logout():
    """Record a LOGOUT entry in the access log for the posted ``user``.

    A missing, malformed or non-object JSON body is logged as user 'Unknown'
    rather than failing the request.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    log_activity(payload.get('user', 'Unknown'), "LOGOUT")
    return jsonify({'success': True})
@app.route('/api/save', methods=['POST'])
@require_role('ADMIN')
def save_record():
    """Create a new record with the next free sequence number for its prefix.

    Expects a JSON object with ``category``, ``site``, ``yearMonth`` and
    ``customName`` (plus optional ``recurring``, ``customerCode``, ``model``,
    ``fileDate``). The unique ID is ``{category}-{site}{yearMonth}{seq}``,
    with ``seq`` zero-padded to three digits.

    Returns:
        200 with ``savedId`` / ``savedFileName`` on success; 400 for a
        missing body or missing fields; 409 when the ID still collides after
        all retries; 500 for any other failure.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'msg': 'INVALID_JSON_BODY'}), 400
    missing = [k for k in ('category', 'site', 'yearMonth', 'customName') if k not in data]
    if missing:
        return jsonify({'success': False, 'msg': f"MISSING_FIELDS: {', '.join(missing)}"}), 400

    cat = data['category']
    site = data['site']
    ym = str(data['yearMonth']).replace('-', '')
    prefix = f"{cat}-{site}{ym}"

    # Column -> value, in table order. The three None entries are filled in
    # on each attempt once the sequence number is known.
    record = {
        'uniqueId': None, 'fullFileName': None,
        'category': cat, 'site': site, 'yearMonth': ym, 'sequence': None,
        'customName': data['customName'],
        'status': "Open",
        'remarks': "",
        'timestamp': datetime.now().isoformat(),
        'responsibleUnit': json.dumps([]),
        'recurring': data.get('recurring', 'N'),
        'customerCode': data.get('customerCode', ''),
        'complaintId': "",
        'model': data.get('model', ''),
        'penaltyDate': "", 'gracePeriod': 0, 'stopAlert': 'N',
        'returnQty': 0, 'shippedQty': 0, 'defectThreshold': 0.0,
        'closingDate': "",
        'exceptionDesc': "", 'severity': "", 'handlingDesc': "",
        'issueType': "", 'source': "", 'issuer': "",
        'respUnitType': "", 'respUnitName': "", 'respPerson': "",
        'rootCause': "", 'correctiveAction': "", 'trackingResult': "",
        'fileDate': data.get('fileDate', ''),
        'version': 0, 'filePath': "",
    }
    columns = list(record)
    # Column names are the fixed keys above, so formatting them into SQL is safe.
    insert_sql = (f"INSERT INTO records ({', '.join(columns)}) "
                  f"VALUES ({', '.join(['?'] * len(columns))})")

    max_retries = 5
    for attempt in range(max_retries):
        conn = get_db()
        try:
            # Take the numeric maximum over all IDs with this prefix. A
            # lexicographic "last ID" breaks once sequences pass 999 or when
            # the last ID has a non-numeric suffix.
            current_max = 0
            for row in conn.execute("SELECT uniqueId FROM records WHERE uniqueId LIKE ?", (prefix + '%',)):
                suffix = row[0][len(prefix):].split('-')[0]
                try:
                    current_max = max(current_max, int(suffix))
                except ValueError:
                    continue
            new_seq_str = str(current_max + 1).zfill(3)
            unique_id = f"{prefix}{new_seq_str}"
            parts = [unique_id, record['model'], record['customName'], record['customerCode'], record['fileDate']]
            full_filename = '-'.join(filter(None, parts))
            record.update(uniqueId=unique_id, fullFileName=full_filename, sequence=new_seq_str)

            conn.execute(insert_sql, [record[col] for col in columns])
            conn.commit()
            return jsonify({'success': True, 'savedId': unique_id, 'savedFileName': full_filename})
        except sqlite3.IntegrityError:
            # Another request took the same ID; retry unless out of attempts.
            if attempt == max_retries - 1:
                return jsonify({'success': False, 'msg': 'SYSTEM_BUSY_ID_CONFLICT'}), 409
        except Exception as e:
            return jsonify({'success': False, 'msg': str(e)}), 500
        finally:
            conn.close()
        # Random back-off so concurrent writers do not collide again in lockstep.
        time.sleep(random.uniform(0.1, 0.4))
    return jsonify({'success': False, 'msg': 'SYSTEM_BUSY_ID_CONFLICT'}), 409
@app.route('/api/admin_update', methods=['POST'])
@require_role('ADMIN')
def admin_update_record():
    """Rename a record and/or update its naming metadata (optimistic locking).

    Expects JSON with ``originalUniqueId``, ``newUniqueId``, ``customName``,
    ``version`` and optional ``model``, ``customerCode``, ``fileDate``.
    The update applies only when the stored ``version`` equals the posted one.

    Returns:
        200 with ``newFileName``; 400 when an ID or the custom name is
        missing; 409 ``NEW_ID_ALREADY_EXISTS`` or ``DATA_CONFLICT``; 500 on
        other errors.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def _text(key):
        # Missing keys and JSON nulls both become '', so .strip() cannot crash.
        value = data.get(key)
        return '' if value is None else str(value).strip()

    old_unique_id = data.get('originalUniqueId')
    new_unique_id = data.get('newUniqueId')
    current_version = data.get('version')
    new_model = _text('model')
    new_custom_name = _text('customName')
    new_cust_code = _text('customerCode')
    new_file_date = _text('fileDate')
    if not old_unique_id or not new_unique_id or not new_custom_name:
        return jsonify({'success': False, 'msg': 'Missing ID or Custom Name'}), 400

    conn = get_db()
    cur = conn.cursor()
    try:
        if old_unique_id != new_unique_id:
            cur.execute("SELECT 1 FROM records WHERE uniqueId = ?", (new_unique_id,))
            if cur.fetchone():
                return jsonify({'success': False, 'msg': 'NEW_ID_ALREADY_EXISTS'}), 409
        parts = [new_unique_id, new_model, new_custom_name, new_cust_code, new_file_date]
        new_full_filename = '-'.join(filter(None, parts))
        cur.execute('''UPDATE records SET
                           uniqueId=?, model=?, customName=?, customerCode=?, fileDate=?, fullFileName=?,
                           version = version + 1
                       WHERE uniqueId=? AND version=?''',
                    (new_unique_id, new_model, new_custom_name, new_cust_code, new_file_date,
                     new_full_filename, old_unique_id, current_version))
        conn.commit()
        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409
        log_activity(f"Admin Update: {old_unique_id} -> {new_unique_id}", "ADMIN_METADATA_UPDATE", request.remote_addr)
        return jsonify({'success': True, 'newFileName': new_full_filename})
    except sqlite3.IntegrityError:
        # The new ID was taken between the existence check and the UPDATE.
        return jsonify({'success': False, 'msg': 'NEW_ID_ALREADY_EXISTS'}), 409
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
@app.route('/api/update', methods=['POST'])
def update_record():
    """Update a record's editable fields (ADMIN or EDITOR only).

    Uses optimistic locking: the row is updated only when its stored
    ``version`` equals the posted ``version``; otherwise 409 DATA_CONFLICT.
    Any exception while building or running the update yields a 500.
    """
    data = request.json
    if get_permission_level(request.remote_addr) not in ['ADMIN', 'EDITOR']:
        return jsonify({'success': False, 'msg': 'PERMISSION_DENIED'}), 403

    # (key, default) pairs, in the order their placeholders appear in the
    # SQL after the first three.
    defaulted_fields = (
        ('recurring', 'N'), ('complaintId', ''), ('penaltyDate', ''), ('gracePeriod', 0),
        ('stopAlert', 'N'), ('returnQty', 0), ('shippedQty', 0),
        ('defectThreshold', 0.0), ('closingDate', ''),
        ('exceptionDesc', ''), ('severity', ''), ('handlingDesc', ''),
        ('issueType', ''), ('source', ''), ('issuer', ''),
        ('respUnitType', ''), ('respUnitName', ''), ('respPerson', ''),
        ('rootCause', ''), ('correctiveAction', ''), ('trackingResult', ''),
    )
    conn = get_db()
    cur = conn.cursor()
    try:
        units = data.get('responsibleUnit', [])
        # Lists are stored as JSON text; anything else is stored as given.
        units_json = json.dumps(units) if isinstance(units, list) else units
        bound = [data.get('status'), data.get('remarks'), units_json]
        bound.extend(data.get(key, default) for key, default in defaulted_fields)
        bound.extend([data['uniqueId'], data.get('version')])
        cur.execute('''UPDATE records SET
status=?, remarks=?, responsibleUnit=?, recurring=?, complaintId=?,
penaltyDate=?, gracePeriod=?, stopAlert=?, returnQty=?, shippedQty=?,
defectThreshold=?, closingDate=?,
exceptionDesc=?, severity=?, handlingDesc=?, issueType=?, source=?,
issuer=?, respUnitType=?, respUnitName=?, respPerson=?, rootCause=?, correctiveAction=?, trackingResult=?,
version = version + 1
WHERE uniqueId=? AND version=?''', tuple(bound))
        conn.commit()
        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
@app.route('/api/update_filepath', methods=['POST'])
def update_filepath():
    """Set a record's ``filePath`` (ADMIN or EDITOR only).

    The row is updated only when its stored ``version`` equals the posted
    ``version``; otherwise 409 DATA_CONFLICT. Any exception yields a 500.
    """
    data = request.json
    caller_role = get_permission_level(request.remote_addr)
    if caller_role not in ['ADMIN', 'EDITOR']:
        return jsonify({'success': False, 'msg': 'PERMISSION_DENIED'}), 403

    conn = get_db()
    cur = conn.cursor()
    try:
        record_id = data.get('uniqueId')
        new_path = data.get('filePath', '').strip()
        expected_version = data.get('version')
        bound = (new_path, record_id, expected_version)
        cur.execute('''UPDATE records SET
filePath=?,
version = version + 1
WHERE uniqueId=? AND version=?''', bound)
        conn.commit()
        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
@app.route('/api/reset', methods=['POST'])
@require_role('ADMIN')
def reset_db():
    """Delete every row from ``records`` (ADMIN only).

    Returns ``{'success': True}`` on success, or a 500 JSON response
    carrying the error text. The connection is closed in all cases.
    """
    db = get_db()
    try:
        db.execute("DELETE FROM records")
        db.commit()
        ok_payload = {'success': True}
        return jsonify(ok_payload)
    except Exception as err:
        failure = {'success': False, 'msg': str(err)}
        return jsonify(failure), 500
    finally:
        db.close()
@app.route('/api/backup')
def backup_db():
    """Download all records as a JSON attachment and stamp ``last_backup``.

    The current time (``%Y/%m/%d %H:%M``) is stored under the
    ``last_backup`` key of ``system_info`` before the response is built.
    """
    conn = get_db()
    cur = conn.cursor()
    cur.execute("SELECT * FROM records")
    records = list(map(dict, cur.fetchall()))
    stamp = datetime.now().strftime('%Y/%m/%d %H:%M')
    cur.execute("INSERT OR REPLACE INTO system_info (key, value) VALUES ('last_backup', ?)", (stamp,))
    conn.commit()
    conn.close()
    payload = json.dumps(records, ensure_ascii=False, indent=2)
    disposition = f'attachment;filename=NCR_Backup_{datetime.now().strftime("%Y%m%d")}.json'
    return Response(payload, mimetype='application/json',
                    headers={'Content-Disposition': disposition})
@app.route('/api/last_backup')
def get_last_backup():
    """Return ``{'time': <last_backup value>}``, or "Never" when no row exists."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute("SELECT value FROM system_info WHERE key='last_backup'")
    record = cursor.fetchone()
    conn.close()
    if record:
        last_time = record['value']
    else:
        last_time = "Never"
    return jsonify({'time': last_time})
@app.route('/api/restore', methods=['POST'])
@require_role('ADMIN')
def restore_db():
    """Import records from an uploaded JSON backup (ADMIN only).

    The upload (form field ``file``) must be a JSON array of record objects.
    Rows whose ``uniqueId`` already exists are skipped. When ``fileDate`` is
    absent it is recovered from the last '-'-separated part of
    ``fullFileName`` if that part is eight digits.

    Returns:
        200 with inserted (``s``) and skipped (``k``) counts; 400 when no
        file is sent or the payload is not a list; 500 on any other error.
        On error nothing is committed, and the connection is always closed.
    """
    if 'file' not in request.files:
        return jsonify({'success': False, 'msg': 'No file'}), 400

    # Columns that must be present in every backup row.
    required_cols = ('uniqueId', 'fullFileName', 'category', 'site', 'yearMonth', 'sequence',
                     'customName', 'status', 'remarks', 'timestamp', 'responsibleUnit')
    # Columns added by later migrations, with defaults for older backups.
    optional_cols = (
        ('recurring', 'N'), ('customerCode', ''), ('complaintId', ''),
        ('model', ''), ('penaltyDate', ''), ('gracePeriod', 0),
        ('stopAlert', 'N'), ('returnQty', 0), ('shippedQty', 0),
        ('defectThreshold', 0.0), ('closingDate', ''),
        ('exceptionDesc', ''), ('severity', ''), ('handlingDesc', ''),
        ('issueType', ''), ('source', ''), ('issuer', ''),
        ('respUnitType', ''), ('respUnitName', ''), ('respPerson', ''),
        ('rootCause', ''), ('correctiveAction', ''), ('trackingResult', ''),
    )
    columns = (list(required_cols) + [name for name, _ in optional_cols]
               + ['fileDate', 'version', 'filePath'])
    # Column names are the fixed identifiers above, never user input.
    insert_sql = (f"INSERT INTO records ({', '.join(columns)}) "
                  f"VALUES ({', '.join(['?'] * len(columns))})")
    date_pattern = re.compile(r'^\d{8}$')

    conn = None
    try:
        data = json.load(request.files['file'])
        if not isinstance(data, list):
            return jsonify({'success': False, 'msg': 'INVALID_BACKUP_FORMAT'}), 400
        conn = get_db()
        cur = conn.cursor()
        success = 0
        skip = 0
        for row in data:
            if cur.execute("SELECT 1 FROM records WHERE uniqueId = ?", (row['uniqueId'],)).fetchone():
                skip += 1
                continue
            f_date = row.get('fileDate', '')
            if not f_date and row.get('fullFileName'):
                last_part = row['fullFileName'].split('-')[-1].strip()
                if date_pattern.match(last_part):
                    f_date = last_part
            vals = [row[name] for name in required_cols]
            # An already-decoded list is re-encoded as JSON text so sqlite can bind it.
            unit_idx = required_cols.index('responsibleUnit')
            if isinstance(vals[unit_idx], list):
                vals[unit_idx] = json.dumps(vals[unit_idx])
            vals.extend(row.get(name, default) for name, default in optional_cols)
            vals.extend([f_date, row.get('version', 0), row.get('filePath', '')])
            cur.execute(insert_sql, vals)
            success += 1
        conn.commit()
        return jsonify({'success': True, 'msg': 'RESTORE_OK', 's': success, 'k': skip})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        # Closing without commit discards any partially inserted rows.
        if conn is not None:
            conn.close()
@app.route('/api/export_logs')
def export_logs():
    """Download the whole access log as ``Logs.csv``, newest first.

    The CSV starts with a UTF-8 byte-order mark, followed by a header row
    and one row per log entry (first five columns of ``access_logs``).
    """
    conn = get_db()
    cur = conn.cursor()
    cur.execute("SELECT * FROM access_logs ORDER BY timestamp DESC")
    log_rows = cur.fetchall()
    conn.close()

    buffer = io.StringIO()
    buffer.write('\ufeff')
    out = csv.writer(buffer)
    out.writerow(['ID', 'User', 'Action', 'IP Address', 'Time'])
    out.writerows([entry[0], entry[1], entry[2], entry[3], entry[4]] for entry in log_rows)
    return Response(buffer.getvalue(), mimetype="text/csv",
                    headers={"Content-Disposition": "attachment;filename=Logs.csv"})
# Per-language labels for the CSV/Excel exports.
# 'cols' MUST follow the order in which export_csv writes each data row:
# filename, status, units, model, customer code, complaint id, defect rate,
# penalty date, recurring, closing date, issue type, source, severity,
# issuer, exception desc, handling desc, resp. unit type, resp. unit name,
# resp. person, root cause, corrective action, tracking result,
# created time (timestamp), remarks.
# The 'en' list previously had 'Created Time' and 'Remarks' right after
# 'Closing Date', which mislabeled fourteen columns in English exports.
CSV_HEADERS_MAP = {
    'en': {
        'stats_title': 'Statistics Summary', 'stats_total': 'Total Records',
        'stats_status': '--- Status Statistics ---', 'stats_unit': '--- Responsible Unit (Dept) ---',
        'stats_supplier': '--- Supplier ---', 'stats_model': '--- Model ---', 'stats_cust': '--- Customer Code ---',
        'cols': ['Filename', 'Status', 'Resp. Unit', 'Model', 'Cust. Code', 'Complaint ID', 'Defect Rate(%)', 'Penalty Date', 'Recurring', 'Closing Date', 'Issue Type', 'Source', 'MA/MI', 'Issuer', 'Exception Description', 'Handling Description', 'Resp. Unit Type', 'Resp. Unit Name', 'Resp. Person', 'Root Cause Analysis', 'Corrective Action', 'Response / Tracking', 'Created Time', 'Remarks']
    },
    'tw': {
        'stats_title': '統計摘要', 'stats_total': '總筆數',
        'stats_status': '--- 狀態統計 (開立/結案/作廢) ---', 'stats_unit': '--- 責任單位 (部門) ---',
        'stats_supplier': '--- 供應商 ---', 'stats_model': '--- 型號 ---', 'stats_cust': '--- 客戶代號 ---',
        'cols': ['完整檔名', '狀態', '責任單位', '型號', '客戶代號', '客訴單號', '不良率(%)', '扣款起算日', '再發', '結案日期', '異常類型', '來源', '嚴重度(MA/MI)', '開單人', '異常說明', '處置說明', '責任單位類別', '責任單位名稱', '負責人', '原因分析', '改善對策', '回覆日期/追蹤結果', '建立時間', '備註']
    },
    'cn': {
        'stats_title': '统计摘要', 'stats_total': '总笔数',
        'stats_status': '--- 状态统计 (开立/结案/作废) ---', 'stats_unit': '--- 责任单位 (部门) ---',
        'stats_supplier': '--- 供应商 ---', 'stats_model': '--- 型号 ---', 'stats_cust': '--- 客户代号 ---',
        'cols': ['完整文件名', '状态', '责任单位', '型号', '客户代号', '客诉单号', '不良率(%)', '扣款起算日', '再发', '结案日期', '异常类型', '来源', '严重度(MA/MI)', '开单人', '异常说明', '处置说明', '责任单位类别', '责任单位名称', '负责人', '原因分析', '改善对策', '回复日期/追踪结果', '建立时间', '备注']
    },
    'vi': {
        'stats_title': 'Tóm tắt thống kê', 'stats_total': 'Tổng số',
        'stats_status': '--- Trạng thái (Open/Closed/Void) ---', 'stats_unit': '--- Đơn vị chịu trách nhiệm ---',
        'stats_supplier': '--- Nhà cung cấp ---', 'stats_model': '--- Mô hình ---', 'stats_cust': '--- Khách hàng ---',
        'cols': ['Tên tệp', 'Trạng thái', 'Đơn vị', 'Mô hình', 'Mã KH', 'Mã khiếu nại', 'Tỷ lệ lỗi(%)', 'Ngày phạt', 'Lặp lại', 'Ngày đóng', 'Loại', 'Nguồn', 'Độ nghiêm trọng', 'Người phát hành', 'Mô tả ngoại lệ', 'Xử lý sản phẩm', 'Loại đơn vị', 'Tên đơn vị', 'Người phụ trách', 'Nguyên nhân gốc rễ', 'Đối sách', 'Ngày phản hồi / Kết quả', 'Thời gian tạo', 'Ghi chú']
    }
}
# Per-language display text for stored values, used by export_excel.
# Keys: 'status_<raw status>' and 'unit_<raw unit code>' map a stored value
# to its label; 'prefix_sup' is the text placed before a supplier name
# (units stored as 'SUP:<name>').
VALUE_TRANS_MAP = {
    'en': { 'status_Open': 'Open', 'status_Closed': 'Closed', 'status_Void': 'Void', 'unit_MGMT': 'MGMT', 'unit_ENG': 'ENG', 'unit_PMC': 'PMC', 'unit_QA': 'QA', 'unit_PC': 'PC', 'unit_MFG': 'MFG', 'unit_PUR': 'PUR', 'unit_RD': 'RD', 'unit_SALES': 'SALES', 'unit_RA': 'Regulatory (RA)', 'prefix_sup': 'Supplier: ' },
    'tw': { 'status_Open': '開立', 'status_Closed': '結案', 'status_Void': '作廢', 'unit_MGMT': '管理', 'unit_ENG': '工程', 'unit_PMC': '物管', 'unit_QA': '品保', 'unit_PC': '採管', 'unit_MFG': '製造', 'unit_PUR': '採購', 'unit_RD': '研發', 'unit_SALES': '業務', 'unit_RA': '法規', 'prefix_sup': '供應商: ' },
    'cn': { 'status_Open': '开立', 'status_Closed': '结案', 'status_Void': '作废', 'unit_MGMT': '管理', 'unit_ENG': '工程', 'unit_PMC': '物管', 'unit_QA': '品保', 'unit_PC': '采管', 'unit_MFG': '制造', 'unit_PUR': '采购', 'unit_RD': '研发', 'unit_SALES': '业务', 'unit_RA': '法规', 'prefix_sup': '供应商: ' },
    'vi': { 'status_Open': 'Mở', 'status_Closed': 'Đóng', 'status_Void': 'Vô hiệu', 'unit_MGMT': 'Quản lý', 'unit_ENG': 'Kỹ thuật', 'unit_PMC': 'QL Vật liệu', 'unit_QA': 'QA', 'unit_PC': 'KS Mua', 'unit_MFG': 'Sản xuất', 'unit_PUR': 'Mua hàng', 'unit_RD': 'R&D', 'unit_SALES': 'Kinh doanh', 'unit_RA': 'Pháp quy', 'prefix_sup': 'NCC: ' }
}
@app.route('/api/export_csv')
def export_csv():
    """Export the filtered record list as a CSV download.

    Query parameters: ``lang`` (header language, default 'en'),
    ``categories`` / ``sites`` / ``status`` (comma-separated IN filters),
    ``start`` / ``end`` (compared with the first 10 characters of
    ``timestamp``), ``search`` (substring of file name, custom name, remarks
    or complaint id), ``model`` and ``cust`` (substring filters).

    The file starts with a UTF-8 BOM, then a statistics summary, then the
    column header row and one row per record. On any error the exception
    text is returned with status 500.
    """
    try:
        lang = request.args.get('lang', 'en')
        headers_map = CSV_HEADERS_MAP.get(lang, CSV_HEADERS_MAP['en'])
        query, params = _csv_export_query(request.args)

        conn = get_db()
        try:
            rows = conn.execute(query, params).fetchall()
        finally:
            # Rows are fully fetched, so the connection can be released now;
            # this also prevents a leak when the query raises.
            conn.close()

        stats_status = {}
        stats_dept = {}
        stats_sup = {}
        stats_model = {}
        stats_cust = {}
        for row in rows:
            st = row['status'] if row['status'] else 'Open'
            stats_status[st] = stats_status.get(st, 0) + 1
            try:
                units = json.loads(row['responsibleUnit'])
            except (TypeError, ValueError):
                units = []
            if not isinstance(units, list):
                units = []
            for u in units:
                if not isinstance(u, str):
                    # Ignore malformed entries instead of failing the export.
                    continue
                if u.startswith('SUP:'):
                    sup_name = u.split(':', 1)[1]
                    stats_sup[sup_name] = stats_sup.get(sup_name, 0) + 1
                else:
                    stats_dept[u] = stats_dept.get(u, 0) + 1
            md = row['model']
            if md:
                stats_model[md] = stats_model.get(md, 0) + 1
            cc = row['customerCode']
            if cc:
                stats_cust[cc] = stats_cust.get(cc, 0) + 1

        si = io.StringIO()
        si.write('\ufeff')  # BOM so spreadsheet tools detect UTF-8
        writer = csv.writer(si)
        writer.writerow([headers_map['stats_title']])
        writer.writerow([headers_map['stats_total'], len(rows)])
        writer.writerow([])
        writer.writerow([headers_map['stats_status']])
        for k, v in stats_status.items():
            writer.writerow(["", k, v])
        # Optional sections, each sorted by count (descending), only when non-empty.
        optional_sections = (
            ('stats_unit', stats_dept),
            ('stats_supplier', stats_sup),
            ('stats_model', stats_model),
            ('stats_cust', stats_cust),
        )
        for header_key, counts in optional_sections:
            if not counts:
                continue
            writer.writerow([headers_map[header_key]])
            for k, v in sorted(counts.items(), key=lambda item: item[1], reverse=True):
                writer.writerow(["", k, v])
        writer.writerow([])
        writer.writerow(headers_map['cols'])

        for row in rows:
            try:
                units = json.loads(row['responsibleUnit'])
                unit_str = ", ".join(units) if isinstance(units, list) else str(units)
            except (TypeError, ValueError):
                # Undecodable JSON, or a list holding non-strings: show the raw value.
                unit_str = str(row['responsibleUnit'])
            r_qty = _csv_number(row['returnQty'])
            s_qty = _csv_number(row['shippedQty'])
            rate = (r_qty / s_qty * 100) if s_qty > 0 else 0.0
            writer.writerow([
                row['fullFileName'],
                row['status'],
                unit_str,
                row['model'],
                row['customerCode'],
                row['complaintId'],
                f"{rate:.2f}%",
                row['penaltyDate'],
                row['recurring'],
                row['closingDate'],
                row['issueType'],
                row['source'],
                row['severity'],
                row['issuer'],
                row['exceptionDesc'],
                row['handlingDesc'],
                row['respUnitType'],
                row['respUnitName'],
                row['respPerson'],
                row['rootCause'],
                row['correctiveAction'],
                row['trackingResult'],
                row['timestamp'],
                row['remarks'],
            ])

        filename = f"NCR_List_{datetime.now().strftime('%Y%m%d')}.csv"
        return Response(si.getvalue(), mimetype="text/csv",
                        headers={"Content-Disposition": f"attachment;filename={filename}"})
    except Exception as e:
        return str(e), 500


def _csv_export_query(args):
    """Build the filtered ``records`` SELECT and its parameters from query *args*."""
    query = "SELECT * FROM records WHERE 1=1"
    params = []
    # Comma-separated multi-value filters -> parameterized IN (...) clauses.
    for arg_name, column in (('categories', 'category'), ('sites', 'site'), ('status', 'status')):
        raw = args.get(arg_name)
        if raw:
            values = raw.split(',')
            query += f" AND {column} IN ({','.join(['?'] * len(values))})"
            params.extend(values)
    start_date = args.get('start')
    if start_date:
        query += " AND substr(timestamp, 1, 10) >= ?"
        params.append(start_date)
    end_date = args.get('end')
    if end_date:
        query += " AND substr(timestamp, 1, 10) <= ?"
        params.append(end_date)
    search = args.get('search')
    if search:
        query += " AND (fullFileName LIKE ? OR customName LIKE ? OR remarks LIKE ? OR complaintId LIKE ?)"
        params.extend([f"%{search}%"] * 4)
    model = args.get('model')
    if model:
        query += " AND model LIKE ?"
        params.append(f"%{model}%")
    cust = args.get('cust')
    if cust:
        query += " AND customerCode LIKE ?"
        params.append(f"%{cust}%")
    query += " ORDER BY timestamp DESC"
    return query, params


def _csv_number(value):
    """Coerce a stored quantity to float, treating NULL, blanks and junk as 0."""
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0
@app.route('/api/export_excel')
def export_excel():
try:
conn = get_db()
conn.row_factory = sqlite3.Row
cur = conn.cursor()
lang = request.args.get('lang', 'en')
headers_map = CSV_HEADERS_MAP.get(lang, CSV_HEADERS_MAP['en'])
trans_map = VALUE_TRANS_MAP.get(lang, VALUE_TRANS_MAP['en'])
query = "SELECT * FROM records WHERE 1=1"
params = []
categories = request.args.get('categories')
if categories:
query += f" AND category IN ({','.join(['?']*len(categories.split(',')))})"
params.extend(categories.split(','))
sites = request.args.get('sites')
if sites:
query += f" AND site IN ({','.join(['?']*len(sites.split(',')))})"
params.extend(sites.split(','))
status_filter = request.args.get('status')
if status_filter:
query += f" AND status IN ({','.join(['?']*len(status_filter.split(',')))})"
params.extend(status_filter.split(','))
start_date = request.args.get('start')
if start_date:
query += " AND substr(timestamp, 1, 10) >= ?"
params.append(start_date)
end_date = request.args.get('end')
if end_date:
query += " AND substr(timestamp, 1, 10) <= ?"
params.append(end_date)
search = request.args.get('search')
if search:
search_pattern = f"%{search}%"
query += " AND (fullFileName LIKE ? OR customName LIKE ? OR remarks LIKE ? OR complaintId LIKE ?)"
params.extend([search_pattern, search_pattern, search_pattern, search_pattern])
model = request.args.get('model')
if model:
query += " AND model LIKE ?"
params.append(f"%{model}%")
cust = request.args.get('cust')
if cust:
query += " AND customerCode LIKE ?"
params.append(f"%{cust}%")
query += " ORDER BY timestamp DESC"
cur.execute(query, params)
rows = cur.fetchall()
total_count = len(rows)
stats_status = {}
stats_dept = {}
stats_sup = {}
for row in rows:
raw_st = row['status'] if row['status'] else 'Open'
st_key = trans_map.get(f'status_{raw_st}', raw_st)
stats_status[st_key] = stats_status.get(st_key, 0) + 1
try:
units = json.loads(row['responsibleUnit'])
except:
units = []
if not isinstance(units, list): units = []
for u in units:
if u.startswith('SUP:'):
sup_name = u.split(':', 1)[1] if ':' in u else u
stats_sup[sup_name] = stats_sup.get(sup_name, 0) + 1
else:
dept_name = trans_map.get(f'unit_{u}', u)
stats_dept[dept_name] = stats_dept.get(dept_name, 0) + 1
conn.close()
wb = Workbook()
ws = wb.active
ws.title = "NCR List"
style_center = Alignment(horizontal='center', vertical='center')
style_wrap = Alignment(wrap_text=True, vertical='top')
style_stat = Alignment(horizontal='left', vertical='center', wrap_text=True)
style_header = Alignment(horizontal='center', vertical='center', wrap_text=True)
font_title = Font(size=16, bold=True)
font_bold = Font(bold=True)
border_thin = Border(left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin'))
fill_header = PatternFill(start_color="D9EAD3", end_color="D9EAD3", fill_type="solid")
col_count = len(headers_map['cols'])
last_col_char = get_column_letter(col_count)
ws.merge_cells(f'A1:{last_col_char}3')
cell_title = ws['A1']
cell_title.value = f"NCR Analysis Table ({datetime.now().strftime('%Y-%m-%d')})"
cell_title.alignment = style_center
cell_title.font = font_title
current_row = 5
stat_base_height = 18
def set_stat_row(label, text_lines, row_idx):
    """Write one statistics row into the worksheet.

    Column A of ``row_idx`` receives ``label`` in bold; columns B..F of the
    same row are merged and filled with ``text_lines`` joined by newlines.
    The row height is then sized from the number of lines.
    """
    label_cell = ws.cell(row=row_idx, column=1, value=label)
    label_cell.font = font_bold
    joined_text = "\n".join(text_lines)
    # Merge B..F first, then write into the top-left cell of the merged range.
    ws.merge_cells(start_row=row_idx, start_column=2, end_row=row_idx, end_column=6)
    body_cell = ws.cell(row=row_idx, column=2, value=joined_text)
    body_cell.alignment = style_stat
    # An empty list still occupies one visual line; 16 units per line, never
    # below the base height, plus 4 units of padding.
    visible_lines = max(len(text_lines), 1)
    ws.row_dimensions[row_idx].height = max(stat_base_height, visible_lines * 16) + 4
ws.cell(row=current_row, column=1, value=f"{headers_map['stats_total']}: {total_count}").font = font_bold
current_row += 1
lines = [f"{k}: {v}" for k, v in stats_status.items()]
set_stat_row(headers_map['stats_status'], lines, current_row)
current_row += 1
if stats_dept:
lines = [f"{k}: {v}" for k, v in sorted(stats_dept.items(), key=lambda x: x[1], reverse=True)]
set_stat_row(headers_map['stats_unit'], lines, current_row)
current_row += 1
if stats_sup:
sup_prefix = trans_map.get('prefix_sup', 'Supplier: ')
lines = [f"{sup_prefix}{k}: {v}" for k, v in sorted(stats_sup.items(), key=lambda x: x[1], reverse=True)]
set_stat_row(headers_map['stats_supplier'], lines, current_row)
current_row += 1
current_row += 1
header_row_num = current_row
for idx, col_name in enumerate(headers_map['cols'], 1):
cell = ws.cell(row=header_row_num, column=idx, value=col_name)
cell.font = font_bold
cell.alignment = style_header
cell.fill = fill_header
cell.border = border_thin
data_start_row = header_row_num + 1
wrap_indices = [14, 15, 19, 20, 21, 23]
for r_idx, row in enumerate(rows):
excel_row = data_start_row + r_idx
try:
raw_units = json.loads(row['responsibleUnit'])
except:
raw_units = []
if not isinstance(raw_units, list): raw_units = []
display_units = []
for u in raw_units:
if u.startswith('SUP:'):
s_name = u.split(':', 1)[1]
display_units.append(f"{trans_map.get('prefix_sup')}{s_name}")
else:
display_units.append(trans_map.get(f'unit_{u}', u))
unit_str = "\n".join(display_units)
raw_st = row['status'] if row['status'] else 'Open'
display_status = trans_map.get(f'status_{raw_st}', raw_st)
r_qty = row['returnQty'] or 0
s_qty = row['shippedQty'] or 0
rate = (r_qty / s_qty * 100) if s_qty > 0 else 0.0
row_data = [
row['fullFileName'],
display_status,
unit_str,
row['model'],
row['customerCode'],
row['complaintId'],
f"{rate:.2f}%",
row['penaltyDate'],
row['recurring'],
row['closingDate'],
row['issueType'],
row['source'],
row['severity'],
row['issuer'],
row['exceptionDesc'],
row['handlingDesc'],
row['respUnitType'],
row['respUnitName'],
row['respPerson'],
row['rootCause'],
row['correctiveAction'],
row['trackingResult'],
row['timestamp'],
row['remarks'],
]
for c_idx, val in enumerate(row_data, 1):
cell = ws.cell(row=excel_row, column=c_idx, value=val)
cell.border = border_thin
if (c_idx - 1) in wrap_indices or c_idx == 3:
cell.alignment = style_wrap
else:
cell.alignment = Alignment(vertical='top')
def get_visual_width(s):
    """Estimate the rendered width of ``s`` for column sizing.

    ``s`` is converted with ``str()``. Each character with a code point of
    127 or below counts as 1.0; every other character counts as 1.8.
    Returns the accumulated total (``0`` for an empty string).
    """
    total = 0
    for ch in str(s):
        total += 1.0 if ord(ch) <= 127 else 1.8
    return total
for col_idx in range(1, col_count + 1):
col_letter = get_column_letter(col_idx)
is_wrap_col = (col_idx - 1) in wrap_indices
if is_wrap_col:
ws.column_dimensions[col_letter].width = 50
else:
max_width = 0
header_val = ws.cell(row=header_row_num, column=col_idx).value
if header_val: max_width = get_visual_width(header_val)
for r_idx in range(len(rows)):
val = ws.cell(row=data_start_row + r_idx, column=col_idx).value
if val:
str_val = str(val)
if col_idx == 3:
lines = str_val.split('\n')
line_widths = [get_visual_width(line) for line in lines]
current_max = max(line_widths, default=0)
if current_max > max_width: max_width = current_max
else:
w = get_visual_width(str_val)
if w > max_width: max_width = w
final_width = max_width + 2
if col_idx == 3: # 責任單位
final_width = min(50, max(15, final_width))
elif col_idx == 2: # 狀態
final_width = min(20, max(10, final_width)) # 狀態不需太寬
else:
final_width = min(60, max(10, final_width))
ws.column_dimensions[col_letter].width = final_width
output = io.BytesIO()
wb.save(output)
output.seek(0)
filename = f"NCR_Analysis_{datetime.now().strftime('%Y%m%d')}.xlsx"
return Response(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
headers={"Content-Disposition": f"attachment;filename={filename}"}
)
except Exception as e:
return str(e), 500
if __name__ == '__main__':
    # Start the scheduler on a daemon thread so it does not keep the process
    # alive once the main (Flask) thread exits.
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()
    # Report the configured check time instead of a hard-coded "08:00", so the
    # startup message stays correct when MAIL_CONFIG['check_time'] is changed.
    # NOTE(review): assumes run_scheduler schedules from the same key — confirm.
    print(f"Scheduler started. Daily check at {MAIL_CONFIG.get('check_time', '08:00')}.")
    # Listen on all interfaces, port 5000, with the Flask debugger disabled.
    app.run(host='0.0.0.0', port=5000, debug=False)