All files / src/p2p provider.ts

84.61% Statements 33/39
87.5% Branches 7/8
55.55% Functions 5/9
84.61% Lines 33/39

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113                                                          1x             18x 18x 18x 18x     18x 18x 18x 18x 18x 18x 18x 18x 18x       18x   9x     9x 9x 9x         18x           9x 9x 9x 9x 9x                       54x 18x 36x 18x   18x   54x       9x   9x                      
/**
 * @argument encodedMessage a complete message from the lower transport layer
 */
export type P2PDataHandler = (encodedMessage: Uint8Array) => void
 
export type P2PErrorHandler = (error: any) => void
 
export type P2PHandler = () => void
 
export type P2PEventMap = {
    data: P2PDataHandler
    error: P2PErrorHandler
    close: P2PHandler
}
 
/**
 * Provider interface for P2P protocol responsible for re-assembling full message payloads before
 * delivering them upstream via event emission
 */
export interface P2PProvider {
    write(encodedMessage: Uint8Array, done?: P2PHandler): void
    end(cb?: P2PHandler): void
    destroy(err?: Error): void
 
    on<T extends keyof P2PEventMap>(event: T, handler: P2PEventMap[T]): this
    //removeListener<T extends keyof P2PEventMap>(event: T, handler: P2PEventMap[T]): this
}
 
export class SimpleEnvelopeP2PProvider {
    static maxReadLength = 8 * 1024 * 1024
    private declare nextProvider: P2PProvider
    private declare dataHandlers: Array<P2PDataHandler>
    private declare errorHandlers: Array<P2PErrorHandler>
    private declare remainingData: Uint8Array
 
    constructor(nextProvider: P2PProvider) {
        this.nextProvider = nextProvider
        this.remainingData = new Uint8Array(0)
        this.dataHandlers = []
        this.errorHandlers = []
 
        // process nextProvider data
        this.nextProvider.on('data', (data: Uint8Array) => {
            const newData = new Uint8Array(this.remainingData.byteLength + data.byteLength)
            newData.set(this.remainingData, 0)
            newData.set(data, this.remainingData.byteLength)
            this.remainingData = newData
            while (this.remainingData.byteLength >= 4) {
                const view = new DataView(this.remainingData.buffer)
                const messageLength = view.getUint32(0, true)
                Iif (messageLength > SimpleEnvelopeP2PProvider.maxReadLength) {
                    this.emitError(new Error('Incoming Message too long'))
                }
 
                if (this.remainingData.byteLength < 4 + messageLength) {
                    // need more data
                    break
                }
 
                const messageBuffer = this.remainingData.subarray(4, 4 + messageLength)
                this.remainingData = this.remainingData.slice(4 + messageLength)
                this.emitData(messageBuffer)
            }
        })
 
        // proxy error
        this.nextProvider.on('error', (err: any) => {
            this.emitError(err)
        })
    }
 
    write(data: Uint8Array, done?: P2PHandler): void {
        const nextBuffer = new Uint8Array(4 + data.byteLength)
        const view = new DataView(nextBuffer.buffer)
        view.setUint32(0, data.byteLength, true)
        nextBuffer.set(data, 4)
        this.nextProvider.write(nextBuffer, done)
    }
 
    end(cb?: P2PHandler): void {
        this.nextProvider.end(cb)
    }
 
    destroy(err?: Error): void {
        this.nextProvider.destroy(err)
    }
 
    on<T extends keyof P2PEventMap>(event: T, handler: P2PEventMap[T]): this {
        if (event === 'data') {
            this.dataHandlers.push(handler)
        } else if (event === 'error') {
            this.errorHandlers.push(handler)
        } else {
            this.nextProvider.on(event, handler)
        }
        return this
    }
 
    emitData(messageBuffer: Uint8Array): void {
        for (const handler of this.dataHandlers) {
            // typescript is loosing the specificity provided by T in the assignment above
            handler(messageBuffer)
        }
    }
 
    emitError(err: any): void {
        for (const handler of this.errorHandlers) {
            // typescript is loosing the specificity provided by T in the assignment above
            handler(err)
        }
    }
}