Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from data_processing import load_tasks | |
| from annotation import save_annotations | |
# Annotation tasks, loaded once when this module is imported.
tasks = load_tasks()

# Zero-based index into ``tasks`` of the task currently displayed; it is
# rebound by change_task() through a ``global`` statement.
# NOTE(review): this is module-level state, so every caller in the same
# process sees the same index -- confirm this is intended if several users
# can annotate concurrently.
current_task: int = 0
def get_current_task_with_annotations(annotation_results):
    """Return the current task's display data with any saved choice highlighted.

    Meant for the initial load of the interface.

    Args:
        annotation_results: Mapping of task index to the recorded choice
            ("A" or "B"). May be None or empty, in which case no clip is
            highlighted.

    Returns:
        A 7-tuple ``(instruction, text, audio_a_update, audio_b_update,
        is_first, is_last, task_number)`` where the two audio entries are
        ``gr.update`` payloads, ``is_first``/``is_last`` are booleans and
        ``task_number`` is 1-based.
    """
    entry = tasks[current_task]
    chosen = annotation_results.get(current_task) if annotation_results else None

    # Each stored clip is indexed as [0] = data, [1] = rate; the update
    # payload is built in (rate, data) order.
    clip_a = (entry["audioA"][1], entry["audioA"][0])
    clip_b = (entry["audioB"][1], entry["audioB"][0])

    # Only the clip matching the saved choice gets the "selected" CSS class.
    class_a = "selected" if chosen == "A" else ""
    class_b = "selected" if chosen == "B" else ""
    update_a = gr.update(value=clip_a, elem_classes=class_a)
    update_b = gr.update(value=clip_b, elem_classes=class_b)

    is_first = current_task == 0
    is_last = current_task == len(tasks) - 1
    return (
        entry["instruction"],
        entry["text"],
        update_a,
        update_b,
        is_first,
        is_last,
        current_task + 1,
    )
def get_current_task(annotation_results=None, styled=False):
    """Return display data for the current task, optionally with styling.

    Args:
        annotation_results: Mapping of task index to the recorded choice
            ("A" or "B"), or None.
        styled: When truthy and ``annotation_results`` is not None, the audio
            entries are returned as styled ``gr.update`` payloads.

    Returns:
        Unstyled form (9-tuple, used to initialise the interface):
        ``(instruction, text, audio_a_data, audio_a_rate, audio_b_data,
        audio_b_rate, is_first, is_last, task_number)``.

        Styled form (7-tuple, used to refresh an existing interface):
        ``(instruction, text, audio_a_update, audio_b_update, is_first,
        is_last, task_number)``.

        ``task_number`` is 1-based in both forms.
    """
    entry = tasks[current_task]

    if not styled or annotation_results is None:
        # Raw format: data and rate are handed back as separate items.
        return (
            entry["instruction"],
            entry["text"],
            entry["audioA"][0],
            entry["audioA"][1],
            entry["audioB"][0],
            entry["audioB"][1],
            current_task == 0,
            current_task == len(tasks) - 1,
            current_task + 1,
        )

    chosen = annotation_results.get(current_task)

    # Update payloads carry the clip in (rate, data) order.
    clip_a = (entry["audioA"][1], entry["audioA"][0])
    clip_b = (entry["audioB"][1], entry["audioB"][0])

    # Only the clip matching the saved choice gets the "selected" CSS class.
    class_a = "selected" if chosen == "A" else ""
    class_b = "selected" if chosen == "B" else ""
    update_a = gr.update(value=clip_a, elem_classes=class_a)
    update_b = gr.update(value=clip_b, elem_classes=class_b)

    return (
        entry["instruction"],
        entry["text"],
        update_a,
        update_b,
        current_task == 0,
        current_task == len(tasks) - 1,
        current_task + 1,
    )
def apply_selection_style(audioA, audioB, choice):
    """Build the pair of audio updates that highlight the chosen clip.

    Args:
        audioA: Value to keep on the first audio component.
        audioB: Value to keep on the second audio component.
        choice: "A" or "B" to highlight that clip; any other value leaves
            both clips without the highlight class.

    Returns:
        A 2-tuple ``(audio_a_update, audio_b_update)`` of ``gr.update``
        payloads whose ``elem_classes`` is ``"selected"`` for the chosen clip
        and ``""`` otherwise.
    """
    class_a = "selected" if choice == "A" else ""
    class_b = "selected" if choice == "B" else ""
    return (
        gr.update(value=audioA, elem_classes=class_a),
        gr.update(value=audioB, elem_classes=class_b),
    )
def select_audio(choice, audioA, audioB, annotation_results, username):
    """Record the choice for the current task, save it, and highlight the clip.

    Args:
        choice: The selected clip, "A" or "B".
        audioA: Current value of the first audio component.
        audioB: Current value of the second audio component.
        annotation_results: Mapping of task index to recorded choice; it is
            updated in place with the new choice.
        username: Passed through to ``save_annotations``.

    Returns:
        A 3-tuple ``(audio_a_update, audio_b_update, annotation_results)``.
    """
    annotation_results[current_task] = choice
    print(f"任务 {current_task}: 选择 {choice}")

    # Save after every selection so progress is persisted automatically.
    outcome = save_annotations(username, annotation_results, tasks)
    print(f"自动保存结果: {outcome}")

    styled_a, styled_b = apply_selection_style(audioA, audioB, choice)
    return styled_a, styled_b, annotation_results
def change_task(direction, annotation_results, username):
    """Move to the previous or next task and return the refreshed UI values.

    Args:
        direction: "prev" to step back or "next" to step forward. The index
            is left unchanged for any other value, or when already at the
            corresponding end of the task list.
        annotation_results: Mapping of task index to recorded choice, used to
            restore the highlight on the task being shown.
        username: Name shown in the info banner. It is HTML-escaped before
            being embedded in the markup.

    Returns:
        An 8-tuple ``(instruction, text, audio_a_update, audio_b_update,
        prev_button_update, next_button_update, info_html_update,
        annotation_results)``.
    """
    # Function-local import so the block brings its own dependency with it.
    import html

    global current_task

    if direction == "prev" and current_task > 0:
        current_task -= 1
    elif direction == "next" and current_task < len(tasks) - 1:
        current_task += 1

    # Use the styled variant so an existing choice is highlighted again.
    inst, text, audioA_update, audioB_update, prev_disabled, next_disabled, task_num = get_current_task(
        annotation_results, styled=True)
    total_tasks = get_total_tasks()

    # The username is interpolated into raw HTML, so escape it to stop markup
    # in the name from being rendered. str() keeps the previous rendering for
    # non-string values such as None.
    safe_username = html.escape(str(username))

    # Combined banner: current user plus "task number / total".
    combined_task_info = f'<div class="user-task-info"><span>👤 当前用户: {safe_username}</span><span><strong>任务编号: {task_num} / {total_tasks}</strong></span></div>'

    return (
        inst, text,
        audioA_update,
        audioB_update,
        gr.update(interactive=not prev_disabled),
        gr.update(interactive=not next_disabled),
        gr.update(value=combined_task_info),
        annotation_results
    )
def get_total_tasks():
    """Return the number of entries in the module-level ``tasks`` collection."""
    task_count = len(tasks)
    return task_count