Software: Apache. PHP/8.1.30 uname -a: Linux server1.tuhinhossain.com 5.15.0-151-generic #161-Ubuntu SMP Tue Jul 22 14:25:40 UTC uid=1002(picotech) gid=1003(picotech) groups=1003(picotech),0(root) Safe-mode: OFF (not secure) /home/picotech/domains/wa.picotech.app/public_html/node_modules/thread-stream/ drwxr-xr-x |
Viewing file: Select action/file-type: 'use strict' const { EventEmitter } = require('events') const { Worker } = require('worker_threads') const { join } = require('path') const { pathToFileURL } = require('url') const { wait } = require('./lib/wait') const { WRITE_INDEX, READ_INDEX } = require('./lib/indexes') const buffer = require('buffer') const assert = require('assert') const kImpl = Symbol('kImpl') // V8 limit for string size const MAX_STRING = buffer.constants.MAX_STRING_LENGTH class FakeWeakRef { constructor (value) { this._value = value } deref () { return this._value } } const FinalizationRegistry = global.FinalizationRegistry || class FakeFinalizationRegistry { register () {} unregister () {} } const WeakRef = global.WeakRef || FakeWeakRef const registry = new FinalizationRegistry((worker) => { if (worker.exited) { return } worker.terminate() }) function createWorker (stream, opts) { const { filename, workerData } = opts const bundlerOverrides = '__bundlerPathsOverrides' in globalThis ? globalThis.__bundlerPathsOverrides : {} const toExecute = bundlerOverrides['thread-stream-worker'] || join(__dirname, 'lib', 'worker.js') const worker = new Worker(toExecute, { ...opts.workerOpts, workerData: { filename: filename.indexOf('file://') === 0 ? filename : pathToFileURL(filename).href, dataBuf: stream[kImpl].dataBuf, stateBuf: stream[kImpl].stateBuf, workerData } }) // We keep a strong reference for now, // we need to start writing first worker.stream = new FakeWeakRef(stream) worker.on('message', onWorkerMessage) worker.on('exit', onWorkerExit) registry.register(stream, worker) return worker } function drain (stream) { assert(!stream[kImpl].sync) if (stream[kImpl].needDrain) { stream[kImpl].needDrain = false stream.emit('drain') } } function nextFlush (stream) { const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) let leftover = stream[kImpl].data.length - writeIndex if (leftover > 0) { if (stream[kImpl].buf.length === 0) { stream[kImpl].flushing = false if (stream[kImpl].ending) { end(stream) } else if (stream[kImpl].needDrain) { process.nextTick(drain, stream) } return } let toWrite = stream[kImpl].buf.slice(0, leftover) let toWriteBytes = Buffer.byteLength(toWrite) if (toWriteBytes <= leftover) { stream[kImpl].buf = stream[kImpl].buf.slice(leftover) // process._rawDebug('writing ' + toWrite.length) write(stream, toWrite, nextFlush.bind(null, stream)) } else { // multi-byte utf-8 stream.flush(() => { // err is already handled in flush() if (stream.destroyed) { return } Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) // Find a toWrite length that fits the buffer // it must exists as the buffer is at least 4 bytes length // and the max utf-8 length for a char is 4 bytes. while (toWriteBytes > stream[kImpl].data.length) { leftover = leftover / 2 toWrite = stream[kImpl].buf.slice(0, leftover) toWriteBytes = Buffer.byteLength(toWrite) } stream[kImpl].buf = stream[kImpl].buf.slice(leftover) write(stream, toWrite, nextFlush.bind(null, stream)) }) } } else if (leftover === 0) { if (writeIndex === 0 && stream[kImpl].buf.length === 0) { // we had a flushSync in the meanwhile return } stream.flush(() => { Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) nextFlush(stream) }) } else { // This should never happen throw new Error('overwritten') } } function onWorkerMessage (msg) { const stream = this.stream.deref() if (stream === undefined) { this.exited = true // Terminate the worker. this.terminate() return } switch (msg.code) { case 'READY': // Replace the FakeWeakRef with a // proper one. this.stream = new WeakRef(stream) stream.flush(() => { stream[kImpl].ready = true stream.emit('ready') }) break case 'ERROR': destroy(stream, msg.err) break default: throw new Error('this should not happen: ' + msg.code) } } function onWorkerExit (code) { const stream = this.stream.deref() if (stream === undefined) { // Nothing to do, the worker already exit return } registry.unregister(stream) stream.worker.exited = true stream.worker.off('exit', onWorkerExit) destroy(stream, code !== 0 ? new Error('The worker thread exited') : null) } class ThreadStream extends EventEmitter { constructor (opts = {}) { super() if (opts.bufferSize < 4) { throw new Error('bufferSize must at least fit a 4-byte utf-8 char') } this[kImpl] = {} this[kImpl].stateBuf = new SharedArrayBuffer(128) this[kImpl].state = new Int32Array(this[kImpl].stateBuf) this[kImpl].dataBuf = new SharedArrayBuffer(opts.bufferSize || 4 * 1024 * 1024) this[kImpl].data = Buffer.from(this[kImpl].dataBuf) this[kImpl].sync = opts.sync || false this[kImpl].ending = false this[kImpl].ended = false this[kImpl].needDrain = false this[kImpl].destroyed = false this[kImpl].flushing = false this[kImpl].ready = false this[kImpl].finished = false this[kImpl].errored = null this[kImpl].closed = false this[kImpl].buf = '' // TODO (fix): Make private? this.worker = createWorker(this, opts) // TODO (fix): make private } write (data) { if (this[kImpl].destroyed) { throw new Error('the worker has exited') } if (this[kImpl].ending) { throw new Error('the worker is ending') } if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) { try { writeSync(this) this[kImpl].flushing = true } catch (err) { destroy(this, err) return false } } this[kImpl].buf += data if (this[kImpl].sync) { try { writeSync(this) return true } catch (err) { destroy(this, err) return false } } if (!this[kImpl].flushing) { this[kImpl].flushing = true setImmediate(nextFlush, this) } this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0 return !this[kImpl].needDrain } end () { if (this[kImpl].destroyed) { return } this[kImpl].ending = true end(this) } flush (cb) { if (this[kImpl].destroyed) { if (typeof cb === 'function') { process.nextTick(cb, new Error('the worker has exited')) } return } // TODO write all .buf const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX) // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`) wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => { if (err) { destroy(this, err) process.nextTick(cb, err) return } if (res === 'not-equal') { // TODO handle deadlock this.flush(cb) return } process.nextTick(cb) }) } flushSync () { if (this[kImpl].destroyed) { return } writeSync(this) flushSync(this) } unref () { this.worker.unref() } ref () { this.worker.ref() } get ready () { return this[kImpl].ready } get destroyed () { return this[kImpl].destroyed } get closed () { return this[kImpl].closed } get writable () { return !this[kImpl].destroyed && !this[kImpl].ending } get writableEnded () { return this[kImpl].ending } get writableFinished () { return this[kImpl].finished } get writableNeedDrain () { return this[kImpl].needDrain } get writableObjectMode () { return false } get writableErrored () { return this[kImpl].errored } } function destroy (stream, err) { if (stream[kImpl].destroyed) { return } stream[kImpl].destroyed = true if (err) { stream[kImpl].errored = err stream.emit('error', err) } if (!stream.worker.exited) { stream.worker.terminate() .catch(() => {}) .then(() => { stream[kImpl].closed = true stream.emit('close') }) } else { setImmediate(() => { stream[kImpl].closed = true stream.emit('close') }) } } function write (stream, data, cb) { // data is smaller than the shared buffer length const current = Atomics.load(stream[kImpl].state, WRITE_INDEX) const length = Buffer.byteLength(data) stream[kImpl].data.write(data, current) Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length) Atomics.notify(stream[kImpl].state, WRITE_INDEX) cb() return true } function end (stream) { if (stream[kImpl].ended || !stream[kImpl].ending || stream[kImpl].flushing) { return } stream[kImpl].ended = true try { stream.flushSync() let readIndex = Atomics.load(stream[kImpl].state, READ_INDEX) // process._rawDebug('writing index') Atomics.store(stream[kImpl].state, WRITE_INDEX, -1) // process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`) Atomics.notify(stream[kImpl].state, WRITE_INDEX) // Wait for the process to complete let spins = 0 while (readIndex !== -1) { // process._rawDebug(`read = ${read}`) Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000) readIndex = Atomics.load(stream[kImpl].state, READ_INDEX) if (readIndex === -2) { throw new Error('end() failed') } if (++spins === 10) { throw new Error('end() took too long (10s)') } } process.nextTick(() => { stream[kImpl].finished = true stream.emit('finish') }) } catch (err) { destroy(stream, err) } // process._rawDebug('end finished...') } function writeSync (stream) { const cb = () => { if (stream[kImpl].ending) { end(stream) } else if (stream[kImpl].needDrain) { process.nextTick(drain, stream) } } stream[kImpl].flushing = false while (stream[kImpl].buf.length !== 0) { const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) let leftover = stream[kImpl].data.length - writeIndex if (leftover === 0) { flushSync(stream) Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) continue } else if (leftover < 0) { // stream should never happen throw new Error('overwritten') } let toWrite = stream[kImpl].buf.slice(0, leftover) let toWriteBytes = Buffer.byteLength(toWrite) if (toWriteBytes <= leftover) { stream[kImpl].buf = stream[kImpl].buf.slice(leftover) // process._rawDebug('writing ' + toWrite.length) write(stream, toWrite, cb) } else { // multi-byte utf-8 flushSync(stream) Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) // Find a toWrite length that fits the buffer // it must exists as the buffer is at least 4 bytes length // and the max utf-8 length for a char is 4 bytes. while (toWriteBytes > stream[kImpl].buf.length) { leftover = leftover / 2 toWrite = stream[kImpl].buf.slice(0, leftover) toWriteBytes = Buffer.byteLength(toWrite) } stream[kImpl].buf = stream[kImpl].buf.slice(leftover) write(stream, toWrite, cb) } } } function flushSync (stream) { if (stream[kImpl].flushing) { throw new Error('unable to flush while flushing') } // process._rawDebug('flushSync started') const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) let spins = 0 // TODO handle deadlock while (true) { const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX) if (readIndex === -2) { throw new Error('_flushSync failed') } // process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`) if (readIndex !== writeIndex) { // TODO stream timeouts for some reason. Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000) } else { break } if (++spins === 10) { throw new Error('_flushSync took too long (10s)') } } // process._rawDebug('flushSync finished') } module.exports = ThreadStream |
:: Command execute :: | |
--[ c99shell v. 2.5 [PHP 8 Update] [24.05.2025] | Generation time: 0.0042 ]-- |