From e37902861f3dd406d77e91496f481d486dce4aca Mon Sep 17 00:00:00 2001 From: egg Date: Tue, 24 Feb 2026 19:10:19 +0800 Subject: [PATCH] fix(query-tool): export button hover visibility, jobs tab hide columns and export with txn history - Increase CSS specificity for .btn-export to prevent portal-shell override on hover - Remove RESOURCEID and CONTAINERIDS from jobs tab display columns - Add lot_jobs_with_txn.sql joining JOB with JOBTXNHISTORY for complete export - Route lot_jobs export through get_lot_jobs_with_history() for full transaction data Co-Authored-By: Claude Opus 4.6 --- .../query-tool/components/LotJobsTable.vue | 2 - frontend/src/query-tool/style.css | 7 +- src/mes_dashboard/routes/query_tool_routes.py | 3 +- .../services/query_tool_service.py | 1244 +++++++++-------- .../sql/query_tool/lot_jobs_with_txn.sql | 38 + tests/test_query_tool_routes.py | 33 + 6 files changed, 729 insertions(+), 598 deletions(-) create mode 100644 src/mes_dashboard/sql/query_tool/lot_jobs_with_txn.sql diff --git a/frontend/src/query-tool/components/LotJobsTable.vue b/frontend/src/query-tool/components/LotJobsTable.vue index 9f15fe7..5861dc7 100644 --- a/frontend/src/query-tool/components/LotJobsTable.vue +++ b/frontend/src/query-tool/components/LotJobsTable.vue @@ -24,7 +24,6 @@ ensureMesApiAvailable(); const JOB_COLUMN_PRIORITY = Object.freeze([ 'JOBID', - 'RESOURCEID', 'RESOURCENAME', 'JOBSTATUS', 'JOBMODELNAME', @@ -42,7 +41,6 @@ const JOB_COLUMN_PRIORITY = Object.freeze([ 'PJ_SYMPTOMCODE2NAME', 'CREATE_EMPNAME', 'COMPLETE_EMPNAME', - 'CONTAINERIDS', 'CONTAINERNAMES', ]); diff --git a/frontend/src/query-tool/style.css b/frontend/src/query-tool/style.css index 14df2d7..ffee796 100644 --- a/frontend/src/query-tool/style.css +++ b/frontend/src/query-tool/style.css @@ -116,16 +116,17 @@ body { cursor: not-allowed; } -.btn-export { +.btn.btn-export { background: #0f766e; color: #fff; } -.btn-export:hover { +.btn.btn-export:hover { background: #0b5e59; + color: #fff; } -.btn-export:disabled { +.btn.btn-export:disabled { opacity: 0.6; cursor: not-allowed; } diff --git a/src/mes_dashboard/routes/query_tool_routes.py b/src/mes_dashboard/routes/query_tool_routes.py index 766c1f7..b2b14d9 100644 --- a/src/mes_dashboard/routes/query_tool_routes.py +++ b/src/mes_dashboard/routes/query_tool_routes.py @@ -27,6 +27,7 @@ from mes_dashboard.services.query_tool_service import ( get_lot_holds, get_lot_splits, get_lot_jobs, + get_lot_jobs_with_history, get_lot_associations_batch, get_equipment_status_hours, get_equipment_lots, @@ -683,7 +684,7 @@ def export_csv(): filename = f'lot_splits_{container_id}.csv' elif export_type == 'lot_jobs': - result = get_lot_jobs( + result = get_lot_jobs_with_history( params.get('equipment_id'), params.get('time_start'), params.get('time_end') diff --git a/src/mes_dashboard/services/query_tool_service.py b/src/mes_dashboard/services/query_tool_service.py index c1af9f2..baa1c5c 100644 --- a/src/mes_dashboard/services/query_tool_service.py +++ b/src/mes_dashboard/services/query_tool_service.py @@ -15,14 +15,14 @@ Architecture: - Uses QueryBuilder for dynamic conditions """ -import csv -import io -import logging -import os -import re -from datetime import datetime, timedelta -from decimal import Decimal -from typing import Any, Dict, List, Optional, Generator, Iterable, Tuple +import csv +import io +import logging +import os +import re +from datetime import datetime, timedelta +from decimal import Decimal +from typing import Any, Dict, List, Optional, Generator, Iterable, Tuple import pandas as pd @@ -37,29 +37,29 @@ except ImportError: """Compatibility wrapper when read_sql_df_slow is unavailable.""" return read_sql_df(sql, params) -logger = logging.getLogger('mes_dashboard.query_tool') +logger = logging.getLogger('mes_dashboard.query_tool') -# Constants -BATCH_SIZE = 1000 # Oracle IN clause limit -MAX_LOT_IDS = 50 -MAX_SERIAL_NUMBERS = 50 -MAX_WORK_ORDERS = 10 -MAX_EQUIPMENTS = 20 +# Constants +BATCH_SIZE = 1000 # Oracle IN clause limit +MAX_LOT_IDS = 50 +MAX_SERIAL_NUMBERS = 50 +MAX_WORK_ORDERS = 10 +MAX_EQUIPMENTS = 20 MAX_DATE_RANGE_DAYS = 90 DEFAULT_TIME_WINDOW_HOURS = 168 # 1 week for better PJ_TYPE detection -ADJACENT_LOTS_COUNT = 3 - - -def _max_batch_container_ids() -> int: - try: - return max(int(os.getenv("QUERY_TOOL_MAX_CONTAINER_IDS", "200")), 1) - except (TypeError, ValueError): - return 200 - - -# ============================================================ -# Validation Functions -# ============================================================ +ADJACENT_LOTS_COUNT = 3 + + +def _max_batch_container_ids() -> int: + try: + return max(int(os.getenv("QUERY_TOOL_MAX_CONTAINER_IDS", "200")), 1) + except (TypeError, ValueError): + return 200 + + +# ============================================================ +# Validation Functions +# ============================================================ def validate_date_range(start_date: str, end_date: str, max_days: int = MAX_DATE_RANGE_DAYS) -> Optional[str]: """Validate date range. @@ -88,12 +88,12 @@ def validate_date_range(start_date: str, end_date: str, max_days: int = MAX_DATE return f'日期格式錯誤: {e}' -def validate_lot_input(input_type: str, values: List[str]) -> Optional[str]: +def validate_lot_input(input_type: str, values: List[str]) -> Optional[str]: """Validate LOT input based on type. - Args: - input_type: Type of input - values: List of input values + Args: + input_type: Type of input + values: List of input values Returns: Error message if validation fails, None if valid. @@ -101,14 +101,14 @@ def validate_lot_input(input_type: str, values: List[str]) -> Optional[str]: if not values: return '請輸入至少一個查詢條件' - limits = { - 'lot_id': MAX_LOT_IDS, - 'wafer_lot': MAX_LOT_IDS, - 'gd_lot_id': MAX_LOT_IDS, - 'serial_number': MAX_SERIAL_NUMBERS, - 'work_order': MAX_WORK_ORDERS, - 'gd_work_order': MAX_WORK_ORDERS, - } + limits = { + 'lot_id': MAX_LOT_IDS, + 'wafer_lot': MAX_LOT_IDS, + 'gd_lot_id': MAX_LOT_IDS, + 'serial_number': MAX_SERIAL_NUMBERS, + 'work_order': MAX_WORK_ORDERS, + 'gd_work_order': MAX_WORK_ORDERS, + } limit = limits.get(input_type, MAX_LOT_IDS) if len(values) > limit: @@ -135,7 +135,7 @@ def validate_equipment_input(equipment_ids: List[str]) -> Optional[str]: return None -def _df_to_records(df: pd.DataFrame) -> List[Dict[str, Any]]: +def _df_to_records(df: pd.DataFrame) -> List[Dict[str, Any]]: """Convert DataFrame to list of records with proper type handling. Args: @@ -164,174 +164,174 @@ def _df_to_records(df: pd.DataFrame) -> List[Dict[str, Any]]: record[col] = value data.append(record) - return data - - -def _normalize_search_tokens(values: Iterable[str]) -> List[str]: - """Normalize user-provided search tokens while preserving order.""" - normalized: List[str] = [] - seen = set() - for raw in values or []: - token = str(raw or '').strip() - if not token or token in seen: - continue - seen.add(token) - normalized.append(token) - return normalized - - -def _normalize_wildcard_token(value: str) -> str: - """Normalize user wildcard syntax. - - Supports both SQL wildcard (`%`) and shell-style wildcard (`*`). - """ - return str(value or '').replace('*', '%') - - -def _is_pattern_token(value: str) -> bool: - token = _normalize_wildcard_token(value) - return '%' in token or '_' in token - - -def _to_like_regex(pattern: str, *, case_insensitive: bool = False) -> re.Pattern: - """Convert SQL LIKE pattern (`%`, `_`, `\\` escape) to Python regex.""" - token = _normalize_wildcard_token(pattern) - parts: List[str] = ['^'] - i = 0 - while i < len(token): - ch = token[i] - if ch == '\\': - # Keep Oracle ESCAPE semantics: \% or \_ means literal. - if i + 1 < len(token): - i += 1 - parts.append(re.escape(token[i])) - else: - parts.append(re.escape(ch)) - elif ch == '%': - parts.append('.*') - elif ch == '_': - parts.append('.') - else: - parts.append(re.escape(ch)) - i += 1 - parts.append('$') - flags = re.IGNORECASE if case_insensitive else 0 - return re.compile(''.join(parts), flags) - - -def _add_exact_or_pattern_condition( - builder: QueryBuilder, - column: str, - values: List[str], - *, - case_insensitive: bool = False, -) -> None: - """Add a single OR-group condition supporting exact and wildcard tokens.""" - tokens = _normalize_search_tokens(values) - if not tokens: - return - - col_expr = f"UPPER(NVL({column}, ''))" if case_insensitive else f"NVL({column}, '')" - conditions: List[str] = [] - - exact_tokens = [token for token in tokens if not _is_pattern_token(token)] - pattern_tokens = [token for token in tokens if _is_pattern_token(token)] - - if exact_tokens: - placeholders: List[str] = [] - for token in exact_tokens: - param = builder._next_param() - placeholders.append(f":{param}") - builder.params[param] = token.upper() if case_insensitive else token - conditions.append(f"{col_expr} IN ({', '.join(placeholders)})") - - for token in pattern_tokens: - param = builder._next_param() - normalized = _normalize_wildcard_token(token) - builder.params[param] = normalized.upper() if case_insensitive else normalized - conditions.append(f"{col_expr} LIKE :{param} ESCAPE '\\'") - - if conditions: - builder.add_condition(f"({' OR '.join(conditions)})") - - -def _match_rows_by_tokens( - tokens: List[str], - rows: List[Dict[str, Any]], - *, - row_key: str, - case_insensitive: bool = False, -) -> Tuple[List[Dict[str, Any]], List[str], Dict[str, int]]: - """Map query tokens to matching rows and report not-found tokens.""" - normalized_tokens = _normalize_search_tokens(tokens) - if not normalized_tokens: - return [], [], {} - - def normalize_text(value: Any) -> str: - text = str(value or '').strip() - return text.upper() if case_insensitive else text - - row_pairs: List[Tuple[str, Dict[str, Any]]] = [ - (normalize_text(row.get(row_key)), row) - for row in rows - if normalize_text(row.get(row_key)) - ] - - exact_index: Dict[str, List[Dict[str, Any]]] = {} - for key, row in row_pairs: - exact_index.setdefault(key, []).append(row) - - matches: List[Dict[str, Any]] = [] - not_found: List[str] = [] - expansion_info: Dict[str, int] = {} - seen_pairs = set() - - for token in normalized_tokens: - token_key = normalize_text(token) - matched_rows: List[Dict[str, Any]] - - if _is_pattern_token(token): - regex = _to_like_regex(token, case_insensitive=case_insensitive) - matched_rows = [ - row - for value, row in row_pairs - if regex.fullmatch(value) - ] - else: - matched_rows = exact_index.get(token_key, []) - - if not matched_rows: - not_found.append(token) - continue - - expansion_info[token] = len(matched_rows) - for row in matched_rows: - cid = str(row.get('CONTAINERID') or row.get('container_id') or '').strip() - dedup_key = (token, cid) - if dedup_key in seen_pairs: - continue - seen_pairs.add(dedup_key) - item = dict(row) - item['input_value'] = token - matches.append(item) - - return matches, not_found, expansion_info + return data + + +def _normalize_search_tokens(values: Iterable[str]) -> List[str]: + """Normalize user-provided search tokens while preserving order.""" + normalized: List[str] = [] + seen = set() + for raw in values or []: + token = str(raw or '').strip() + if not token or token in seen: + continue + seen.add(token) + normalized.append(token) + return normalized + + +def _normalize_wildcard_token(value: str) -> str: + """Normalize user wildcard syntax. + + Supports both SQL wildcard (`%`) and shell-style wildcard (`*`). + """ + return str(value or '').replace('*', '%') + + +def _is_pattern_token(value: str) -> bool: + token = _normalize_wildcard_token(value) + return '%' in token or '_' in token + + +def _to_like_regex(pattern: str, *, case_insensitive: bool = False) -> re.Pattern: + """Convert SQL LIKE pattern (`%`, `_`, `\\` escape) to Python regex.""" + token = _normalize_wildcard_token(pattern) + parts: List[str] = ['^'] + i = 0 + while i < len(token): + ch = token[i] + if ch == '\\': + # Keep Oracle ESCAPE semantics: \% or \_ means literal. + if i + 1 < len(token): + i += 1 + parts.append(re.escape(token[i])) + else: + parts.append(re.escape(ch)) + elif ch == '%': + parts.append('.*') + elif ch == '_': + parts.append('.') + else: + parts.append(re.escape(ch)) + i += 1 + parts.append('$') + flags = re.IGNORECASE if case_insensitive else 0 + return re.compile(''.join(parts), flags) + + +def _add_exact_or_pattern_condition( + builder: QueryBuilder, + column: str, + values: List[str], + *, + case_insensitive: bool = False, +) -> None: + """Add a single OR-group condition supporting exact and wildcard tokens.""" + tokens = _normalize_search_tokens(values) + if not tokens: + return + + col_expr = f"UPPER(NVL({column}, ''))" if case_insensitive else f"NVL({column}, '')" + conditions: List[str] = [] + + exact_tokens = [token for token in tokens if not _is_pattern_token(token)] + pattern_tokens = [token for token in tokens if _is_pattern_token(token)] + + if exact_tokens: + placeholders: List[str] = [] + for token in exact_tokens: + param = builder._next_param() + placeholders.append(f":{param}") + builder.params[param] = token.upper() if case_insensitive else token + conditions.append(f"{col_expr} IN ({', '.join(placeholders)})") + + for token in pattern_tokens: + param = builder._next_param() + normalized = _normalize_wildcard_token(token) + builder.params[param] = normalized.upper() if case_insensitive else normalized + conditions.append(f"{col_expr} LIKE :{param} ESCAPE '\\'") + + if conditions: + builder.add_condition(f"({' OR '.join(conditions)})") + + +def _match_rows_by_tokens( + tokens: List[str], + rows: List[Dict[str, Any]], + *, + row_key: str, + case_insensitive: bool = False, +) -> Tuple[List[Dict[str, Any]], List[str], Dict[str, int]]: + """Map query tokens to matching rows and report not-found tokens.""" + normalized_tokens = _normalize_search_tokens(tokens) + if not normalized_tokens: + return [], [], {} + + def normalize_text(value: Any) -> str: + text = str(value or '').strip() + return text.upper() if case_insensitive else text + + row_pairs: List[Tuple[str, Dict[str, Any]]] = [ + (normalize_text(row.get(row_key)), row) + for row in rows + if normalize_text(row.get(row_key)) + ] + + exact_index: Dict[str, List[Dict[str, Any]]] = {} + for key, row in row_pairs: + exact_index.setdefault(key, []).append(row) + + matches: List[Dict[str, Any]] = [] + not_found: List[str] = [] + expansion_info: Dict[str, int] = {} + seen_pairs = set() + + for token in normalized_tokens: + token_key = normalize_text(token) + matched_rows: List[Dict[str, Any]] + + if _is_pattern_token(token): + regex = _to_like_regex(token, case_insensitive=case_insensitive) + matched_rows = [ + row + for value, row in row_pairs + if regex.fullmatch(value) + ] + else: + matched_rows = exact_index.get(token_key, []) + + if not matched_rows: + not_found.append(token) + continue + + expansion_info[token] = len(matched_rows) + for row in matched_rows: + cid = str(row.get('CONTAINERID') or row.get('container_id') or '').strip() + dedup_key = (token, cid) + if dedup_key in seen_pairs: + continue + seen_pairs.add(dedup_key) + item = dict(row) + item['input_value'] = token + matches.append(item) + + return matches, not_found, expansion_info # ============================================================ # LOT Resolution Functions # ============================================================ -def resolve_lots(input_type: str, values: List[str]) -> Dict[str, Any]: +def resolve_lots(input_type: str, values: List[str]) -> Dict[str, Any]: """Resolve input to CONTAINERID list. All historical tables (LOTWIPHISTORY, LOTMATERIALSHISTORY, etc.) use CONTAINERID as primary key, NOT CONTAINERNAME. This function converts user input to CONTAINERID for subsequent queries. - Args: - input_type: Type of input - values: List of input values + Args: + input_type: Type of input + values: List of input values Returns: Dict with 'data' (list of {container_id, input_value}), @@ -348,27 +348,27 @@ def resolve_lots(input_type: str, values: List[str]) -> Dict[str, Any]: return {'error': '請輸入有效的查詢條件'} try: - if input_type == 'lot_id': - return _resolve_by_lot_id(cleaned) - elif input_type == 'wafer_lot': - return _resolve_by_wafer_lot(cleaned) - elif input_type == 'gd_lot_id': - return _resolve_by_gd_lot_id(cleaned) - elif input_type == 'serial_number': - return _resolve_by_serial_number(cleaned) - elif input_type == 'work_order': - return _resolve_by_work_order(cleaned) - elif input_type == 'gd_work_order': - return _resolve_by_gd_work_order(cleaned) - else: - return {'error': f'不支援的輸入類型: {input_type}'} + if input_type == 'lot_id': + return _resolve_by_lot_id(cleaned) + elif input_type == 'wafer_lot': + return _resolve_by_wafer_lot(cleaned) + elif input_type == 'gd_lot_id': + return _resolve_by_gd_lot_id(cleaned) + elif input_type == 'serial_number': + return _resolve_by_serial_number(cleaned) + elif input_type == 'work_order': + return _resolve_by_work_order(cleaned) + elif input_type == 'gd_work_order': + return _resolve_by_gd_work_order(cleaned) + else: + return {'error': f'不支援的輸入類型: {input_type}'} except Exception as exc: logger.error(f"LOT resolution failed: {exc}") return {'error': f'解析失敗: {str(exc)}'} -def _resolve_by_lot_id(lot_ids: List[str]) -> Dict[str, Any]: +def _resolve_by_lot_id(lot_ids: List[str]) -> Dict[str, Any]: """Resolve LOT IDs (CONTAINERNAME) to CONTAINERID. Args: @@ -377,266 +377,266 @@ def _resolve_by_lot_id(lot_ids: List[str]) -> Dict[str, Any]: Returns: Resolution result dict. """ - builder = QueryBuilder() - _add_exact_or_pattern_condition(builder, "CONTAINERNAME", lot_ids) - sql = SQLLoader.load_with_params( - "query_tool/lot_resolve_id", - CONTAINER_FILTER=builder.get_conditions_sql(), - ) - - df = read_sql_df(sql, builder.params) - data = _df_to_records(df) - matched, not_found, expansion_info = _match_rows_by_tokens( - lot_ids, - data, - row_key='CONTAINERNAME', - ) - - results = [] - for row in matched: - results.append({ - 'container_id': row.get('CONTAINERID'), - 'lot_id': row.get('CONTAINERNAME'), - 'input_value': row.get('input_value'), - 'spec_name': row.get('SPECNAME'), - 'qty': row.get('QTY'), - }) - - logger.info(f"LOT ID resolution: {len(results)} found, {len(not_found)} not found") - - return { - 'data': results, - 'total': len(results), - 'input_count': len(lot_ids), - 'not_found': not_found, - 'expansion_info': expansion_info, - } - - -def _resolve_by_wafer_lot(wafer_lots: List[str]) -> Dict[str, Any]: - """Resolve wafer lot values (FIRSTNAME) to CONTAINERID.""" - builder = QueryBuilder() - _add_exact_or_pattern_condition(builder, "FIRSTNAME", wafer_lots) - builder.add_condition("OBJECTTYPE = 'LOT'") - sql = SQLLoader.load_with_params( - "query_tool/lot_resolve_wafer_lot", - WAFER_FILTER=builder.get_conditions_sql(), - ) - - df = read_sql_df(sql, builder.params) - data = _df_to_records(df) - matched, not_found, expansion_info = _match_rows_by_tokens( - wafer_lots, - data, - row_key='FIRSTNAME', - ) - - results = [] - for row in matched: - cid = row.get('CONTAINERID') - if not cid: - continue - results.append({ - 'container_id': cid, - 'lot_id': row.get('CONTAINERNAME'), - 'input_value': row.get('input_value'), - 'spec_name': row.get('SPECNAME'), - 'qty': row.get('QTY'), - }) - - logger.info(f"Wafer lot resolution: {len(results)} containers from {len(wafer_lots)} wafer lots") - return { - 'data': results, - 'total': len(results), - 'input_count': len(wafer_lots), - 'not_found': not_found, - 'expansion_info': expansion_info, - } - - -def _is_gd_like(value: str) -> bool: - text = str(value or '').strip().upper() - return text.startswith('GD') - - -def _literal_prefix_before_wildcard(value: str) -> str: - token = _normalize_wildcard_token(value) - for idx, ch in enumerate(token): - if ch in ('%', '_'): - return token[:idx] - return token - - -def _resolve_by_gd_lot_id(gd_lot_ids: List[str]) -> Dict[str, Any]: - """Resolve GD lot IDs to CONTAINERID with strict GD validation.""" - invalid = [value for value in gd_lot_ids if not _is_gd_like(_literal_prefix_before_wildcard(value))] - if invalid: - return {'error': f'GD LOT ID 格式錯誤: {", ".join(invalid)}'} - - builder = QueryBuilder() - _add_exact_or_pattern_condition(builder, "CONTAINERNAME", gd_lot_ids, case_insensitive=True) - builder.add_condition("(UPPER(NVL(CONTAINERNAME, '')) LIKE 'GD%' OR UPPER(NVL(MFGORDERNAME, '')) LIKE 'GD%')") - sql = SQLLoader.load_with_params( - "query_tool/lot_resolve_id", - CONTAINER_FILTER=builder.get_conditions_sql(), - ) - - df = read_sql_df(sql, builder.params) - data = _df_to_records(df) - matched, not_found, expansion_info = _match_rows_by_tokens( - gd_lot_ids, - data, - row_key='CONTAINERNAME', - case_insensitive=True, - ) - - results = [] - for row in matched: - results.append({ - 'container_id': row.get('CONTAINERID'), - 'lot_id': row.get('CONTAINERNAME'), - 'input_value': row.get('input_value'), - 'spec_name': row.get('SPECNAME'), - 'qty': row.get('QTY'), - }) - - logger.info(f"GD lot resolution: {len(results)} found, {len(not_found)} not found") - return { - 'data': results, - 'total': len(results), - 'input_count': len(gd_lot_ids), - 'not_found': not_found, - 'expansion_info': expansion_info, - } + builder = QueryBuilder() + _add_exact_or_pattern_condition(builder, "CONTAINERNAME", lot_ids) + sql = SQLLoader.load_with_params( + "query_tool/lot_resolve_id", + CONTAINER_FILTER=builder.get_conditions_sql(), + ) + + df = read_sql_df(sql, builder.params) + data = _df_to_records(df) + matched, not_found, expansion_info = _match_rows_by_tokens( + lot_ids, + data, + row_key='CONTAINERNAME', + ) + + results = [] + for row in matched: + results.append({ + 'container_id': row.get('CONTAINERID'), + 'lot_id': row.get('CONTAINERNAME'), + 'input_value': row.get('input_value'), + 'spec_name': row.get('SPECNAME'), + 'qty': row.get('QTY'), + }) + + logger.info(f"LOT ID resolution: {len(results)} found, {len(not_found)} not found") + + return { + 'data': results, + 'total': len(results), + 'input_count': len(lot_ids), + 'not_found': not_found, + 'expansion_info': expansion_info, + } -def _resolve_by_serial_number(serial_numbers: List[str]) -> Dict[str, Any]: - """Resolve serial-related inputs to CONTAINERID. - - Matching sources (in priority order): - 1. DW_MES_PJ_COMBINEDASSYLOTS.FINISHEDNAME (new serial path) - 2. DW_MES_CONTAINER.CONTAINERNAME (old serial / lot-id style inputs) - 3. DW_MES_CONTAINER.FIRSTNAME (bridge from serial to related lots) - """ - tokens = _normalize_search_tokens(serial_numbers) - if not tokens: - return { - 'data': [], - 'total': 0, - 'input_count': 0, - 'not_found': [], - 'expansion_info': {}, - } - - source_configs = [ - { - 'name': 'finished_name', - 'priority': 0, - 'sql_name': 'query_tool/lot_resolve_serial', - 'filter_key': 'SERIAL_FILTER', - 'filter_column': 'p.FINISHEDNAME', - 'match_key': 'FINISHEDNAME', - 'extra_conditions': [], - }, - { - 'name': 'container_name', - 'priority': 1, - 'sql_name': 'query_tool/lot_resolve_id', - 'filter_key': 'CONTAINER_FILTER', - 'filter_column': 'CONTAINERNAME', - 'match_key': 'CONTAINERNAME', - 'extra_conditions': ["OBJECTTYPE = 'LOT'"], - }, - { - 'name': 'first_name', - 'priority': 2, - 'sql_name': 'query_tool/lot_resolve_wafer_lot', - 'filter_key': 'WAFER_FILTER', - 'filter_column': 'FIRSTNAME', - 'match_key': 'FIRSTNAME', - 'extra_conditions': ["OBJECTTYPE = 'LOT'"], - }, - ] - - best_match_by_key: Dict[Tuple[str, str], Dict[str, Any]] = {} - - for config in source_configs: - builder = QueryBuilder() - _add_exact_or_pattern_condition(builder, config['filter_column'], tokens) - for cond in config['extra_conditions']: - builder.add_condition(cond) - - if not builder.conditions: - continue - - sql = SQLLoader.load_with_params( - config['sql_name'], - **{config['filter_key']: builder.get_conditions_sql()}, - ) - df = read_sql_df(sql, builder.params) - data = _df_to_records(df) - matched, _, _ = _match_rows_by_tokens( - tokens, - data, - row_key=config['match_key'], - ) - - for row in matched: - input_value = str(row.get('input_value') or '').strip() - cid = str(row.get('CONTAINERID') or '').strip() - if not input_value or not cid: - continue - - candidate = { - 'container_id': cid, - 'lot_id': row.get('CONTAINERNAME') or cid, - 'input_value': input_value, - 'spec_name': row.get('SPECNAME'), - 'match_source': config['name'], - '_priority': config['priority'], - } - key = (input_value, cid) - existing = best_match_by_key.get(key) - if existing is None or candidate['_priority'] < existing['_priority']: - best_match_by_key[key] = candidate - - grouped_by_input: Dict[str, List[Dict[str, Any]]] = {} - for item in best_match_by_key.values(): - grouped_by_input.setdefault(item['input_value'], []).append(item) - - results: List[Dict[str, Any]] = [] - not_found: List[str] = [] - expansion_info: Dict[str, int] = {} - - for token in tokens: - rows = grouped_by_input.get(token, []) - rows.sort(key=lambda row: (row.get('_priority', 999), str(row.get('lot_id') or ''))) - if not rows: - not_found.append(token) - continue - - expansion_info[token] = len(rows) - for row in rows: - row.pop('_priority', None) - results.append(row) - - logger.info( - "Serial number resolution: %s containers from %s inputs (not_found=%s)", - len(results), - len(tokens), - len(not_found), - ) - - return { - 'data': results, - 'total': len(results), - 'input_count': len(tokens), - 'not_found': not_found, - 'expansion_info': expansion_info, - } +def _resolve_by_wafer_lot(wafer_lots: List[str]) -> Dict[str, Any]: + """Resolve wafer lot values (FIRSTNAME) to CONTAINERID.""" + builder = QueryBuilder() + _add_exact_or_pattern_condition(builder, "FIRSTNAME", wafer_lots) + builder.add_condition("OBJECTTYPE = 'LOT'") + sql = SQLLoader.load_with_params( + "query_tool/lot_resolve_wafer_lot", + WAFER_FILTER=builder.get_conditions_sql(), + ) + + df = read_sql_df(sql, builder.params) + data = _df_to_records(df) + matched, not_found, expansion_info = _match_rows_by_tokens( + wafer_lots, + data, + row_key='FIRSTNAME', + ) + + results = [] + for row in matched: + cid = row.get('CONTAINERID') + if not cid: + continue + results.append({ + 'container_id': cid, + 'lot_id': row.get('CONTAINERNAME'), + 'input_value': row.get('input_value'), + 'spec_name': row.get('SPECNAME'), + 'qty': row.get('QTY'), + }) + + logger.info(f"Wafer lot resolution: {len(results)} containers from {len(wafer_lots)} wafer lots") + return { + 'data': results, + 'total': len(results), + 'input_count': len(wafer_lots), + 'not_found': not_found, + 'expansion_info': expansion_info, + } -def _resolve_by_work_order(work_orders: List[str]) -> Dict[str, Any]: +def _is_gd_like(value: str) -> bool: + text = str(value or '').strip().upper() + return text.startswith('GD') + + +def _literal_prefix_before_wildcard(value: str) -> str: + token = _normalize_wildcard_token(value) + for idx, ch in enumerate(token): + if ch in ('%', '_'): + return token[:idx] + return token + + +def _resolve_by_gd_lot_id(gd_lot_ids: List[str]) -> Dict[str, Any]: + """Resolve GD lot IDs to CONTAINERID with strict GD validation.""" + invalid = [value for value in gd_lot_ids if not _is_gd_like(_literal_prefix_before_wildcard(value))] + if invalid: + return {'error': f'GD LOT ID 格式錯誤: {", ".join(invalid)}'} + + builder = QueryBuilder() + _add_exact_or_pattern_condition(builder, "CONTAINERNAME", gd_lot_ids, case_insensitive=True) + builder.add_condition("(UPPER(NVL(CONTAINERNAME, '')) LIKE 'GD%' OR UPPER(NVL(MFGORDERNAME, '')) LIKE 'GD%')") + sql = SQLLoader.load_with_params( + "query_tool/lot_resolve_id", + CONTAINER_FILTER=builder.get_conditions_sql(), + ) + + df = read_sql_df(sql, builder.params) + data = _df_to_records(df) + matched, not_found, expansion_info = _match_rows_by_tokens( + gd_lot_ids, + data, + row_key='CONTAINERNAME', + case_insensitive=True, + ) + + results = [] + for row in matched: + results.append({ + 'container_id': row.get('CONTAINERID'), + 'lot_id': row.get('CONTAINERNAME'), + 'input_value': row.get('input_value'), + 'spec_name': row.get('SPECNAME'), + 'qty': row.get('QTY'), + }) + + logger.info(f"GD lot resolution: {len(results)} found, {len(not_found)} not found") + return { + 'data': results, + 'total': len(results), + 'input_count': len(gd_lot_ids), + 'not_found': not_found, + 'expansion_info': expansion_info, + } + + +def _resolve_by_serial_number(serial_numbers: List[str]) -> Dict[str, Any]: + """Resolve serial-related inputs to CONTAINERID. + + Matching sources (in priority order): + 1. DW_MES_PJ_COMBINEDASSYLOTS.FINISHEDNAME (new serial path) + 2. DW_MES_CONTAINER.CONTAINERNAME (old serial / lot-id style inputs) + 3. DW_MES_CONTAINER.FIRSTNAME (bridge from serial to related lots) + """ + tokens = _normalize_search_tokens(serial_numbers) + if not tokens: + return { + 'data': [], + 'total': 0, + 'input_count': 0, + 'not_found': [], + 'expansion_info': {}, + } + + source_configs = [ + { + 'name': 'finished_name', + 'priority': 0, + 'sql_name': 'query_tool/lot_resolve_serial', + 'filter_key': 'SERIAL_FILTER', + 'filter_column': 'p.FINISHEDNAME', + 'match_key': 'FINISHEDNAME', + 'extra_conditions': [], + }, + { + 'name': 'container_name', + 'priority': 1, + 'sql_name': 'query_tool/lot_resolve_id', + 'filter_key': 'CONTAINER_FILTER', + 'filter_column': 'CONTAINERNAME', + 'match_key': 'CONTAINERNAME', + 'extra_conditions': ["OBJECTTYPE = 'LOT'"], + }, + { + 'name': 'first_name', + 'priority': 2, + 'sql_name': 'query_tool/lot_resolve_wafer_lot', + 'filter_key': 'WAFER_FILTER', + 'filter_column': 'FIRSTNAME', + 'match_key': 'FIRSTNAME', + 'extra_conditions': ["OBJECTTYPE = 'LOT'"], + }, + ] + + best_match_by_key: Dict[Tuple[str, str], Dict[str, Any]] = {} + + for config in source_configs: + builder = QueryBuilder() + _add_exact_or_pattern_condition(builder, config['filter_column'], tokens) + for cond in config['extra_conditions']: + builder.add_condition(cond) + + if not builder.conditions: + continue + + sql = SQLLoader.load_with_params( + config['sql_name'], + **{config['filter_key']: builder.get_conditions_sql()}, + ) + df = read_sql_df(sql, builder.params) + data = _df_to_records(df) + matched, _, _ = _match_rows_by_tokens( + tokens, + data, + row_key=config['match_key'], + ) + + for row in matched: + input_value = str(row.get('input_value') or '').strip() + cid = str(row.get('CONTAINERID') or '').strip() + if not input_value or not cid: + continue + + candidate = { + 'container_id': cid, + 'lot_id': row.get('CONTAINERNAME') or cid, + 'input_value': input_value, + 'spec_name': row.get('SPECNAME'), + 'match_source': config['name'], + '_priority': config['priority'], + } + key = (input_value, cid) + existing = best_match_by_key.get(key) + if existing is None or candidate['_priority'] < existing['_priority']: + best_match_by_key[key] = candidate + + grouped_by_input: Dict[str, List[Dict[str, Any]]] = {} + for item in best_match_by_key.values(): + grouped_by_input.setdefault(item['input_value'], []).append(item) + + results: List[Dict[str, Any]] = [] + not_found: List[str] = [] + expansion_info: Dict[str, int] = {} + + for token in tokens: + rows = grouped_by_input.get(token, []) + rows.sort(key=lambda row: (row.get('_priority', 999), str(row.get('lot_id') or ''))) + if not rows: + not_found.append(token) + continue + + expansion_info[token] = len(rows) + for row in rows: + row.pop('_priority', None) + results.append(row) + + logger.info( + "Serial number resolution: %s containers from %s inputs (not_found=%s)", + len(results), + len(tokens), + len(not_found), + ) + + return { + 'data': results, + 'total': len(results), + 'input_count': len(tokens), + 'not_found': not_found, + 'expansion_info': expansion_info, + } + + +def _resolve_by_work_order(work_orders: List[str]) -> Dict[str, Any]: """Resolve work orders (MFGORDERNAME) to CONTAINERID. Note: One work order may expand to many CONTAINERIDs (can be 100+). @@ -644,93 +644,93 @@ def _resolve_by_work_order(work_orders: List[str]) -> Dict[str, Any]: Args: work_orders: List of work orders - Returns: - Resolution result dict. - """ - invalid = [value for value in work_orders if _is_gd_like(_literal_prefix_before_wildcard(value))] - if invalid: - return {'error': f'正向工單僅支援 GA/GC,請改用反向 GD 工單查詢: {", ".join(invalid)}'} - - builder = QueryBuilder() - _add_exact_or_pattern_condition(builder, "MFGORDERNAME", work_orders, case_insensitive=True) - builder.add_condition("(UPPER(NVL(MFGORDERNAME, '')) LIKE 'GA%' OR UPPER(NVL(MFGORDERNAME, '')) LIKE 'GC%')") - sql = SQLLoader.load_with_params( - "query_tool/lot_resolve_work_order", - WORK_ORDER_FILTER=builder.get_conditions_sql(), - ) - - df = read_sql_df(sql, builder.params) - data = _df_to_records(df) - matched, not_found, expansion_info = _match_rows_by_tokens( - work_orders, - data, - row_key='MFGORDERNAME', - case_insensitive=True, - ) - - results = [] - for row in matched: - results.append({ - 'container_id': row.get('CONTAINERID'), - 'lot_id': row.get('CONTAINERNAME'), - 'input_value': row.get('input_value'), - 'spec_name': row.get('SPECNAME'), - }) + Returns: + Resolution result dict. + """ + invalid = [value for value in work_orders if _is_gd_like(_literal_prefix_before_wildcard(value))] + if invalid: + return {'error': f'正向工單僅支援 GA/GC,請改用反向 GD 工單查詢: {", ".join(invalid)}'} + + builder = QueryBuilder() + _add_exact_or_pattern_condition(builder, "MFGORDERNAME", work_orders, case_insensitive=True) + builder.add_condition("(UPPER(NVL(MFGORDERNAME, '')) LIKE 'GA%' OR UPPER(NVL(MFGORDERNAME, '')) LIKE 'GC%')") + sql = SQLLoader.load_with_params( + "query_tool/lot_resolve_work_order", + WORK_ORDER_FILTER=builder.get_conditions_sql(), + ) + + df = read_sql_df(sql, builder.params) + data = _df_to_records(df) + matched, not_found, expansion_info = _match_rows_by_tokens( + work_orders, + data, + row_key='MFGORDERNAME', + case_insensitive=True, + ) + + results = [] + for row in matched: + results.append({ + 'container_id': row.get('CONTAINERID'), + 'lot_id': row.get('CONTAINERNAME'), + 'input_value': row.get('input_value'), + 'spec_name': row.get('SPECNAME'), + }) logger.info(f"Work order resolution: {len(results)} containers from {len(work_orders)} orders") - return { - 'data': results, - 'total': len(results), - 'input_count': len(work_orders), - 'not_found': not_found, - 'expansion_info': expansion_info, - } - - -def _resolve_by_gd_work_order(work_orders: List[str]) -> Dict[str, Any]: - """Resolve GD work orders to CONTAINERID.""" - invalid = [value for value in work_orders if not _is_gd_like(_literal_prefix_before_wildcard(value))] - if invalid: - return {'error': f'GD 工單格式錯誤: {", ".join(invalid)}'} - - builder = QueryBuilder() - _add_exact_or_pattern_condition(builder, "MFGORDERNAME", work_orders, case_insensitive=True) - builder.add_condition("UPPER(NVL(MFGORDERNAME, '')) LIKE 'GD%'") - sql = SQLLoader.load_with_params( - "query_tool/lot_resolve_work_order", - WORK_ORDER_FILTER=builder.get_conditions_sql(), - ) - - df = read_sql_df(sql, builder.params) - data = _df_to_records(df) - matched, not_found, expansion_info = _match_rows_by_tokens( - work_orders, - data, - row_key='MFGORDERNAME', - case_insensitive=True, - ) - - results = [] - for row in matched: - cid = row.get('CONTAINERID') - if not cid: - continue - results.append({ - 'container_id': cid, - 'lot_id': row.get('CONTAINERNAME'), - 'input_value': row.get('input_value'), - 'spec_name': row.get('SPECNAME'), - }) - - logger.info(f"GD work order resolution: {len(results)} containers from {len(work_orders)} orders") - return { - 'data': results, - 'total': len(results), - 'input_count': len(work_orders), - 'not_found': not_found, - 'expansion_info': expansion_info, - } + return { + 'data': results, + 'total': len(results), + 'input_count': len(work_orders), + 'not_found': not_found, + 'expansion_info': expansion_info, + } + + +def _resolve_by_gd_work_order(work_orders: List[str]) -> Dict[str, Any]: + """Resolve GD work orders to CONTAINERID.""" + invalid = [value for value in work_orders if not _is_gd_like(_literal_prefix_before_wildcard(value))] + if invalid: + return {'error': f'GD 工單格式錯誤: {", ".join(invalid)}'} + + builder = QueryBuilder() + _add_exact_or_pattern_condition(builder, "MFGORDERNAME", work_orders, case_insensitive=True) + builder.add_condition("UPPER(NVL(MFGORDERNAME, '')) LIKE 'GD%'") + sql = SQLLoader.load_with_params( + "query_tool/lot_resolve_work_order", + WORK_ORDER_FILTER=builder.get_conditions_sql(), + ) + + df = read_sql_df(sql, builder.params) + data = _df_to_records(df) + matched, not_found, expansion_info = _match_rows_by_tokens( + work_orders, + data, + row_key='MFGORDERNAME', + case_insensitive=True, + ) + + results = [] + for row in matched: + cid = row.get('CONTAINERID') + if not cid: + continue + results.append({ + 'container_id': cid, + 'lot_id': row.get('CONTAINERNAME'), + 'input_value': row.get('input_value'), + 'spec_name': row.get('SPECNAME'), + }) + + logger.info(f"GD work order resolution: {len(results)} containers from {len(work_orders)} orders") + return { + 'data': results, + 'total': len(results), + 'input_count': len(work_orders), + 'not_found': not_found, + 'expansion_info': expansion_info, + } # ============================================================ @@ -874,7 +874,7 @@ def get_adjacent_lots( # LOT Batch Query Functions # ============================================================ -def get_lot_history_batch( +def get_lot_history_batch( container_ids: List[str], workcenter_groups: Optional[List[str]] = None, ) -> Dict[str, Any]: @@ -886,14 +886,14 @@ def get_lot_history_batch( Returns: Dict with 'data' (merged history records) and 'total'. - """ - if not container_ids: - return {'error': '請指定 CONTAINERID'} - max_ids = _max_batch_container_ids() - if len(container_ids) > max_ids: - return {'error': f'container_ids 數量不可超過 {max_ids} 筆'} - - try: + """ + if not container_ids: + return {'error': '請指定 CONTAINERID'} + max_ids = _max_batch_container_ids() + if len(container_ids) > max_ids: + return {'error': f'container_ids 數量不可超過 {max_ids} 筆'} + + try: events_by_cid = EventFetcher.fetch_events(container_ids, "history") rows = [] @@ -929,10 +929,10 @@ def get_lot_history_batch( return {'error': f'查詢失敗: {str(exc)}'} -def get_lot_associations_batch( - container_ids: List[str], - assoc_type: str, -) -> Dict[str, Any]: +def get_lot_associations_batch( + container_ids: List[str], + assoc_type: str, +) -> Dict[str, Any]: """Get association data for multiple LOTs in a single EventFetcher call. Args: @@ -941,30 +941,30 @@ def get_lot_associations_batch( Returns: Dict with 'data' (merged records) and 'total'. - """ - if not container_ids: - return {'error': '請指定 CONTAINERID'} - max_ids = _max_batch_container_ids() - if len(container_ids) > max_ids: - return {'error': f'container_ids 數量不可超過 {max_ids} 筆'} - - valid_batch_types = {'materials', 'rejects', 'holds'} + """ + if not container_ids: + return {'error': '請指定 CONTAINERID'} + max_ids = _max_batch_container_ids() + if len(container_ids) > max_ids: + return {'error': f'container_ids 數量不可超過 {max_ids} 筆'} + + valid_batch_types = {'materials', 'rejects', 'holds'} if assoc_type not in valid_batch_types: return {'error': f'批次查詢不支援類型: {assoc_type}'} - try: - events_by_cid = EventFetcher.fetch_events(container_ids, assoc_type) - - rows = [] - for cid in container_ids: - rows.extend(events_by_cid.get(cid, [])) - - # Keep timeline grouping consistent with history rows. - # Especially for materials, workcenter names like "焊_DB_料" need to map - # to the same WORKCENTER_GROUP used by LOT history tracks. - _enrich_workcenter_group(rows) - - data = _df_to_records(pd.DataFrame(rows)) + try: + events_by_cid = EventFetcher.fetch_events(container_ids, assoc_type) + + rows = [] + for cid in container_ids: + rows.extend(events_by_cid.get(cid, [])) + + # Keep timeline grouping consistent with history rows. + # Especially for materials, workcenter names like "焊_DB_料" need to map + # to the same WORKCENTER_GROUP used by LOT history tracks. + _enrich_workcenter_group(rows) + + data = _df_to_records(pd.DataFrame(rows)) logger.debug( f"LOT {assoc_type} batch: {len(data)} records for " @@ -986,7 +986,7 @@ def get_lot_associations_batch( # LOT Association Functions # ============================================================ -def get_lot_materials(container_id: str) -> Dict[str, Any]: +def get_lot_materials(container_id: str) -> Dict[str, Any]: """Get material consumption records for a LOT. Args: @@ -998,11 +998,11 @@ def get_lot_materials(container_id: str) -> Dict[str, Any]: if not container_id: return {'error': '請指定 CONTAINERID'} - try: - events_by_cid = EventFetcher.fetch_events([container_id], "materials") - rows = list(events_by_cid.get(container_id, [])) - _enrich_workcenter_group(rows) - data = _df_to_records(pd.DataFrame(rows)) + try: + events_by_cid = EventFetcher.fetch_events([container_id], "materials") + rows = list(events_by_cid.get(container_id, [])) + _enrich_workcenter_group(rows) + data = _df_to_records(pd.DataFrame(rows)) logger.debug(f"LOT materials: {len(data)} records for {container_id}") @@ -1411,6 +1411,66 @@ def get_lot_jobs( return {'error': f'查詢失敗: {str(exc)}'} +def get_lot_jobs_with_history( + equipment_id: str, + time_start: str, + time_end: str +) -> Dict[str, Any]: + """Get JOB records with full transaction history for export. + + Joins DW_MES_JOB with DW_MES_JOBTXNHISTORY so each row contains + both job-level and transaction-level columns, matching the pattern + used by the job-query export. + + Args: + equipment_id: Equipment ID (RESOURCEID) + time_start: Start time (ISO format) + time_end: End time (ISO format) + + Returns: + Dict with 'data' (flattened job+txn records) and 'total', or 'error'. + """ + if not all([equipment_id, time_start, time_end]): + return {'error': '請指定設備和時間範圍'} + + try: + if isinstance(time_start, str): + start = datetime.strptime(time_start, '%Y-%m-%d %H:%M:%S') + else: + start = time_start + + if isinstance(time_end, str): + end = datetime.strptime(time_end, '%Y-%m-%d %H:%M:%S') + else: + end = time_end + + sql = SQLLoader.load("query_tool/lot_jobs_with_txn") + params = { + 'equipment_id': equipment_id, + 'time_start': start, + 'time_end': end, + } + + df = read_sql_df(sql, params) + data = _df_to_records(df) + + logger.debug( + f"LOT jobs with txn history: {len(data)} records for {equipment_id}" + ) + + return { + 'data': data, + 'total': len(data), + 'equipment_id': equipment_id, + } + + except Exception as exc: + logger.error( + f"LOT jobs with txn history query failed for {equipment_id}: {exc}" + ) + return {'error': f'查詢失敗: {str(exc)}'} + + # ============================================================ # Equipment Period Query Functions # ============================================================ diff --git a/src/mes_dashboard/sql/query_tool/lot_jobs_with_txn.sql b/src/mes_dashboard/sql/query_tool/lot_jobs_with_txn.sql new file mode 100644 index 0000000..1a6127c --- /dev/null +++ b/src/mes_dashboard/sql/query_tool/lot_jobs_with_txn.sql @@ -0,0 +1,38 @@ +-- LOT Related JOB Records with Transaction History Export +-- Joins JOB with JOBTXNHISTORY for complete CSV export +-- +-- Parameters: +-- :equipment_id - Equipment ID (EQUIPMENTID = RESOURCEID) +-- :time_start - Start time of LOT processing +-- :time_end - End time of LOT processing + +SELECT + j.RESOURCENAME, + j.JOBID, + j.JOBSTATUS AS JOB_FINAL_STATUS, + j.JOBMODELNAME, + j.JOBORDERNAME, + j.CREATEDATE AS JOB_CREATEDATE, + j.COMPLETEDATE AS JOB_COMPLETEDATE, + j.CAUSECODENAME AS JOB_CAUSECODENAME, + j.REPAIRCODENAME AS JOB_REPAIRCODENAME, + j.SYMPTOMCODENAME AS JOB_SYMPTOMCODENAME, + h.TXNDATE, + h.FROMJOBSTATUS, + h.JOBSTATUS AS TXN_JOBSTATUS, + h.STAGENAME, + h.CAUSECODENAME AS TXN_CAUSECODENAME, + h.REPAIRCODENAME AS TXN_REPAIRCODENAME, + h.SYMPTOMCODENAME AS TXN_SYMPTOMCODENAME, + h.USER_NAME, + h.EMP_NAME, + h.COMMENTS +FROM DWH.DW_MES_JOB j +JOIN DWH.DW_MES_JOBTXNHISTORY h ON j.JOBID = h.JOBID +WHERE j.RESOURCEID = :equipment_id + AND ( + (j.CREATEDATE BETWEEN :time_start AND :time_end) + OR (j.COMPLETEDATE BETWEEN :time_start AND :time_end) + OR (j.CREATEDATE <= :time_start AND (j.COMPLETEDATE IS NULL OR j.COMPLETEDATE >= :time_end)) + ) +ORDER BY j.JOBID, h.TXNDATE diff --git a/tests/test_query_tool_routes.py b/tests/test_query_tool_routes.py index a6479f1..8cf7e11 100644 --- a/tests/test_query_tool_routes.py +++ b/tests/test_query_tool_routes.py @@ -1027,6 +1027,39 @@ class TestExportCsvBatchEndpoint: data = response.get_json() assert 'CONTAINERID' in data.get('error', '') + @patch('mes_dashboard.routes.query_tool_routes.get_lot_jobs_with_history') + def test_export_lot_jobs_calls_with_history(self, mock_jobs_hist, client): + """lot_jobs export should call get_lot_jobs_with_history (includes txn).""" + mock_jobs_hist.return_value = { + 'data': [ + { + 'RESOURCENAME': 'ASSY-01', + 'JOBID': 'JOB-001', + 'JOB_FINAL_STATUS': 'Complete', + 'TXNDATE': '2026-01-15 10:00:00', + 'TXN_JOBSTATUS': 'Complete', + 'STAGENAME': 'Repair', + }, + ], + 'total': 1, + } + + response = client.post( + '/api/query-tool/export-csv', + json={ + 'export_type': 'lot_jobs', + 'params': { + 'equipment_id': 'EQ001', + 'time_start': '2026-01-01 00:00:00', + 'time_end': '2026-01-31 23:59:59', + }, + }, + ) + + assert response.status_code == 200 + assert 'text/csv' in response.content_type + mock_jobs_hist.assert_called_once_with('EQ001', '2026-01-01 00:00:00', '2026-01-31 23:59:59') + class TestEquipmentListEndpoint: """Tests for /api/query-tool/equipment-list endpoint."""