import { pipeline } from '@xenova/transformers';
import { parentPort } from 'worker_threads';
import { spawn } from 'node:child_process';
import { execFileSync, execSync } from 'node:child_process';
import { mkdtempSync, rmSync, readFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { tmpdir } from 'node:os';
import wavefile from 'wavefile';

/** One diarization segment as printed (JSON) by the Python helper script. */
interface SpeakerSegment {
  start: number;
  end: number;
  speaker: string;
}

/** One transcript chunk; only `text` and the start of `timestamp` are read here. */
interface TranscriptChunk {
  text: string;
  timestamp?: [number | null, number | null] | null;
}

// Cached ASR pipeline together with the model name it was created for.
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- pipeline type depends on the task string
let whisperPipeline: any;
let whisperModel: string | undefined;

/**
 * Python program executed with `python -c`. The audio path is read from
 * `sys.argv[1]` and the access token from the `PYANNOTE_AUTH_TOKEN` environment
 * variable, so no caller-supplied value is ever spliced into the source text.
 */
const DIARIZATION_SCRIPT = `
import sys
import json
import os
from pyannote.audio import Pipeline

pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", token=os.environ["PYANNOTE_AUTH_TOKEN"])
output = pipeline(sys.argv[1])
segments = []
for turn, speaker in output.speaker_diarization:
    segments.append({"start": turn.start, "end": turn.end, "speaker": speaker})
print(json.dumps(segments))
`;

/**
 * Checks whether `python -c "import pyannote.audio"` exits successfully.
 *
 * @returns `true` when the process exits with code 0, `false` on any other exit
 *          code or when the process cannot be spawned.
 */
export async function canDiarization(): Promise<boolean> {
  return new Promise((resolve) => {
    // Output is irrelevant for the probe; ignoring it avoids leaving undrained pipes.
    const proc = spawn('python', ['-c', 'import pyannote.audio'], { stdio: 'ignore' });
    proc.on('close', (code) => resolve(code === 0));
    proc.on('error', () => resolve(false));
  });
}

/**
 * Runs the diarization script on an audio file in a Python child process.
 *
 * @param audioPath Path of the audio file, passed to the script as `sys.argv[1]`.
 * @param dir       Directory exported to the child as `TORCH_HOME`.
 * @param token     Access token exported to the child as `PYANNOTE_AUTH_TOKEN`.
 * @returns The segments parsed from the script's JSON output.
 * @throws Error when the process cannot start, exits non-zero, or prints
 *         something that is not a JSON array.
 */
async function runDiarization(audioPath: string, dir: string, token: string): Promise<SpeakerSegment[]> {
  return new Promise((resolve, reject) => {
    let output = '';
    const proc = spawn('python', ['-c', DIARIZATION_SCRIPT, audioPath], {
      env: { ...process.env, TORCH_HOME: dir, PYANNOTE_AUTH_TOKEN: token },
    });
    // Decode as a stream so multi-byte characters split across chunks stay intact.
    proc.stdout.setEncoding('utf8');
    proc.stderr.setEncoding('utf8');
    proc.stdout.on('data', (data: string) => {
      output += data;
    });
    proc.stderr.on('data', (data: string) => console.error(data));
    proc.on('error', reject);
    proc.on('close', (code) => {
      if (code !== 0) {
        reject(new Error(`Python process exited with code ${code}`));
        return;
      }
      try {
        const parsed: unknown = JSON.parse(output);
        if (!Array.isArray(parsed)) throw new Error('not an array');
        resolve(parsed as SpeakerSegment[]);
      } catch {
        reject(new Error('Failed to parse diarization output'));
      }
    });
  });
}

/**
 * Merges transcript chunks and diarization segments into
 * `[Speaker N]: text` lines, one line per uninterrupted speaker turn.
 *
 * Speakers are numbered from 1 in order of first appearance in `speakers`.
 * A chunk belongs to the first segment containing its start timestamp. A chunk
 * that matches no segment stays with the current speaker (Speaker 1 if there is
 * none yet) instead of forcing a switch to Speaker 1 in the middle of a turn.
 *
 * @param chunks   Transcript chunks carrying `text` and `timestamp`.
 * @param speakers Diarization segments.
 * @returns The turns joined with `\n`; an empty string when there is no text.
 */
function combineSpeakerTranscript(chunks: TranscriptChunk[], speakers: SpeakerSegment[]): string {
  const speakerMap = new Map<string, number>();
  for (const seg of speakers) {
    if (!speakerMap.has(seg.speaker)) speakerMap.set(seg.speaker, speakerMap.size + 1);
  }

  const lines: string[] = [];
  let currentSpeaker = -1;
  let currentText = '';
  const flush = (): void => {
    if (currentText) lines.push(`[Speaker ${currentSpeaker}]: ${currentText.trim()}`);
  };

  for (const chunk of chunks) {
    const time = chunk.timestamp?.[0];
    const segment = time == null ? undefined : speakers.find((s) => time >= s.start && time <= s.end);
    const fallback = currentSpeaker === -1 ? 1 : currentSpeaker;
    const speakerNum = (segment ? speakerMap.get(segment.speaker) : undefined) ?? fallback;
    if (speakerNum !== currentSpeaker) {
      flush();
      currentSpeaker = speakerNum;
      currentText = chunk.text;
    } else {
      currentText += chunk.text;
    }
  }
  flush();
  return lines.join('\n');
}

/**
 * Loads an audio file as 16 kHz mono 32-bit float samples.
 *
 * The file is first parsed directly as WAV. If that fails it is converted with
 * `ffmpeg` into `converted.wav` inside a fresh temp directory and that file is
 * parsed instead. Multi-channel audio is down-mixed by averaging the channels.
 *
 * @param file Path of the input audio file.
 * @returns `[path, samples]` where `path` is the file that was actually decoded:
 *          `file` itself, or the converted temp file. In the latter case the
 *          caller owns the temp directory (`dirname(path)`) and must remove it.
 * @throws The ffmpeg / decoding error when the audio cannot be prepared; any
 *         temp directory created here is removed before rethrowing.
 */
function prepareAudioBuffer(file: string): [string, Float32Array] {
  let tempDir: string | undefined;
  try {
    let source = file;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- wavefile is used untyped here
    let wav: any;
    try {
      wav = new wavefile.WaveFile(readFileSync(file));
    } catch {
      // Not readable as WAV: let ffmpeg convert it. An argument array (no shell)
      // keeps quotes or metacharacters in the file name from being interpreted.
      tempDir = mkdtempSync(join(tmpdir(), 'audio-'));
      source = join(tempDir, 'converted.wav');
      execFileSync('ffmpeg', ['-i', file, '-ar', '16000', '-ac', '1', '-f', 'wav', source], { stdio: 'ignore' });
      wav = new wavefile.WaveFile(readFileSync(source));
    }

    wav.toBitDepth('32f');
    wav.toSampleRate(16000);
    const samples = wav.getSamples();

    if (Array.isArray(samples)) {
      // One array per channel: average them into a single mono buffer.
      const channels: ArrayLike<number>[] = samples;
      const length = channels[0]?.length ?? 0;
      const buffer = new Float32Array(length);
      for (let i = 0; i < length; i++) {
        let sum = 0;
        for (const channel of channels) sum += channel[i] ?? 0;
        buffer[i] = sum / channels.length;
      }
      return [source, buffer];
    }
    return [source, samples instanceof Float32Array ? samples : Float32Array.from(samples as ArrayLike<number>)];
  } catch (err) {
    if (tempDir) rmSync(tempDir, { recursive: true, force: true });
    throw err;
  }
}

/**
 * Worker entry point. Each message `{ file, speaker, model, modelDir, token }`
 * is answered with exactly one `postMessage`:
 *  - `{ text }` with the plain transcript, or the speaker-labelled transcript
 *    when `speaker` is set and diarization ran;
 *  - `{ text, error }` when speaker labels were requested but could not be produced;
 *  - `{ error }` when processing failed.
 */
parentPort?.on('message', async ({ file, speaker, model, modelDir, token }) => {
  let audioPath: string | undefined;
  try {
    // (Re)load the pipeline when none is cached or a different model is requested.
    if (!whisperPipeline || whisperModel !== model) {
      whisperPipeline = await pipeline('automatic-speech-recognition', `Xenova/${model}`, {
        cache_dir: modelDir,
        quantized: true,
      });
      whisperModel = model;
    }

    // Prepare audio file
    const [f, buffer] = prepareAudioBuffer(file);
    audioPath = f;

    // Fetch transcript and speakers
    const hasDiarization = Boolean(speaker) && Boolean(token) && (await canDiarization());
    const [transcript, speakers] = await Promise.all([
      whisperPipeline(buffer, { return_timestamps: speaker ? 'word' : false }),
      hasDiarization ? runDiarization(f, modelDir, token) : Promise.resolve(undefined),
    ]);

    // Return any results / errors if no more processing required
    const text = transcript.text?.trim() || null;
    if (!speaker) return parentPort?.postMessage({ text });
    if (!token) return parentPort?.postMessage({ text, error: 'HuggingFace token required' });
    if (!hasDiarization) return parentPort?.postMessage({ text, error: 'Speaker diarization unavailable' });

    // Combine transcript and speakers
    const combined = combineSpeakerTranscript(transcript.chunks || [], speakers || []);
    parentPort?.postMessage({ text: combined });
  } catch (err: unknown) {
    const error = err instanceof Error ? err.stack || err.message : String(err);
    parentPort?.postMessage({ error });
  } finally {
    // A converted file lives in its own temp directory: remove the whole
    // directory, on success and on failure alike.
    if (audioPath && audioPath !== file) rmSync(dirname(audioPath), { recursive: true, force: true });
  }
});