import { pipeline } from '@xenova/transformers';
import { parentPort } from 'node:worker_threads';
import * as fs from 'node:fs';
import wavefile from 'wavefile';
import { spawn } from 'node:child_process';

// Lazily created Whisper ASR pipeline, reused across messages.
let whisperPipeline: any;

interface DiarizationSegment {
  start: number;
  end: number;
  speaker: string;
}

// Returns true if python3 can import pyannote.audio, i.e. speaker diarization is available.
export async function canDiarization(): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const proc = spawn('python3', ['-c', 'import pyannote.audio']);
    proc.on('close', (code: number) => resolve(code === 0));
    proc.on('error', () => resolve(false));
  });
}

// Runs pyannote speaker diarization in a python3 subprocess and returns its segments.
async function runDiarization(audioPath: string, torchHome: string): Promise<DiarizationSegment[]> {
  const script = `
import sys
import json
import os

os.environ['TORCH_HOME'] = "${torchHome}"

# Assumes any Hugging Face credentials required by this gated model are already configured.
from pyannote.audio import Pipeline

pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1")
diarization = pipeline(sys.argv[1])
segments = []
for turn, _, speaker in diarization.itertracks(yield_label=True):
    segments.append({"start": turn.start, "end": turn.end, "speaker": speaker})
print(json.dumps(segments))
`;
  return new Promise<DiarizationSegment[]>((resolve, reject) => {
    let output = '';
    const proc = spawn('python3', ['-c', script, audioPath]);
    proc.stdout.on('data', (data: Buffer) => { output += data.toString(); });
    proc.stderr.on('data', (data: Buffer) => { console.error(data.toString()); });
    proc.on('close', (code: number) => {
      if (code === 0) {
        try {
          resolve(JSON.parse(output));
        } catch (err) {
          reject(new Error('Failed to parse diarization output'));
        }
      } else {
        reject(new Error(`Python process exited with code ${code}`));
      }
    });
    proc.on('error', reject);
  });
}

// Merges word-level Whisper chunks with diarization segments into a
// "[speaker N]: text" transcript, starting a new line whenever the speaker changes.
function combineSpeakerTranscript(chunks: any[], speakers: DiarizationSegment[]): string {
  // Map pyannote speaker labels (e.g. "SPEAKER_00") to 1-based numbers in order of appearance.
  const speakerMap = new Map<string, number>();
  let speakerCount = 0;
  speakers.forEach((seg) => {
    if (!speakerMap.has(seg.speaker)) speakerMap.set(seg.speaker, ++speakerCount);
  });

  const lines: string[] = [];
  let currentSpeaker = -1;
  let currentText = '';
  chunks.forEach((chunk: any) => {
    const time = chunk.timestamp[0];
    const segment = speakers.find((s) => time >= s.start && time <= s.end);
    // Words falling in a gap between diarization segments stay with the current speaker.
    const speakerNum = segment
      ? speakerMap.get(segment.speaker)!
      : (currentSpeaker === -1 ? 1 : currentSpeaker);
    if (speakerNum !== currentSpeaker) {
      if (currentText) lines.push(`[speaker ${currentSpeaker}]: ${currentText.trim()}`);
      currentSpeaker = speakerNum;
      currentText = chunk.text;
    } else {
      currentText += chunk.text;
    }
  });
  if (currentText) lines.push(`[speaker ${currentSpeaker}]: ${currentText.trim()}`);
  return lines.join('\n');
}

parentPort?.on('message', async ({ path, model, speaker, torchHome }: {
  path: string;
  model: string;
  speaker: boolean;
  torchHome: string;
}) => {
  try {
    if (!whisperPipeline) {
      whisperPipeline = await pipeline('automatic-speech-recognition', `Xenova/${model}`, {
        cache_dir: torchHome,
        quantized: true,
      });
    }

    // Prepare the audio file: Whisper expects mono 32-bit float PCM at 16 kHz.
    const wav = new wavefile.WaveFile(fs.readFileSync(path));
    wav.toBitDepth('32f');
    wav.toSampleRate(16000);
    const samples = wav.getSamples();
    let buffer;
    if (Array.isArray(samples)) {
      // Stereo: average the two channels into a single mono buffer.
      const left = samples[0];
      const right = samples[1];
      buffer = new Float32Array(left.length);
      for (let i = 0; i < left.length; i++) buffer[i] = (left[i] + right[i]) / 2;
    } else {
      buffer = samples;
    }

    // Transcribe; word-level timestamps are only needed when speaker labels are requested.
    const transcriptResult = await whisperPipeline(buffer, {
      return_timestamps: speaker ? 'word' : false,
    });

    if (!speaker) {
      parentPort?.postMessage({ text: transcriptResult.text?.trim() || null });
      return;
    }

    // Speaker diarization: fall back to a plain transcript if pyannote is not installed.
    const hasDiarization = await canDiarization();
    if (!hasDiarization) {
      parentPort?.postMessage({
        text: transcriptResult.text?.trim() || null,
        warning: 'Speaker diarization unavailable',
      });
      return;
    }

    const speakers = await runDiarization(path, torchHome);
    const combined = combineSpeakerTranscript(transcriptResult.chunks || [], speakers);
    parentPort?.postMessage({ text: combined });
  } catch (err) {
    parentPort?.postMessage({ error: (err as Error).message });
  }
});
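
// Example usage from the main thread (a minimal sketch; the worker filename, model id,
// and paths below are assumptions for illustration, not part of this module):
//
//   import { Worker } from 'node:worker_threads';
//
//   const worker = new Worker(new URL('./transcribe.worker.js', import.meta.url));
//   worker.postMessage({
//     path: '/tmp/recording.wav',   // WAV file to transcribe
//     model: 'whisper-base',        // resolves to the Xenova/whisper-base weights
//     speaker: true,                // request speaker-labelled output
//     torchHome: '/tmp/models',     // cache dir for Whisper and pyannote downloads
//   });
//   worker.on('message', (msg) => console.log(msg.error ?? msg.text));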