Software: Apache. PHP/8.1.30 uname -a: Linux server1.tuhinhossain.com 5.15.0-163-generic #173-Ubuntu SMP Tue Oct 14 17:51:00 UTC uid=1002(picotech) gid=1003(picotech) groups=1003(picotech),0(root) Safe-mode: OFF (not secure) /home/picotech/domains/note.picotech.app/public_html/src/services/ drwxr-xr-x | |
| Viewing file: Select action/file-type: const { Recording, AudioChunk } = require('../models');
const ffmpegService = require('./ffmpegService');
const transcriptionService = require('./transcriptionService');
const transcriptionAggregationService = require('./transcriptionAggregationService');
let liveTranscriptionService = null;
const getLiveTranscriptionService = () => {
if (!liveTranscriptionService) {
liveTranscriptionService = require('./liveTranscriptionService').getInstance();
}
return liveTranscriptionService;
};
const { formatTranscript } = require('./transcriptionService');
let isProcessing = false;
const MAX_CONCURRENT_JOBS = 20; // Process 2 recordings at a time
let chunkingQueue = [];
let transcriptionQueue = [];
/**
* Add recording to chunking queue
*/
async function addToChunkingQueue(recordingId) {
if (!chunkingQueue.includes(recordingId)) {
chunkingQueue.push(recordingId);
console.log(`Added recording ${recordingId} to chunking queue`);
processQueues();
}
}
/**
* Add chunk to transcription queue
*/
async function addToTranscriptionQueue(chunkId) {
if (!transcriptionQueue.includes(chunkId)) {
transcriptionQueue.push(chunkId);
console.log(`Added chunk ${chunkId} to transcription queue`);
processQueues();
}
}
/**
* Process both queues
*/
async function processQueues() {
if (isProcessing) return;
isProcessing = true;
try {
// Process chunking queue first
while (chunkingQueue.length > 0 && getActiveChunkingJobs() < MAX_CONCURRENT_JOBS) {
const recordingId = chunkingQueue.shift();
await processChunkingJob(recordingId);
}
// Process transcription queue
while (transcriptionQueue.length > 0 && getActiveTranscriptionJobs() < MAX_CONCURRENT_JOBS) {
const chunkId = transcriptionQueue.shift();
await processTranscriptionJob(chunkId);
}
} catch (error) {
console.error('Error processing queues:', error);
} finally {
isProcessing = false;
}
// Continue processing if there are more jobs
if (chunkingQueue.length > 0 || transcriptionQueue.length > 0) {
setTimeout(() => processQueues(), 1000); // Wait 1 second before next batch
}
}
/**
* Process a single chunking job
*/
async function processChunkingJob(recordingId) {
try {
const recording = await Recording.findByPk(recordingId);
if (!recording) {
console.warn(`Recording ${recordingId} not found`);
return;
}
if (recording.chunking_status !== 'pending') {
console.log(`Recording ${recordingId} chunking status is ${recording.chunking_status}, skipping`);
return;
}
console.log(`Starting chunking for recording ${recordingId}`);
await recording.updateChunkingStatus('processing');
const chunks = await ffmpegService.createChunks(recording);
// Add chunks to transcription queue
for (const chunk of chunks) {
await addToTranscriptionQueue(chunk.id);
}
console.log(`Completed chunking for recording ${recordingId}, created ${chunks.length} chunks`);
} catch (error) {
console.error(`Error processing chunking job for recording ${recordingId}:`, error);
const recording = await Recording.findByPk(recordingId);
if (recording) {
await recording.updateChunkingStatus('failed', error.message);
}
}
}
/**
* Process a single transcription job
*/
async function processTranscriptionJob(chunkId) {
try {
const chunk = await AudioChunk.findByPk(chunkId);
if (!chunk) {
console.warn(`Chunk ${chunkId} not found`);
return;
}
if (chunk.transcription_status !== 'pending') {
console.log(`Chunk ${chunkId} transcription status is ${chunk.transcription_status}, skipping`);
return;
}
console.log(`Starting transcription for chunk ${chunkId}`);
await chunk.updateTranscriptionStatus('processing');
const transcription = await transcriptionService.providers.openrouter.transcribe(chunk.file_path);
// Store transcription data in chunk
await chunk.update({ transcription_data: JSON.stringify(transcription) });
await chunk.updateTranscriptionStatus('completed');
// Get the recording to find the meeting ID
const recording = await Recording.findByPk(chunk.recording_id);
if (recording) {
// Format the transcription for display
const formattedTranscription = formatTranscript(transcription);
// Broadcast transcription update via WebSocket
const service = getLiveTranscriptionService();
console.log('LiveTranscriptionService instance:', service);
if (service) {
service.broadcastTranscriptionUpdate(recording.meeting_id, {
chunkId: chunk.id,
transcription: formattedTranscription,
recordingId: recording.id,
meetingId: recording.meeting_id
});
}
}
console.log(`Completed transcription for chunk ${chunkId}`);
// Check if all chunks for this recording are now transcribed
await transcriptionAggregationService.checkAndAggregateTranscription(chunk.recording_id);
} catch (error) {
console.error(`Error processing transcription job for chunk ${chunkId}:`, error);
const chunk = await AudioChunk.findByPk(chunkId);
if (chunk) {
await chunk.updateTranscriptionStatus('failed', error.message);
}
}
}
/**
* Get number of active chunking jobs
*/
function getActiveChunkingJobs() {
// This is a simple implementation - in production, you'd track active jobs more precisely
return 0; // For now, allow all jobs to run
}
/**
* Get number of active transcription jobs
*/
function getActiveTranscriptionJobs() {
// This is a simple implementation - in production, you'd track active jobs more precisely
return 0; // For now, allow all jobs to run
}
/**
* Start processing pending jobs on startup
*/
async function initialize() {
try {
// Find recordings that need chunking
const pendingChunking = await Recording.findPendingChunking();
for (const recording of pendingChunking) {
await addToChunkingQueue(recording.id);
}
// Find chunks that need transcription
const pendingTranscription = await AudioChunk.findPendingTranscription();
for (const chunk of pendingTranscription) {
await addToTranscriptionQueue(chunk.id);
}
console.log(`Initialized job queue with ${pendingChunking.length} chunking jobs and ${pendingTranscription.length} transcription jobs`);
} catch (error) {
console.error('Error initializing job queue:', error);
}
}
/**
* Get queue status
*/
function getStatus() {
return {
chunkingQueue: chunkingQueue.length,
transcriptionQueue: transcriptionQueue.length,
isProcessing,
};
}
module.exports = {
addToChunkingQueue,
addToTranscriptionQueue,
processQueues,
processChunkingJob,
processTranscriptionJob,
getActiveChunkingJobs,
getActiveTranscriptionJobs,
initialize,
getStatus,
}; |
:: Command execute :: | |
--[ c99shell v. 2.5 [PHP 8 Update] [24.05.2025] | Generation time: 0.0039 ]-- |