'use strict';
const http = require('node:http');
const https = require('node:https');
const { Duplex: DuplexStream } = require('node:stream');
const merge = require('merge');
const concat = require('concat-stream');
const constants = require('./constants');
const utils = require('./utils');
/**
* Represents a transport adapter over HTTP
*/
class HTTPTransport extends DuplexStream {
static get DEFAULTS() {
return {
allowLoopbackAddresses: true
};
}
/**
* Contructs a HTTP transport adapter
* @constructor
*/
constructor(options) {
super({ objectMode: true });
this._options = merge({}, HTTPTransport.DEFAULTS, options);
this._pending = new Map();
this.server = this._createServer(this._options);
this.server.on('error', (err) => this.emit('error', err));
setInterval(() => this._timeoutPending(), constants.T_RESPONSETIMEOUT);
}
/**
* Creates the HTTP server object
* @private
*/
_createServer() {
return http.createServer();
}
/**
* Returns a HTTP request object
* @private
*/
_createRequest(options) {
if (options.protocol === 'https:') {
return https.request(...arguments);
}
return http.request(...arguments);
}
/**
* Implements the readable interface
* @private
*/
_read() {
if (this.server.listeners('request').length) {
return;
}
this.server.on('request', (req, res) => this._handle(req, res));
}
/**
* Every T_RESPONSETIMEOUT, we destroy any open sockets that are still
* waiting
* @private
*/
_timeoutPending() {
const now = Date.now();
this._pending.forEach(({ timestamp, response }, id) => {
let timeout = timestamp + constants.T_RESPONSETIMEOUT;
if (now >= timeout) {
response.statusCode = 504;
response.end('Gateway Timeout');
this._pending.delete(id);
}
});
}
/**
* Implements the writable interface
* @private
*/
_write([id, buffer, target], encoding, callback) {
let [, contact] = target;
// NB: If responding to a received request...
if (this._pending.has(id)) {
this._pending.get(id).response.end(buffer);
this._pending.delete(id);
return callback(null);
}
// NB: If originating an outbound request...
const reqopts = {
hostname: contact.hostname,
port: contact.port,
protocol: contact.protocol,
method: 'POST',
headers: {
'x-kad-message-id': id
}
};
if (typeof contact.path === 'string') {
reqopts.path = contact.path;
}
const request = this._createRequest(reqopts);
request.on('response', (response) => {
response.on('error', (err) => this.emit('error', err));
response.pipe(concat((buffer) => {
if (response.statusCode >= 400) {
let err = new Error(buffer.toString());
err.dispose = id;
this.emit('error', err);
} else {
this.push(buffer);
}
}));
});
request.on('error', (err) => {
err.dispose = id;
this.emit('error', err);
});
request.end(buffer);
callback();
}
/**
* Default request handler
* @private
*/
_handle(req, res) {
req.on('error', (err) => this.emit('error', err));
res.on('error', (err) => this.emit('error', err));
if (!req.headers['x-kad-message-id']) {
res.statusCode = 400;
return res.end();
}
res.setHeader('X-Kad-Message-ID', req.headers['x-kad-message-id']);
res.setHeader('Access-Control-Allow-Origin', req.headers.origin || '*');
res.setHeader('Access-Control-Allow-Methods', '*');
res.setHeader('Access-Control-Allow-Headers', '*');
res.setHeader('Access-Control-Allow-Credentials', 'true');
if (!['POST', 'OPTIONS'].includes(req.method)) {
res.statusCode = 405;
}
if (req.method !== 'POST') {
return res.end();
}
req.pipe(concat((buffer) => {
this._pending.set(req.headers['x-kad-message-id'], {
timestamp: Date.now(),
response: res
});
this.push(buffer);
}));
}
/**
* @private
*/
_validate(contact) {
return utils.isValidContact(contact, this._options.allowLoopbackAddresses);
}
/**
* Binds the server to the given address/port
*/
listen() {
this.server.listen(...arguments);
}
}
module.exports = HTTPTransport;