import { spawn } from 'node:child_process';
import { pipeline } from '@xenova/transformers';
import { AbortablePromise, Ai } from './ai.ts';

export class Audio {
  private whisperPipeline: any;

  constructor(private ai: Ai) {}

  /** Merge word-level transcript chunks with diarization segments into a
   *  "[speaker N]: text" transcript, one line per speaker turn. */
  private combineSpeakerTranscript(chunks: any[], speakers: any[]): string {
    // Map raw speaker labels (e.g. "SPEAKER_00") to stable 1-based numbers.
    const speakerMap = new Map<string, number>();
    let speakerCount = 0;
    speakers.forEach((seg: any) => {
      if (!speakerMap.has(seg.speaker)) speakerMap.set(seg.speaker, ++speakerCount);
    });

    const lines: string[] = [];
    let currentSpeaker = -1;
    let currentText = '';
    chunks.forEach((chunk: any) => {
      // Attribute each word to the diarization segment containing its start time.
      const time = chunk.timestamp[0];
      const speaker = speakers.find((s: any) => time >= s.start && time <= s.end);
      const speakerNum = speaker ? speakerMap.get(speaker.speaker)! : 1;
      if (speakerNum !== currentSpeaker) {
        // Speaker changed: flush the previous turn and start a new one.
        if (currentText) lines.push(`[speaker ${currentSpeaker}]: ${currentText.trim()}`);
        currentSpeaker = speakerNum;
        currentText = chunk.text;
      } else {
        currentText += chunk.text;
      }
    });
    if (currentText) lines.push(`[speaker ${currentSpeaker}]: ${currentText.trim()}`);
    return lines.join('\n');
  }

  /** Check whether pyannote.audio is importable in the local python3. */
  async canDiarization(): Promise<boolean> {
    return new Promise((resolve) => {
      const proc = spawn('python3', ['-c', 'import pyannote.audio']);
      proc.on('close', (code: number) => resolve(code === 0));
      proc.on('error', () => resolve(false));
    });
  }

  /** Run pyannote speaker diarization in a python3 subprocess and return its
   *  JSON segment list: [{ start, end, speaker }, ...]. Note that gated
   *  pyannote models may additionally require a Hugging Face auth token;
   *  this assumes the model is already accessible. */
  private async runDiarization(audioPath: string): Promise<any[]> {
    if (!(await this.canDiarization())) {
      throw new Error('Pyannote is not installed: pip install pyannote.audio');
    }
    const script = `
import os
import sys
import json
from pyannote.audio import Pipeline
os.environ['TORCH_HOME'] = "${this.ai.options.path}"
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1")
diarization = pipeline(sys.argv[1])
segments = []
for turn, _, speaker in diarization.itertracks(yield_label=True):
    segments.append({ "start": turn.start, "end": turn.end, "speaker": speaker })
print(json.dumps(segments))
`;
    return new Promise((resolve, reject) => {
      let output = '';
      // With `python3 -c script arg`, sys.argv[1] inside the script is the arg.
      const proc = spawn('python3', ['-c', script, audioPath]);
      proc.stdout.on('data', (data: Buffer) => output += data.toString());
      proc.stderr.on('data', (data: Buffer) => console.error(data.toString()));
      proc.on('close', (code: number) => {
        if (code === 0) {
          try {
            resolve(JSON.parse(output));
          } catch {
            reject(new Error('Failed to parse diarization output'));
          }
        } else {
          reject(new Error(`Python process exited with code ${code}`));
        }
      });
      proc.on('error', reject);
    });
  }

  /** Transcribe an audio file with Whisper. With `speaker: true`, also run
   *  diarization and return a per-speaker transcript. Resolves to null if
   *  aborted between stages. */
  asr(path: string, options: { model?: string; speaker?: boolean } = {}): AbortablePromise<string | null> {
    const { model = this.ai.options.asr || 'whisper-base', speaker = false } = options;

    // Cooperative abort: sets a flag checked between stages; it does not
    // cancel in-flight inference.
    let aborted = false;
    const abort = () => { aborted = true; };

    const p = new Promise<string | null>(async (resolve, reject) => {
      try {
        if (aborted) return resolve(null);
        // Lazily load and cache the Whisper pipeline on first use.
        if (!this.whisperPipeline) {
          this.whisperPipeline = await pipeline('automatic-speech-recognition', `Xenova/${model}`, {
            cache_dir: this.ai.options.path,
            quantized: true,
          });
        }

        // Transcription. Word-level timestamps are only needed when the
        // transcript must be aligned with diarization segments.
        if (aborted) return resolve(null);
        const transcriptResult = await this.whisperPipeline(path, {
          return_timestamps: speaker ? 'word' : false,
          chunk_length_s: 30,
        });
        if (!speaker) return resolve(transcriptResult.text?.trim() || null);

        // Speaker diarization.
        if (aborted) return resolve(null);
        const speakers = await this.runDiarization(path);
        if (aborted) return resolve(null);
        const combined = this.combineSpeakerTranscript(transcriptResult.chunks || [], speakers);
        resolve(combined);
      } catch (err) {
        reject(err);
      }
    });

    return Object.assign(p, { abort });
  }
}
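
// Usage sketch. This assumes `Ai` exposes `options.path` (model cache
// directory) and `options.asr` (default Whisper model), as referenced above;
// the file name and timeout below are illustrative only. The returned
// promise resolves to a plain transcript, a "[speaker N]: ..." transcript
// when `speaker: true`, or null if aborted.
//
//   const audio = new Audio(ai);
//   const job = audio.asr('meeting.wav', { model: 'whisper-base', speaker: true });
//   const timer = setTimeout(() => job.abort(), 60_000); // give up after a minute
//   const transcript = await job;
//   clearTimeout(timer);
//   console.log(transcript ?? 'aborted');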