import os
import sqlite3
import json
import time
import csv
import io
import ipaddress
import random
import re
import threading
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication  # added: used for attachments
from functools import wraps
from datetime import datetime, timedelta

import schedule
from flask import Flask, render_template, request, jsonify, Response
from flask_cors import CORS
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, Border, Side, PatternFill
from openpyxl.utils import get_column_letter

app = Flask(__name__)
CORS(app)

# SQLite database file, resolved relative to the working directory.
DB_FILE = 'ncr_data.db'

# ==========================================
# 1. System settings (edit manually here)
# ==========================================

# --- Email notification settings ---
# If the mail server is an internal relay that needs no authentication,
# leave user/password as empty strings "".
MAIL_CONFIG = {
    'server_ip': '10.192.130.115',      # mail server IP
    'port': 25,                         # port (usually 25, 465, 587)
    'sender': 'msdn@hlmt.com.tw',       # sender address
    'receivers': ['ra@hlmt.com.tw'],    # primary recipients (administrators)
    'cc': ['msdn@hlmt.com.tw'],         # CC recipients (supervisors / backup recipients)
    'user': '',                         # account (leave empty if none)
    'password': '',                     # password (leave empty if none)
    'check_time': '08:00',              # daily check time (24-hour clock)
}

# Backup check threshold: mail is sent once this many days pass without a backup.
BACKUP_THRESHOLD_DAYS = 30

# --- Permission settings (RBAC IP whitelist) ---
# Each entry is a single address or a CIDR network; '*' under VIEWER means "anyone".
PERMISSIONS_CONFIG = {
    'ADMIN': [
        '127.0.0.1',
        '::1',
        '10.192.130.100',
        '192.1.0.159',
        '10.192.130.48',
        '10.192.130.79',
        '10.192.130.105',
        '10.192.130.150',
        '10.192.130.153',
        '10.192.130.179',
    ],
    'EDITOR': [
        '10.176.5.0/24',  # ht
        '10.192.130.114',
        '192.1.0.64',
        '192.1.0.206',
    ],
    'VIEWER': [
        '*'
    ]
}
# ==========================================
# 2. Utility functions (permissions and scheduling)
# ==========================================
def check_ip_in_list(ip, ip_list):
    """Return True when *ip* matches any rule in *ip_list*.

    Each rule is either a single address ("10.0.0.1") or a CIDR network
    ("10.0.0.0/24"). Rules that cannot be parsed (for example the "*"
    wildcard) are skipped, and an unparsable *ip* never matches.
    """
    try:
        target_ip = ipaddress.ip_address(ip)
    except ValueError:
        return False

    # A dual-stack listener reports IPv4 clients as "::ffff:a.b.c.d". Compare
    # both the raw address and its embedded IPv4 form so such clients still
    # match IPv4 whitelist entries (and existing IPv6 rules keep working).
    candidates = [target_ip]
    mapped = getattr(target_ip, 'ipv4_mapped', None)
    if mapped is not None:
        candidates.append(mapped)

    for rule in ip_list:
        try:
            if '/' in rule:
                network = ipaddress.ip_network(rule, strict=False)
                if any(candidate in network for candidate in candidates):
                    return True
            else:
                address = ipaddress.ip_address(rule)
                if any(candidate == address for candidate in candidates):
                    return True
        except ValueError:
            # Malformed rule: ignore it rather than fail the whole lookup.
            continue
    return False


def get_permission_level(ip):
    """Map a client IP to 'ADMIN', 'EDITOR' or (for everything else) 'VIEWER'."""
    if check_ip_in_list(ip, PERMISSIONS_CONFIG['ADMIN']):
        return 'ADMIN'
    if check_ip_in_list(ip, PERMISSIONS_CONFIG['EDITOR']):
        return 'EDITOR'
    return 'VIEWER'


def require_role(min_role):
    """Decorator factory: reject the request with 403 unless the caller's IP has *min_role*.

    Only 'EDITOR' (satisfied by ADMIN or EDITOR) and 'ADMIN' are recognised;
    any other *min_role* denies every caller.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user_ip = request.remote_addr
            role = get_permission_level(user_ip)

            if min_role == 'EDITOR':
                allowed = role in ('ADMIN', 'EDITOR')
            elif min_role == 'ADMIN':
                allowed = role == 'ADMIN'
            else:
                allowed = False

            if not allowed:
                return jsonify({
                    'success': False,
                    'msg': f'PERMISSION_DENIED: Your IP ({user_ip}) is [{role}], but [{min_role}] is required.'
                }), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator
# ==========================================
# 3. Database management (WAL mode and migrations)
# ==========================================
def get_db():
    """Open a connection to DB_FILE with name-addressable rows and WAL journaling.

    The caller owns the returned connection and must close it.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL;")
    except sqlite3.Error:
        # Do not leak the handle when the pragma cannot be applied.
        conn.close()
        raise
    return conn


def _migration_column_type(col):
    """Return the SQL type used when adding migration column *col* to ``records``."""
    if col == 'version':
        return 'INTEGER DEFAULT 0'
    if col == 'defectThreshold':
        return 'REAL'
    if 'Qty' in col or 'Period' in col:
        return 'INTEGER'
    return 'TEXT'


def init_and_migrate_db():
    """Create the base tables if missing and add any columns introduced later.

    A failure to add one column is printed and does not stop the remaining
    migrations.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS records
                     (uniqueId TEXT PRIMARY KEY, fullFileName TEXT, category TEXT, site TEXT,
                      yearMonth TEXT, sequence TEXT, customName TEXT, status TEXT,
                      remarks TEXT, timestamp TEXT, responsibleUnit TEXT)''')
        c.execute('''CREATE TABLE IF NOT EXISTS access_logs
                     (id INTEGER PRIMARY KEY AUTOINCREMENT, user TEXT, action TEXT,
                      ip_address TEXT, timestamp TEXT)''')
        c.execute('''CREATE TABLE IF NOT EXISTS system_info
                     (key TEXT PRIMARY KEY, value TEXT)''')

        c.execute("PRAGMA table_info(records)")
        existing_columns = {info[1] for info in c.fetchall()}

        new_cols = [
            'recurring', 'customerCode', 'complaintId', 'model', 'penaltyDate',
            'gracePeriod', 'stopAlert', 'returnQty', 'shippedQty', 'defectThreshold',
            'closingDate', 'exceptionDesc', 'severity', 'handlingDesc', 'issueType',
            'source', 'issuer', 'respUnitType', 'respUnitName', 'respPerson',
            'rootCause', 'correctiveAction', 'trackingResult', 'fileDate', 'version',
            'filePath',  # associated file path
        ]
        for col in new_cols:
            if col in existing_columns:
                continue
            print(f"DB Migration: Adding column {col}")
            try:
                # Identifiers come from the fixed list above, never from user input.
                c.execute(f"ALTER TABLE records ADD COLUMN {col} {_migration_column_type(col)}")
            except Exception as e:
                print(f"Migration error for {col}: {e}")
        conn.commit()
    finally:
        conn.close()


def log_activity(user, action, ip_addr=None):
    """Append one row to ``access_logs``; best effort, never raises.

    When *ip_addr* is not given, the current request's remote address is used,
    falling back to 'SYSTEM' when there is none (including calls made outside
    a request, e.g. from the scheduler thread).
    """
    if not ip_addr:
        try:
            ip_addr = request.remote_addr or 'SYSTEM'
        except RuntimeError:
            # Flask raises RuntimeError when no request context is active.
            ip_addr = 'SYSTEM'

    conn = None
    try:
        conn = get_db()
        conn.execute(
            "INSERT INTO access_logs (user, action, ip_address, timestamp) VALUES (?, ?, ?, ?)",
            (user, action, ip_addr, datetime.now().isoformat()))
        conn.commit()
    except Exception as e:
        # Audit logging must not break the caller, but the failure should be visible.
        print(f"Failed to write access log ({action}): {e}")
    finally:
        if conn is not None:
            conn.close()


init_and_migrate_db()
# ==========================================
# 4. Automated checks and mail notification (background tasks)
# ==========================================

# Upper bound for every SMTP socket operation so a dead relay cannot hang the scheduler thread.
_SMTP_TIMEOUT_SECONDS = 30


def _deliver_mail(cfg, recipients, msg):
    """Send *msg* to *recipients* through the relay in *cfg*; log in only when credentials are set."""
    # The context manager issues QUIT and closes the socket even when sending fails.
    with smtplib.SMTP(cfg['server_ip'], cfg['port'], timeout=_SMTP_TIMEOUT_SECONDS) as server:
        if cfg['user'] and cfg['password']:
            server.login(cfg['user'], cfg['password'])
        server.sendmail(cfg['sender'], recipients, msg.as_string())


def send_alert_email(days_diff, last_backup_time):
    """Send the backup-overdue alert mail (to administrators + CC).

    Does nothing when no mail server is configured. Delivery failures are
    printed, not raised.
    """
    cfg = MAIL_CONFIG
    if not cfg['server_ip']:
        return

    subject = f"[Alert] NCR System Backup Overdue ({datetime.now().strftime('%Y-%m-%d')})"
    body = (
        "\n"
        "Dear Administrator,\n"
        "\n"
        f"The NCR System database has not been backed up for {days_diff} days.\n"
        "\n"
        f"- Last Backup Time: {last_backup_time}\n"
        f"- Threshold: {BACKUP_THRESHOLD_DAYS} days\n"
        "\n"
        "Please login and perform a backup immediately.\n"
        "(A separate backup file has been sent to the CC list.)\n"
        "\n"
        "NCR Online System Auto-Sender\n"
    )

    msg = MIMEMultipart()
    msg['From'] = cfg['sender']
    msg['To'] = ", ".join(cfg['receivers'])
    if cfg['cc']:
        # An empty Cc header is not valid, so only add it when there are addresses.
        msg['Cc'] = ", ".join(cfg['cc'])
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    all_recipients = cfg['receivers'] + cfg['cc']

    try:
        _deliver_mail(cfg, all_recipients, msg)
        print(f"Alert email sent to {all_recipients}")
        log_activity("SYSTEM", f"AUTO_ALERT_{days_diff}DAYS", "127.0.0.1")
    except Exception as e:
        print(f"Failed to send alert email: {str(e)}")


def send_backup_email_to_cc():
    """Export the ``records`` table as JSON and mail it as an attachment to the CC addresses only.

    Skipped when no mail server or no CC address is configured. Failures at
    any stage are printed, not raised.
    """
    cfg = MAIL_CONFIG
    if not cfg['server_ip'] or not cfg['cc']:
        print("Mail config or CC list empty, skipping backup email.")
        return

    # 1. Prepare the data (dump DB to JSON)
    try:
        conn = get_db()
        try:
            data = [dict(row) for row in conn.execute("SELECT * FROM records").fetchall()]
        finally:
            conn.close()
        json_str = json.dumps(data, ensure_ascii=False, indent=2)
        filename = f"NCR_AutoBackup_{datetime.now().strftime('%Y%m%d')}.json"
    except Exception as e:
        print(f"Failed to generate backup JSON: {str(e)}")
        return

    # 2. Prepare the message
    subject = f"[Auto Backup] NCR Database Dump ({datetime.now().strftime('%Y-%m-%d')})"
    body = (
        "\n"
        "Dear Manager,\n"
        "\n"
        "Attached is the automated JSON backup of the NCR database.\n"
        "This backup was triggered because the manual backup is overdue.\n"
        "\n"
        f"Record Count: {len(data)}\n"
        "\n"
        "NCR Online System Auto-Sender\n"
    )

    msg = MIMEMultipart()
    msg['From'] = cfg['sender']
    msg['To'] = ", ".join(cfg['cc'])
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    # 3. Build the attachment
    try:
        attachment = MIMEApplication(json_str.encode('utf-8'), Name=filename)
        attachment['Content-Disposition'] = f'attachment; filename="{filename}"'
        msg.attach(attachment)
    except Exception as e:
        print(f"Failed to attach file: {str(e)}")
        return

    # 4. Send
    try:
        _deliver_mail(cfg, cfg['cc'], msg)
        print(f"Backup attachment sent to CC list: {cfg['cc']}")
        log_activity("SYSTEM", "AUTO_BACKUP_SENT_TO_CC", "127.0.0.1")
    except Exception as e:
        print(f"Failed to send backup email: {str(e)}")


def check_backup_status_job():
    """Scheduled job: check the time of the last backup and send alerts when it is overdue.

    A missing or unparsable ``last_backup`` value also triggers the alerts.
    Every error is printed so the scheduler loop keeps running.
    """
    print(f"Running daily backup check at {datetime.now()}")
    try:
        # Read the marker and release the connection before any mail is sent.
        conn = get_db()
        try:
            row = conn.execute("SELECT value FROM system_info WHERE key='last_backup'").fetchone()
        finally:
            conn.close()

        should_alert = False
        days_diff = 0
        last_backup_str = "Never"

        if row and row['value']:
            last_backup_str = row['value']
            try:
                last_dt = datetime.strptime(last_backup_str, '%Y/%m/%d %H:%M')
                days_diff = (datetime.now() - last_dt).days
                if days_diff >= BACKUP_THRESHOLD_DAYS:
                    should_alert = True
            except ValueError:
                should_alert = True
                days_diff = "Unknown"
        else:
            should_alert = True
            days_diff = "Infinity"

        if should_alert:
            print(f"Backup overdue ({days_diff} days). Triggering alerts...")
            send_alert_email(days_diff, last_backup_str)
            send_backup_email_to_cc()
        else:
            print(f"Backup status OK. Last backup: {days_diff} days ago.")
    except Exception as e:
        print(f"Error in backup check job: {e}")
def run_scheduler():
    """Scheduler loop (meant to run in its own thread): run the backup check once a day.

    Never returns. An exception escaping a job is printed so the loop survives.
    """
    check_time = MAIL_CONFIG.get('check_time', '08:00')
    print(f"Scheduler scheduled for {check_time} daily.")
    schedule.every().day.at(check_time).do(check_backup_status_job)
    while True:
        try:
            schedule.run_pending()
        except Exception as e:
            print(f"Scheduler error: {e}")
        time.sleep(60)


# ==========================================
# 5. API routes
# ==========================================
@app.route('/')
def root():
    """Refuse the bare root URL with a 403."""
    # NOTE(review): the body text appears to have lost its surrounding HTML
    # markup in this copy of the file; only the visible text is reproduced.
    return "\n\n403 Forbidden\n\n", 403


@app.route('/login.html')
def login_page():
    """Serve the login page template."""
    return render_template('login.html')


@app.route('/detail.html')
def detail_page():
    """Serve the record detail page template."""
    return render_template('detail.html')


@app.route('/editor_manual.html')
def editor_manual_page():
    """Serve the editor manual page template."""
    return render_template('editor_manual.html')


@app.route('/api/my_role')
def my_role():
    """Report the caller's role (ADMIN, EDITOR, VIEWER or GUEST) and IP."""
    user_ip = request.remote_addr
    for role in ('ADMIN', 'EDITOR'):
        if check_ip_in_list(user_ip, PERMISSIONS_CONFIG[role]):
            return jsonify({'role': role, 'ip': user_ip})

    viewer_config = PERMISSIONS_CONFIG.get('VIEWER', [])
    # '*' is a wildcard that is only understood here, not by check_ip_in_list.
    if '*' in viewer_config or check_ip_in_list(user_ip, viewer_config):
        return jsonify({'role': 'VIEWER', 'ip': user_ip})
    return jsonify({'role': 'GUEST', 'ip': user_ip})


def _decode_responsible_unit(raw):
    """Decode the JSON text stored in ``responsibleUnit``; undecodable values become []."""
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return []


@app.route('/api/history')
def get_history():
    """Return every record, newest first, with ``responsibleUnit`` decoded from JSON."""
    conn = get_db()
    try:
        rows = conn.execute("SELECT * FROM records ORDER BY timestamp DESC").fetchall()
    finally:
        conn.close()

    data = [dict(row) for row in rows]
    for item in data:
        item['responsibleUnit'] = _decode_responsible_unit(item['responsibleUnit'])
    return jsonify(data)


# The URL must carry the ID as a path variable; without the converter the
# view could never receive its ``unique_id`` argument.
@app.route('/api/record/<unique_id>')
def get_single_record(unique_id):
    """Return the record whose ``uniqueId`` equals *unique_id*, or a 404 JSON error."""
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM records WHERE uniqueId = ?", (unique_id,)).fetchone()
    finally:
        conn.close()

    if row is None:
        return jsonify({'error': 'Not found'}), 404
    item = dict(row)
    item['responsibleUnit'] = _decode_responsible_unit(item['responsibleUnit'])
    return jsonify(item)


def _posted_user():
    """Return the ``user`` field of the JSON body, or 'Unknown' when absent or not JSON."""
    payload = request.get_json(silent=True)
    if isinstance(payload, dict):
        return payload.get('user', 'Unknown')
    return 'Unknown'


@app.route('/api/log_login', methods=['POST'])
def log_login_endpoint():
    """Record a LOGIN entry in the access log for the posted user name."""
    log_activity(_posted_user(), "LOGIN")
    return jsonify({'success': True})


@app.route('/api/logout', methods=['POST'])
def logout():
    """Record a LOGOUT entry in the access log for the posted user name."""
    log_activity(_posted_user(), "LOGOUT")
    return jsonify({'success': True})
@app.route('/api/save', methods=['POST'])
@require_role('ADMIN')
def save_record():
    """Create a new record with the next free sequence number for its category/site/month.

    Expects a JSON body with ``category``, ``site``, ``yearMonth`` and
    ``customName`` (optionally ``recurring``, ``customerCode``, ``model``,
    ``fileDate``). Returns the generated ID and file name, 400 when a required
    field is missing, 409 when an ID could not be claimed after several
    attempts, or 500 on any other error.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    missing = [key for key in ('category', 'site', 'yearMonth', 'customName') if key not in data]
    if missing:
        return jsonify({'success': False, 'msg': f"Missing field(s): {', '.join(missing)}"}), 400

    cat = data['category']
    site = data['site']
    ym = str(data['yearMonth']).replace('-', '')
    prefix = f"{cat}-{site}{ym}"

    base_data = {
        'customName': data['customName'],
        'status': "Open",
        'remarks': "",
        'responsibleUnit': json.dumps([]),
        'recurring': data.get('recurring', 'N'),
        'customerCode': data.get('customerCode', ''),
        'complaintId': "",
        'model': data.get('model', ''),
        'fileDate': data.get('fileDate', ''),
        'penaltyDate': "",
        'gracePeriod': 0,
        'stopAlert': 'N',
        'returnQty': 0,
        'shippedQty': 0,
        'defectThreshold': 0.0,
        'closingDate': "",
        'timestamp': datetime.now().isoformat(),
        'exceptionDesc': "",
        'severity': "",
        'handlingDesc': "",
        'issueType': "",
        'source': "",
        'issuer': "",
        'respUnitType': "",
        'respUnitName': "",
        'respPerson': "",
        'rootCause': "",
        'correctiveAction': "",
        'trackingResult': "",
        'version': 0,
        'filePath': "",
    }

    max_retries = 5
    for attempt in range(max_retries):
        conn = get_db()
        try:
            cur = conn.cursor()
            cur.execute("SELECT uniqueId FROM records WHERE uniqueId LIKE ?", (prefix + '%',))

            # Take the numeric maximum of the existing sequences. Sorting the IDs
            # as text would rank "999" above "1000" and would let a single
            # non-numeric suffix reset the counter to zero.
            current_max = 0
            for row in cur.fetchall():
                existing_id = row[0]
                if not existing_id.startswith(prefix):
                    continue  # LIKE is looser than a strict prefix match
                try:
                    current_max = max(current_max, int(existing_id[len(prefix):].split('-')[0]))
                except ValueError:
                    continue

            new_seq_str = str(current_max + 1).zfill(3)
            unique_id = f"{prefix}{new_seq_str}"

            parts = [unique_id, base_data['model'], base_data['customName'],
                     base_data['customerCode'], base_data['fileDate']]
            full_filename = '-'.join(filter(None, parts))

            record = {
                'uniqueId': unique_id,
                'fullFileName': full_filename,
                'category': cat,
                'site': site,
                'yearMonth': ym,
                'sequence': new_seq_str,
                **base_data,
            }
            # Column names are the fixed keys above; every value is bound.
            columns = ', '.join(record)
            placeholders = ', '.join('?' * len(record))
            cur.execute(f"INSERT INTO records ({columns}) VALUES ({placeholders})",
                        tuple(record.values()))
            conn.commit()
            return jsonify({'success': True, 'savedId': unique_id, 'savedFileName': full_filename})
        except sqlite3.IntegrityError:
            # A concurrent request claimed the same ID; retry below.
            pass
        except Exception as e:
            return jsonify({'success': False, 'msg': str(e)}), 500
        finally:
            conn.close()

        if attempt < max_retries - 1:
            # Random back-off so colliding requests do not retry in lock-step.
            time.sleep(random.uniform(0.1, 0.4))

    return jsonify({'success': False, 'msg': 'SYSTEM_BUSY_ID_CONFLICT'}), 409
@app.route('/api/admin_update', methods=['POST'])
@require_role('ADMIN')
def admin_update_record():
    """Rename a record and/or change its file-name metadata (admin only).

    Uses optimistic locking: the update applies only when the posted
    ``version`` matches the stored one. Returns 400 when an ID or the custom
    name is missing, 409 when the new ID is taken or the version is stale, and
    500 on other errors.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text_field(key):
        # JSON null or a non-string value must not crash on .strip().
        value = data.get(key)
        return '' if value is None else str(value).strip()

    old_unique_id = data.get('originalUniqueId')
    new_unique_id = data.get('newUniqueId')
    current_version = data.get('version')

    new_model = text_field('model')
    new_custom_name = text_field('customName')
    new_cust_code = text_field('customerCode')
    new_file_date = text_field('fileDate')

    if not old_unique_id or not new_unique_id or not new_custom_name:
        return jsonify({'success': False, 'msg': 'Missing ID or Custom Name'}), 400

    conn = get_db()
    try:
        cur = conn.cursor()
        if old_unique_id != new_unique_id:
            cur.execute("SELECT 1 FROM records WHERE uniqueId = ?", (new_unique_id,))
            if cur.fetchone():
                return jsonify({'success': False, 'msg': 'NEW_ID_ALREADY_EXISTS'}), 409

        parts = [new_unique_id, new_model, new_custom_name, new_cust_code, new_file_date]
        new_full_filename = '-'.join(filter(None, parts))

        cur.execute('''UPDATE records SET
                           uniqueId=?, model=?, customName=?, customerCode=?, fileDate=?,
                           fullFileName=?, version = version + 1
                       WHERE uniqueId=? AND version=?''',
                    (new_unique_id, new_model, new_custom_name, new_cust_code, new_file_date,
                     new_full_filename, old_unique_id, current_version))
        conn.commit()

        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409

        log_activity(f"Admin Update: {old_unique_id} -> {new_unique_id}",
                     "ADMIN_METADATA_UPDATE", request.remote_addr)
        return jsonify({'success': True, 'newFileName': new_full_filename})
    except sqlite3.IntegrityError:
        # Another request took the new ID between the existence check and the UPDATE.
        return jsonify({'success': False, 'msg': 'NEW_ID_ALREADY_EXISTS'}), 409
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
# Editable columns of /api/update that fall back to a default when the client omits them.
_UPDATE_RECORD_DEFAULTS = (
    ('recurring', 'N'),
    ('complaintId', ''),
    ('penaltyDate', ''),
    ('gracePeriod', 0),
    ('stopAlert', 'N'),
    ('returnQty', 0),
    ('shippedQty', 0),
    ('defectThreshold', 0.0),
    ('closingDate', ''),
    ('exceptionDesc', ''),
    ('severity', ''),
    ('handlingDesc', ''),
    ('issueType', ''),
    ('source', ''),
    ('issuer', ''),
    ('respUnitType', ''),
    ('respUnitName', ''),
    ('respPerson', ''),
    ('rootCause', ''),
    ('correctiveAction', ''),
    ('trackingResult', ''),
)


@app.route('/api/update', methods=['POST'])
def update_record():
    """Update the editable fields of one record (ADMIN or EDITOR IPs only).

    Uses optimistic locking on ``version``. Returns 403 for other callers, 400
    when ``uniqueId`` is missing, 409 ('DATA_CONFLICT') when no row matched the
    ID and version, and 500 on other errors.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    role = get_permission_level(request.remote_addr)
    if role not in ['ADMIN', 'EDITOR']:
        return jsonify({'success': False, 'msg': 'PERMISSION_DENIED'}), 403

    if 'uniqueId' not in data:
        return jsonify({'success': False, 'msg': 'Missing uniqueId'}), 400

    # A list is stored as JSON text; any other value is passed through unchanged.
    units = data.get('responsibleUnit', [])
    units_json = json.dumps(units) if isinstance(units, list) else units

    values = {
        'status': data.get('status'),
        'remarks': data.get('remarks'),
        'responsibleUnit': units_json,
    }
    for column, default in _UPDATE_RECORD_DEFAULTS:
        values[column] = data.get(column, default)

    # Column names are fixed literals from this module; every value is bound.
    assignments = ', '.join(f"{column}=?" for column in values)
    sql = f"UPDATE records SET {assignments}, version = version + 1 WHERE uniqueId=? AND version=?"

    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute(sql, (*values.values(), data['uniqueId'], data.get('version')))
        conn.commit()
        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
@app.route('/api/update_filepath', methods=['POST'])
def update_filepath():
    """Set the ``filePath`` of one record (ADMIN or EDITOR IPs only).

    Uses optimistic locking on ``version``. Returns 403 for other callers, 400
    when ``uniqueId`` is missing, 409 ('DATA_CONFLICT') when no row matched,
    and 500 on other errors.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    role = get_permission_level(request.remote_addr)
    if role not in ['ADMIN', 'EDITOR']:
        return jsonify({'success': False, 'msg': 'PERMISSION_DENIED'}), 403

    unique_id = data.get('uniqueId')
    if not unique_id:
        return jsonify({'success': False, 'msg': 'Missing uniqueId'}), 400

    # JSON null or a non-string value must not crash on .strip().
    raw_path = data.get('filePath')
    file_path = '' if raw_path is None else str(raw_path).strip()
    current_version = data.get('version')

    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute('''UPDATE records SET filePath=?, version = version + 1
                       WHERE uniqueId=? AND version=?''',
                    (file_path, unique_id, current_version))
        conn.commit()
        if cur.rowcount == 0:
            return jsonify({'success': False, 'msg': 'DATA_CONFLICT'}), 409
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()
@app.route('/api/reset', methods=['POST'])
@require_role('ADMIN')
def reset_db():
    """Delete every row of ``records`` (admin only)."""
    conn = get_db()
    try:
        conn.execute("DELETE FROM records")
        conn.commit()
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        conn.close()


@app.route('/api/backup')
def backup_db():
    """Download all records as a JSON attachment and stamp ``last_backup`` with the current time."""
    now = datetime.now()
    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM records")
        data = [dict(row) for row in cur.fetchall()]
        # Same format that check_backup_status_job parses.
        cur.execute("INSERT OR REPLACE INTO system_info (key, value) VALUES ('last_backup', ?)",
                    (now.strftime('%Y/%m/%d %H:%M'),))
        conn.commit()
    finally:
        conn.close()

    return Response(
        json.dumps(data, ensure_ascii=False, indent=2),
        mimetype='application/json',
        headers={'Content-Disposition': f'attachment;filename=NCR_Backup_{now.strftime("%Y%m%d")}.json'})


@app.route('/api/last_backup')
def get_last_backup():
    """Return the stored ``last_backup`` time, or "Never" when none is recorded."""
    conn = get_db()
    try:
        row = conn.execute("SELECT value FROM system_info WHERE key='last_backup'").fetchone()
    finally:
        conn.close()
    return jsonify({'time': row['value'] if row else "Never"})


# Keys every restored row must carry (a missing one aborts the restore).
_RESTORE_REQUIRED = (
    'uniqueId', 'fullFileName', 'category', 'site', 'yearMonth', 'sequence',
    'customName', 'status', 'remarks', 'timestamp', 'responsibleUnit',
)

# Keys that may be absent from older backups, with the value used in that case.
_RESTORE_OPTIONAL = (
    ('recurring', 'N'), ('customerCode', ''), ('complaintId', ''), ('model', ''),
    ('penaltyDate', ''), ('gracePeriod', 0), ('stopAlert', 'N'), ('returnQty', 0),
    ('shippedQty', 0), ('defectThreshold', 0.0), ('closingDate', ''),
    ('exceptionDesc', ''), ('severity', ''), ('handlingDesc', ''), ('issueType', ''),
    ('source', ''), ('issuer', ''), ('respUnitType', ''), ('respUnitName', ''),
    ('respPerson', ''), ('rootCause', ''), ('correctiveAction', ''),
    ('trackingResult', ''), ('version', 0), ('filePath', ''),
)


@app.route('/api/restore', methods=['POST'])
@require_role('ADMIN')
def restore_db():
    """Import records from an uploaded JSON backup, skipping IDs that already exist (admin only).

    The upload must be a JSON array of record objects under the form field
    ``file``. All inserts are committed together; on any error nothing from
    this upload is kept. Responds with the number of inserted ('s') and
    skipped ('k') rows.
    """
    if 'file' not in request.files:
        return jsonify({'success': False, 'msg': 'No file'}), 400

    conn = None
    try:
        data = json.load(request.files['file'])
        if not isinstance(data, list):
            return jsonify({'success': False, 'msg': 'Backup file must contain a JSON array'}), 400

        conn = get_db()
        cur = conn.cursor()
        success = 0
        skip = 0
        date_pattern = re.compile(r'^\d{8}$')

        for row in data:
            if cur.execute("SELECT 1 FROM records WHERE uniqueId = ?", (row['uniqueId'],)).fetchone():
                skip += 1
                continue

            # Backups without fileDate: recover it from an 8-digit tail of the file name.
            f_date = row.get('fileDate', '')
            if not f_date and row.get('fullFileName'):
                last_part = row['fullFileName'].split('-')[-1].strip()
                if date_pattern.match(last_part):
                    f_date = last_part

            record = {key: row[key] for key in _RESTORE_REQUIRED}
            if isinstance(record['responsibleUnit'], list):
                # The column stores JSON text; accept an already-decoded list too.
                record['responsibleUnit'] = json.dumps(record['responsibleUnit'])
            for key, default in _RESTORE_OPTIONAL:
                record[key] = row.get(key, default)
            record['fileDate'] = f_date

            # Column names are the fixed keys above; every value is bound.
            columns = ', '.join(record)
            placeholders = ', '.join('?' * len(record))
            cur.execute(f"INSERT INTO records ({columns}) VALUES ({placeholders})",
                        tuple(record.values()))
            success += 1

        conn.commit()
        return jsonify({'success': True, 'msg': 'RESTORE_OK', 's': success, 'k': skip})
    except Exception as e:
        return jsonify({'success': False, 'msg': str(e)}), 500
    finally:
        # Closing without commit discards a partially applied restore.
        if conn is not None:
            conn.close()
@app.route('/api/export_logs')
def export_logs():
    """Download the whole access log, newest first, as a UTF-8 CSV attachment."""
    conn = get_db()
    try:
        rows = conn.execute("SELECT * FROM access_logs ORDER BY timestamp DESC").fetchall()
    finally:
        conn.close()

    si = io.StringIO()
    si.write('\ufeff')  # byte-order mark written ahead of the CSV content
    writer = csv.writer(si)
    writer.writerow(['ID', 'User', 'Action', 'IP Address', 'Time'])
    for row in rows:
        writer.writerow([row['id'], row['user'], row['action'], row['ip_address'], row['timestamp']])

    return Response(si.getvalue(), mimetype="text/csv",
                    headers={"Content-Disposition": "attachment;filename=Logs.csv"})
# Per-language labels for the CSV/Excel exports. ``cols`` must follow the order
# in which the export writes each data row: fullFileName, status, responsible
# unit, model, customerCode, complaintId, defect rate, penaltyDate, recurring,
# closingDate, issueType, source, severity, issuer, exceptionDesc, handlingDesc,
# respUnitType, respUnitName, respPerson, rootCause, correctiveAction,
# trackingResult, timestamp, remarks.
CSV_HEADERS_MAP = {
    'en': {
        'stats_title': 'Statistics Summary',
        'stats_total': 'Total Records',
        'stats_status': '--- Status Statistics ---',
        'stats_unit': '--- Responsible Unit (Dept) ---',
        'stats_supplier': '--- Supplier ---',
        'stats_model': '--- Model ---',
        'stats_cust': '--- Customer Code ---',
        # 'Created Time' and 'Remarks' belong at the end, matching the data
        # rows and the other languages; earlier they sat after 'Closing Date'
        # and mislabelled every following column.
        'cols': ['Filename', 'Status', 'Resp. Unit', 'Model', 'Cust. Code', 'Complaint ID',
                 'Defect Rate(%)', 'Penalty Date', 'Recurring', 'Closing Date',
                 'Issue Type', 'Source', 'MA/MI', 'Issuer',
                 'Exception Description', 'Handling Description',
                 'Resp. Unit Type', 'Resp. Unit Name', 'Resp. Person',
                 'Root Cause Analysis', 'Corrective Action', 'Response / Tracking',
                 'Created Time', 'Remarks']
    },
    'tw': {
        'stats_title': '統計摘要',
        'stats_total': '總筆數',
        'stats_status': '--- 狀態統計 (開立/結案/作廢) ---',
        'stats_unit': '--- 責任單位 (部門) ---',
        'stats_supplier': '--- 供應商 ---',
        'stats_model': '--- 型號 ---',
        'stats_cust': '--- 客戶代號 ---',
        'cols': ['完整檔名', '狀態', '責任單位', '型號', '客戶代號', '客訴單號',
                 '不良率(%)', '扣款起算日', '再發', '結案日期',
                 '異常類型', '來源', '嚴重度(MA/MI)', '開單人',
                 '異常說明', '處置說明',
                 '責任單位類別', '責任單位名稱', '負責人',
                 '原因分析', '改善對策', '回覆日期/追蹤結果',
                 '建立時間', '備註']
    },
    'cn': {
        'stats_title': '统计摘要',
        'stats_total': '总笔数',
        'stats_status': '--- 状态统计 (开立/结案/作废) ---',
        'stats_unit': '--- 责任单位 (部门) ---',
        'stats_supplier': '--- 供应商 ---',
        'stats_model': '--- 型号 ---',
        'stats_cust': '--- 客户代号 ---',
        'cols': ['完整文件名', '状态', '责任单位', '型号', '客户代号', '客诉单号',
                 '不良率(%)', '扣款起算日', '再发', '结案日期',
                 '异常类型', '来源', '严重度(MA/MI)', '开单人',
                 '异常说明', '处置说明',
                 '责任单位类别', '责任单位名称', '负责人',
                 '原因分析', '改善对策', '回复日期/追踪结果',
                 '建立时间', '备注']
    },
    'vi': {
        'stats_title': 'Tóm tắt thống kê',
        'stats_total': 'Tổng số',
        'stats_status': '--- Trạng thái (Open/Closed/Void) ---',
        'stats_unit': '--- Đơn vị chịu trách nhiệm ---',
        'stats_supplier': '--- Nhà cung cấp ---',
        'stats_model': '--- Mô hình ---',
        'stats_cust': '--- Khách hàng ---',
        'cols': ['Tên tệp', 'Trạng thái', 'Đơn vị', 'Mô hình', 'Mã KH', 'Mã khiếu nại',
                 'Tỷ lệ lỗi(%)', 'Ngày phạt', 'Lặp lại', 'Ngày đóng',
                 'Loại', 'Nguồn', 'Độ nghiêm trọng', 'Người phát hành',
                 'Mô tả ngoại lệ', 'Xử lý sản phẩm',
                 'Loại đơn vị', 'Tên đơn vị', 'Người phụ trách',
                 'Nguyên nhân gốc rễ', 'Đối sách', 'Ngày phản hồi / Kết quả',
                 'Thời gian tạo', 'Ghi chú']
    }
}

# Per-language display text for stored values, keyed as ``status_<value>`` and
# ``unit_<code>``; ``prefix_sup`` is prepended to supplier names.
VALUE_TRANS_MAP = {
    'en': {
        'status_Open': 'Open', 'status_Closed': 'Closed', 'status_Void': 'Void',
        'unit_MGMT': 'MGMT', 'unit_ENG': 'ENG', 'unit_PMC': 'PMC', 'unit_QA': 'QA',
        'unit_PC': 'PC', 'unit_MFG': 'MFG', 'unit_PUR': 'PUR', 'unit_RD': 'RD',
        'unit_SALES': 'SALES', 'unit_RA': 'Regulatory (RA)',
        'prefix_sup': 'Supplier: '
    },
    'tw': {
        'status_Open': '開立', 'status_Closed': '結案', 'status_Void': '作廢',
        'unit_MGMT': '管理', 'unit_ENG': '工程', 'unit_PMC': '物管', 'unit_QA': '品保',
        'unit_PC': '採管', 'unit_MFG': '製造', 'unit_PUR': '採購', 'unit_RD': '研發',
        'unit_SALES': '業務', 'unit_RA': '法規',
        'prefix_sup': '供應商: '
    },
    'cn': {
        'status_Open': '开立', 'status_Closed': '结案', 'status_Void': '作废',
        'unit_MGMT': '管理', 'unit_ENG': '工程', 'unit_PMC': '物管', 'unit_QA': '品保',
        'unit_PC': '采管', 'unit_MFG': '制造', 'unit_PUR': '采购', 'unit_RD': '研发',
        'unit_SALES': '业务', 'unit_RA': '法规',
        'prefix_sup': '供应商: '
    },
    'vi': {
        'status_Open': 'Mở', 'status_Closed': 'Đóng', 'status_Void': 'Vô hiệu',
        'unit_MGMT': 'Quản lý', 'unit_ENG': 'Kỹ thuật', 'unit_PMC': 'QL Vật liệu', 'unit_QA': 'QA',
        'unit_PC': 'KS Mua', 'unit_MFG': 'Sản xuất', 'unit_PUR': 'Mua hàng', 'unit_RD': 'R&D',
        'unit_SALES': 'Kinh doanh', 'unit_RA': 'Pháp quy',
        'prefix_sup': 'NCC: '
    }
}
def _build_export_query(args):
    """Translate the export filter arguments in *args* into (sql, params).

    Recognised keys: ``categories``, ``sites``, ``status`` (comma-separated
    lists), ``start``/``end`` (compared with the first 10 characters of
    ``timestamp``), ``search``, ``model`` and ``cust`` (substring matches).
    """
    query = "SELECT * FROM records WHERE 1=1"
    params = []

    # Column names are fixed literals; only the bound values come from the request.
    for arg_name, column in (('categories', 'category'), ('sites', 'site'), ('status', 'status')):
        raw = args.get(arg_name)
        if raw:
            values = raw.split(',')
            query += f" AND {column} IN ({','.join(['?'] * len(values))})"
            params.extend(values)

    start_date = args.get('start')
    if start_date:
        query += " AND substr(timestamp, 1, 10) >= ?"
        params.append(start_date)

    end_date = args.get('end')
    if end_date:
        query += " AND substr(timestamp, 1, 10) <= ?"
        params.append(end_date)

    search = args.get('search')
    if search:
        search_pattern = f"%{search}%"
        query += " AND (fullFileName LIKE ? OR customName LIKE ? OR remarks LIKE ? OR complaintId LIKE ?)"
        params.extend([search_pattern] * 4)

    model = args.get('model')
    if model:
        query += " AND model LIKE ?"
        params.append(f"%{model}%")

    cust = args.get('cust')
    if cust:
        query += " AND customerCode LIKE ?"
        params.append(f"%{cust}%")

    query += " ORDER BY timestamp DESC"
    return query, params


def _unit_list(raw):
    """Decode stored ``responsibleUnit`` JSON; anything that is not a JSON list counts as empty."""
    try:
        units = json.loads(raw)
    except (TypeError, ValueError):
        return []
    return units if isinstance(units, list) else []


def _unit_cell(raw):
    """Render ``responsibleUnit`` for a CSV cell, falling back to the raw text when it cannot be joined."""
    try:
        units = json.loads(raw)
        return ", ".join(units) if isinstance(units, list) else str(units)
    except (TypeError, ValueError):
        return str(raw)


def _defect_rate_percent(return_qty, shipped_qty):
    """Return returned/shipped as a percentage; 0.0 when shipped is not positive or a value is not numeric."""
    try:
        returned = float(return_qty or 0)
        shipped = float(shipped_qty or 0)
    except (TypeError, ValueError):
        return 0.0
    return (returned / shipped * 100) if shipped > 0 else 0.0


def _bump(counter, key):
    """Increment *key* in the plain-dict tally *counter*."""
    counter[key] = counter.get(key, 0) + 1


@app.route('/api/export_csv')
def export_csv():
    """Download the filtered record list as CSV, preceded by a statistics summary.

    The ``lang`` argument selects the header language (default 'en'); the
    remaining arguments are the filters understood by ``_build_export_query``.
    On failure the error text is returned with status 500.
    """
    conn = None
    try:
        conn = get_db()
        lang = request.args.get('lang', 'en')
        headers_map = CSV_HEADERS_MAP.get(lang, CSV_HEADERS_MAP['en'])

        query, params = _build_export_query(request.args)
        rows = conn.execute(query, params).fetchall()

        total_count = len(rows)
        stats_status = {}
        stats_dept = {}
        stats_sup = {}
        stats_model = {}
        stats_cust = {}

        for row in rows:
            _bump(stats_status, row['status'] if row['status'] else 'Open')

            for unit in _unit_list(row['responsibleUnit']):
                # Tolerate non-string entries instead of failing the whole export.
                label = unit if isinstance(unit, str) else str(unit)
                if label.startswith('SUP:'):
                    _bump(stats_sup, label.split(':', 1)[1])
                else:
                    _bump(stats_dept, label)

            if row['model']:
                _bump(stats_model, row['model'])
            if row['customerCode']:
                _bump(stats_cust, row['customerCode'])

        si = io.StringIO()
        si.write('\ufeff')  # byte-order mark written ahead of the CSV content
        writer = csv.writer(si)

        writer.writerow([headers_map['stats_title']])
        writer.writerow([headers_map['stats_total'], total_count])
        writer.writerow([])

        writer.writerow([headers_map['stats_status']])
        for k, v in stats_status.items():
            writer.writerow(["", k, v])

        # Remaining sections are listed by descending count and omitted when empty.
        for title_key, tally in (('stats_unit', stats_dept), ('stats_supplier', stats_sup),
                                 ('stats_model', stats_model), ('stats_cust', stats_cust)):
            if tally:
                writer.writerow([headers_map[title_key]])
                for k, v in sorted(tally.items(), key=lambda item: item[1], reverse=True):
                    writer.writerow(["", k, v])

        writer.writerow([])
        writer.writerow(headers_map['cols'])

        for row in rows:
            rate = _defect_rate_percent(row['returnQty'], row['shippedQty'])
            writer.writerow([
                row['fullFileName'], row['status'], _unit_cell(row['responsibleUnit']),
                row['model'], row['customerCode'], row['complaintId'],
                f"{rate:.2f}%", row['penaltyDate'], row['recurring'], row['closingDate'],
                row['issueType'], row['source'], row['severity'], row['issuer'],
                row['exceptionDesc'], row['handlingDesc'],
                row['respUnitType'], row['respUnitName'], row['respPerson'],
                row['rootCause'], row['correctiveAction'], row['trackingResult'],
                row['timestamp'], row['remarks'],
            ])

        filename = f"NCR_List_{datetime.now().strftime('%Y%m%d')}.csv"
        return Response(si.getvalue(), mimetype="text/csv",
                        headers={"Content-Disposition": f"attachment;filename={filename}"})
    except Exception as e:
        return str(e), 500
    finally:
        if conn is not None:
            conn.close()
# Zero-based positions (within one exported row) of the long free-text columns
# that are rendered with wrapped text and a fixed column width.
_EXCEL_WRAP_INDICES = frozenset({14, 15, 19, 20, 21, 23})


def _parse_responsible_units(raw_value):
    """Decode the JSON-encoded ``responsibleUnit`` column into a list of strings.

    Returns an empty list when the value is missing, is not valid JSON, or
    does not decode to a list. ``None`` entries are dropped and any other
    non-string entry is converted with ``str`` so callers can safely use
    string methods on every element.
    """
    try:
        units = json.loads(raw_value)
    except (TypeError, ValueError):
        # TypeError: value is None / not str-like; ValueError: malformed JSON.
        return []
    if not isinstance(units, list):
        return []
    return [str(unit) for unit in units if unit is not None]


def _build_export_query(args):
    """Build the parameterized ``records`` query for the filters in *args*.

    *args* only needs a ``get(name)`` method (e.g. ``request.args``).
    Returns a ``(sql, params)`` tuple; every user-supplied value is passed as
    a bound parameter, never interpolated into the SQL text.
    """
    sql = "SELECT * FROM records WHERE 1=1"
    params = []

    # Comma-separated multi-value filters become "column IN (?,?,...)".
    for arg_name, column in (('categories', 'category'),
                             ('sites', 'site'),
                             ('status', 'status')):
        raw = args.get(arg_name)
        if raw:
            values = raw.split(',')
            sql += f" AND {column} IN ({','.join(['?'] * len(values))})"
            params.extend(values)

    # Date range is compared against the first 10 characters of timestamp.
    start_date = args.get('start')
    if start_date:
        sql += " AND substr(timestamp, 1, 10) >= ?"
        params.append(start_date)
    end_date = args.get('end')
    if end_date:
        sql += " AND substr(timestamp, 1, 10) <= ?"
        params.append(end_date)

    search = args.get('search')
    if search:
        pattern = f"%{search}%"
        sql += (" AND (fullFileName LIKE ? OR customName LIKE ?"
                " OR remarks LIKE ? OR complaintId LIKE ?)")
        params.extend([pattern] * 4)

    model = args.get('model')
    if model:
        sql += " AND model LIKE ?"
        params.append(f"%{model}%")
    cust = args.get('cust')
    if cust:
        sql += " AND customerCode LIKE ?"
        params.append(f"%{cust}%")

    sql += " ORDER BY timestamp DESC"
    return sql, params


def _collect_export_stats(rows, trans_map):
    """Count rows per status, per department and per supplier.

    Returns ``(stats_status, stats_dept, stats_sup)`` dictionaries keyed by
    the translated display name (suppliers are keyed by their raw name).
    """
    stats_status = {}
    stats_dept = {}
    stats_sup = {}
    for row in rows:
        raw_status = row['status'] or 'Open'
        status_key = trans_map.get(f'status_{raw_status}', raw_status)
        stats_status[status_key] = stats_status.get(status_key, 0) + 1

        for unit in _parse_responsible_units(row['responsibleUnit']):
            if unit.startswith('SUP:'):
                sup_name = unit.split(':', 1)[1]
                stats_sup[sup_name] = stats_sup.get(sup_name, 0) + 1
            else:
                dept_name = trans_map.get(f'unit_{unit}', unit)
                stats_dept[dept_name] = stats_dept.get(dept_name, 0) + 1
    return stats_status, stats_dept, stats_sup


def _excel_row_values(row, trans_map, sup_prefix):
    """Return the list of cell values for one record, in column order."""
    display_units = []
    for unit in _parse_responsible_units(row['responsibleUnit']):
        if unit.startswith('SUP:'):
            display_units.append(f"{sup_prefix}{unit.split(':', 1)[1]}")
        else:
            display_units.append(trans_map.get(f'unit_{unit}', unit))

    raw_status = row['status'] or 'Open'
    returned = row['returnQty'] or 0
    shipped = row['shippedQty'] or 0
    # Guard against division by zero when nothing was shipped.
    rate = (returned / shipped * 100) if shipped > 0 else 0.0

    return [
        row['fullFileName'],
        trans_map.get(f'status_{raw_status}', raw_status),
        "\n".join(display_units),
        row['model'],
        row['customerCode'],
        row['complaintId'],
        f"{rate:.2f}%",
        row['penaltyDate'],
        row['recurring'],
        row['closingDate'],
        row['issueType'],
        row['source'],
        row['severity'],
        row['issuer'],
        row['exceptionDesc'],
        row['handlingDesc'],
        row['respUnitType'],
        row['respUnitName'],
        row['respPerson'],
        row['rootCause'],
        row['correctiveAction'],
        row['trackingResult'],
        row['timestamp'],
        row['remarks'],
    ]


def _visual_width(text):
    """Approximate the display width of *text*; non-ASCII characters count 1.8."""
    return sum(1.8 if ord(ch) > 127 else 1.0 for ch in str(text))


def _fit_column_width(col_idx, header, values):
    """Compute the width of a non-wrapped column from its header and values.

    *col_idx* is the 1-based column number. Column 3 (responsible unit) is
    multi-line, so its width is taken from the widest single line.
    """
    widest = _visual_width(header) if header else 0
    for value in values:
        if not value:
            continue
        text = str(value)
        if col_idx == 3:
            candidate = max((_visual_width(line) for line in text.split('\n')),
                            default=0)
        else:
            candidate = _visual_width(text)
        widest = max(widest, candidate)

    padded = widest + 2
    if col_idx == 3:      # responsible unit
        return min(50, max(15, padded))
    if col_idx == 2:      # status: does not need to be wide
        return min(20, max(10, padded))
    return min(60, max(10, padded))


def _build_ncr_workbook(rows, headers_map, trans_map):
    """Render *rows* into the "NCR List" workbook: title, statistics, table."""
    # Same default in the stats block and the data rows, so a missing
    # translation never renders as "None<name>".
    sup_prefix = trans_map.get('prefix_sup', 'Supplier: ')
    stats_status, stats_dept, stats_sup = _collect_export_stats(rows, trans_map)

    wb = Workbook()
    ws = wb.active
    ws.title = "NCR List"

    style_center = Alignment(horizontal='center', vertical='center')
    style_wrap = Alignment(wrap_text=True, vertical='top')
    style_top = Alignment(vertical='top')
    style_stat = Alignment(horizontal='left', vertical='center', wrap_text=True)
    style_header = Alignment(horizontal='center', vertical='center', wrap_text=True)
    font_title = Font(size=16, bold=True)
    font_bold = Font(bold=True)
    thin = Side(style='thin')
    border_thin = Border(left=thin, right=thin, top=thin, bottom=thin)
    fill_header = PatternFill(start_color="D9EAD3", end_color="D9EAD3",
                              fill_type="solid")

    # Title banner: rows 1-3 merged across every table column.
    col_count = len(headers_map['cols'])
    ws.merge_cells(f'A1:{get_column_letter(col_count)}3')
    title_cell = ws['A1']
    title_cell.value = f"NCR Analysis Table ({datetime.now().strftime('%Y-%m-%d')})"
    title_cell.alignment = style_center
    title_cell.font = font_title

    stat_base_height = 18

    def write_stat_row(label, text_lines, row_idx):
        # Label in column A, one entry per line in merged columns B-F.
        ws.cell(row=row_idx, column=1, value=label).font = font_bold
        ws.merge_cells(start_row=row_idx, start_column=2,
                       end_row=row_idx, end_column=6)
        content_cell = ws.cell(row=row_idx, column=2, value="\n".join(text_lines))
        content_cell.alignment = style_stat
        line_count = len(text_lines) if text_lines else 1
        ws.row_dimensions[row_idx].height = max(stat_base_height, line_count * 16) + 4

    # Statistics block starts on row 5 (row 4 is left blank under the title).
    current_row = 5
    ws.cell(row=current_row, column=1,
            value=f"{headers_map['stats_total']}: {len(rows)}").font = font_bold
    current_row += 1

    write_stat_row(headers_map['stats_status'],
                   [f"{k}: {v}" for k, v in stats_status.items()],
                   current_row)
    current_row += 1

    if stats_dept:
        ranked = sorted(stats_dept.items(), key=lambda kv: kv[1], reverse=True)
        write_stat_row(headers_map['stats_unit'],
                       [f"{k}: {v}" for k, v in ranked], current_row)
        current_row += 1

    if stats_sup:
        ranked = sorted(stats_sup.items(), key=lambda kv: kv[1], reverse=True)
        write_stat_row(headers_map['stats_supplier'],
                       [f"{sup_prefix}{k}: {v}" for k, v in ranked], current_row)
        current_row += 1

    current_row += 1  # blank spacer row before the table
    header_row_num = current_row
    for idx, col_name in enumerate(headers_map['cols'], 1):
        cell = ws.cell(row=header_row_num, column=idx, value=col_name)
        cell.font = font_bold
        cell.alignment = style_header
        cell.fill = fill_header
        cell.border = border_thin

    # Data rows.
    data_start_row = header_row_num + 1
    table = [_excel_row_values(row, trans_map, sup_prefix) for row in rows]
    for offset, values in enumerate(table):
        for c_idx, value in enumerate(values, 1):
            cell = ws.cell(row=data_start_row + offset, column=c_idx, value=value)
            cell.border = border_thin
            wrapped = (c_idx - 1) in _EXCEL_WRAP_INDICES or c_idx == 3
            cell.alignment = style_wrap if wrapped else style_top

    # Column widths: fixed for wrapped free-text columns, fitted otherwise.
    for col_idx in range(1, col_count + 1):
        if (col_idx - 1) in _EXCEL_WRAP_INDICES:
            width = 50
        else:
            column_values = [values[col_idx - 1] for values in table
                             if col_idx <= len(values)]
            width = _fit_column_width(col_idx, headers_map['cols'][col_idx - 1],
                                      column_values)
        ws.column_dimensions[get_column_letter(col_idx)].width = width

    return wb


@app.route('/api/export_excel')
def export_excel():
    """Export the filtered NCR records as a formatted ``.xlsx`` download.

    Query-string parameters (all optional):
        lang: key into ``CSV_HEADERS_MAP`` / ``VALUE_TRANS_MAP``; falls back
            to ``'en'`` when absent or unknown.
        categories, sites, status: comma-separated lists matched with ``IN``.
        start, end: inclusive bounds compared against the first 10 characters
            of ``timestamp``.
        search: substring matched against ``fullFileName``, ``customName``,
            ``remarks`` and ``complaintId``.
        model, cust: substrings matched against ``model`` / ``customerCode``.

    Returns:
        A ``Response`` carrying the workbook bytes as an attachment named
        ``NCR_Analysis_<YYYYMMDD>.xlsx``. On any exception the exception text
        is returned with HTTP status 500.
    """
    try:
        lang = request.args.get('lang', 'en')
        headers_map = CSV_HEADERS_MAP.get(lang, CSV_HEADERS_MAP['en'])
        trans_map = VALUE_TRANS_MAP.get(lang, VALUE_TRANS_MAP['en'])
        query, params = _build_export_query(request.args)

        conn = get_db()
        try:
            # Must be set before the cursor is created so rows support
            # access by column name.
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()
            cur.execute(query, params)
            rows = cur.fetchall()
        finally:
            # Release the connection even when the query fails.
            conn.close()

        wb = _build_ncr_workbook(rows, headers_map, trans_map)

        output = io.BytesIO()
        wb.save(output)
        filename = f"NCR_Analysis_{datetime.now().strftime('%Y%m%d')}.xlsx"
        # Send the raw bytes rather than the BytesIO object, which would be
        # consumed as an iterator of "lines" split on newline bytes.
        return Response(
            output.getvalue(),
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            headers={"Content-Disposition": f"attachment;filename={filename}"}
        )
    except Exception as e:
        return str(e), 500


if __name__ == '__main__':
    # Start the scheduler on a daemon thread so it does not keep the process
    # alive once the web server stops.
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()
    # Report the configured check time instead of a hard-coded value.
    print(f"Scheduler started. Daily check at {MAIL_CONFIG['check_time']}.")
    app.run(host='0.0.0.0', port=5000, debug=False)