DataLabelingApp / task_manager.py
sunnyzjx's picture
Update task_manager.py
a99121d verified
import gradio as gr
from data_processing import load_tasks
from annotation import save_annotations
tasks = load_tasks()
current_task = 0
def get_current_task_with_annotations(annotation_results):
"""获取当前任务信息,应用已有标注的样式(用于初始加载)"""
task = tasks[current_task]
current_choice = annotation_results.get(current_task) if annotation_results else None
# 基础音频数据
audioA_data = (task["audioA"][1], task["audioA"][0]) # (rate, data)
audioB_data = (task["audioB"][1], task["audioB"][0]) # (rate, data)
# 应用选择样式
if current_choice == "A":
audioA_styled = gr.update(value=audioA_data, elem_classes="selected")
audioB_styled = gr.update(value=audioB_data, elem_classes="")
elif current_choice == "B":
audioA_styled = gr.update(value=audioA_data, elem_classes="")
audioB_styled = gr.update(value=audioB_data, elem_classes="selected")
else:
audioA_styled = gr.update(value=audioA_data, elem_classes="")
audioB_styled = gr.update(value=audioB_data, elem_classes="")
return (
task["instruction"],
task["text"],
audioA_styled,
audioB_styled,
current_task == 0,
current_task == len(tasks) - 1,
current_task + 1
)
def get_current_task(annotation_results=None, styled=False):
"""获取当前任务信息,可选择是否应用样式"""
task = tasks[current_task]
if styled and annotation_results is not None:
# 返回带样式的版本(用于更新已有界面)
current_choice = annotation_results.get(current_task)
audioA_data = (task["audioA"][1], task["audioA"][0]) # (rate, data)
audioB_data = (task["audioB"][1], task["audioB"][0]) # (rate, data)
if current_choice == "A":
audioA_styled = gr.update(value=audioA_data, elem_classes="selected")
audioB_styled = gr.update(value=audioB_data, elem_classes="")
elif current_choice == "B":
audioA_styled = gr.update(value=audioA_data, elem_classes="")
audioB_styled = gr.update(value=audioB_data, elem_classes="selected")
else:
audioA_styled = gr.update(value=audioA_data, elem_classes="")
audioB_styled = gr.update(value=audioB_data, elem_classes="")
return (
task["instruction"],
task["text"],
audioA_styled,
audioB_styled,
current_task == 0,
current_task == len(tasks) - 1,
current_task + 1
)
else:
# 返回原始格式(用于初始化界面)
return (
task["instruction"],
task["text"],
task["audioA"][0], # audioA_data
task["audioA"][1], # audioA_rate
task["audioB"][0], # audioB_data
task["audioB"][1], # audioB_rate
current_task == 0,
current_task == len(tasks) - 1,
current_task + 1
)
def apply_selection_style(audioA, audioB, choice):
"""根据选择结果应用样式"""
if choice == "A":
return (
gr.update(value=audioA, elem_classes="selected"),
gr.update(value=audioB, elem_classes="")
)
elif choice == "B":
return (
gr.update(value=audioA, elem_classes=""),
gr.update(value=audioB, elem_classes="selected")
)
else:
return (
gr.update(value=audioA, elem_classes=""),
gr.update(value=audioB, elem_classes="")
)
def select_audio(choice, audioA, audioB, annotation_results, username):
"""记录选择并更新UI高亮,自动保存标注结果"""
annotation_results[current_task] = choice
print(f"任务 {current_task}: 选择 {choice}")
# 自动保存标注结果
save_result = save_annotations(username, annotation_results, tasks)
print(f"自动保存结果: {save_result}")
audioA_update, audioB_update = apply_selection_style(audioA, audioB, choice)
return audioA_update, audioB_update, annotation_results
def change_task(direction, annotation_results, username):
"""切换任务"""
global current_task
if direction == "prev" and current_task > 0:
current_task -= 1
elif direction == "next" and current_task < len(tasks) - 1:
current_task += 1
# 使用带样式的版本
inst, text, audioA_update, audioB_update, prev_disabled, next_disabled, task_num = get_current_task(
annotation_results, styled=True)
total_tasks = get_total_tasks()
# 生成合并的用户信息和任务编号HTML
combined_task_info = f'<div class="user-task-info"><span>👤 当前用户: {username}</span><span><strong>任务编号: {task_num} / {total_tasks}</strong></span></div>'
return (
inst, text,
audioA_update,
audioB_update,
gr.update(interactive=not prev_disabled),
gr.update(interactive=not next_disabled),
gr.update(value=combined_task_info), # 更新任务编号位置的合并信息
annotation_results
)
def get_total_tasks():
"""返回总任务数"""
return len(tasks)