// File-viewer metadata captured with this listing (not part of the program): 1127 lines, 35 KiB, JavaScript
"use strict";
// Babel runtime helpers (kept so any still-transpiled function in this file keeps working).
var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault");

var _regenerator = _interopRequireDefault(require("@babel/runtime/regenerator"));

var _asyncToGenerator2 = _interopRequireDefault(require("@babel/runtime/helpers/asyncToGenerator"));

/**
 * this method is used in nodejs-environments.
 * The ipc is handled via sockets and file-writes to the tmp-folder
 */
var util = require('util');

var fs = require('fs');

var os = require('os');

var events = require('events');

var net = require('net');

var path = require('path');

var micro = require('nano-time');

var rimraf = require('rimraf');

var sha3_224 = require('js-sha3').sha3_224;

var isNode = require('detect-node');

var unload = require('unload');

var fillOptionsWithDefaults = require('../../dist/lib/options.js').fillOptionsWithDefaults;

var ownUtil = require('../../dist/lib/util.js');

var randomInt = ownUtil.randomInt;
var randomToken = ownUtil.randomToken;

var _require = require('oblivious-set'),
    ObliviousSet = _require.ObliviousSet;
/**
 * Windows needs a different kind of socket-path, so it is handled separately here.
 * On win32 a path that does not already start with the `\\.\pipe\` prefix
 * loses its leading slash, gets every remaining slash replaced by a dash
 * and is then prefixed with `\\.\pipe\`.
 * On every other platform, and for strings that already carry the prefix,
 * the input is returned unchanged.
 * @link https://gist.github.com/domenic/2790533#gistcomment-331356
 * @param {string} str socket path
 * @return {string} path usable as socket/pipe name on the current platform
 */
function cleanPipeName(str) {
  const PIPE_PREFIX = '\\\\.\\pipe\\';

  if (process.platform !== 'win32' || str.startsWith(PIPE_PREFIX)) {
    return str;
  }

  const flattened = str
    .replace(/^\//, '') // drop the leading slash
    .replace(/\//g, '-'); // a pipe name must not contain further slashes
  return PIPE_PREFIX + flattened;
}
// promise-returning variants of the callback-style fs functions (and rimraf) used in this file
const mkdir = util.promisify(fs.mkdir);
const writeFile = util.promisify(fs.writeFile);
const readFile = util.promisify(fs.readFile);
const unlink = util.promisify(fs.unlink);
const readdir = util.promisify(fs.readdir);
const chmod = util.promisify(fs.chmod);
const removeDir = util.promisify(rimraf);

// channelName -> list of channel-states created in this process, read by emitOverFastPath()
const OTHER_INSTANCES = {};

// every channel gets its own sub-folder inside <os tmpdir>/pubkey.bc
const TMP_FOLDER_NAME = 'pubkey.bc';
const TMP_FOLDER_BASE = path.join(os.tmpdir(), TMP_FOLDER_NAME);

// channelName -> result of getPaths(), so the hash is only computed once per name
const getPathsCache = new Map();
/**
 * Returns the folders that belong to a channel. The result is cached per
 * channelName, so repeated calls return the very same object.
 * @param {string} channelName
 * @return {{channelBase: string, readers: string, messages: string}}
 */
function getPaths(channelName) {
  const cached = getPathsCache.get(channelName);
  if (cached) {
    return cached;
  }

  // use hash in case of strange characters
  const channelHash = sha3_224(channelName);

  /**
   * because the length of socket-paths is limited, we use only the first 20 chars
   * and also start with A to ensure we do not start with a number
   * @link https://serverfault.com/questions/641347/check-if-a-path-exceeds-maximum-for-unix-domain-socket
   */
  const channelFolder = 'A' + channelHash.substring(0, 20);
  const channelBase = path.join(TMP_FOLDER_BASE, channelFolder);

  const paths = {
    channelBase: channelBase,
    readers: path.join(channelBase, 'rdrs'),
    messages: path.join(channelBase, 'messages')
  };
  getPathsCache.set(channelName, paths);
  return paths;
}
// Promise of the one-time creation of TMP_FOLDER_BASE; clearNodeFolder() sets it back to null.
let ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;

/**
 * Creates the tmp base folder, but starts the mkdir only once:
 * later calls reuse the promise of the first call.
 * A failing mkdir is ignored on purpose (the promise then resolves with null).
 * @return {Promise}
 */
async function ensureBaseFolderExists() {
  if (!ENSURE_BASE_FOLDER_EXISTS_PROMISE) {
    ENSURE_BASE_FOLDER_EXISTS_PROMISE = mkdir(TMP_FOLDER_BASE).catch(() => null);
  }
  return ENSURE_BASE_FOLDER_EXISTS_PROMISE;
}
/**
 * Creates the folder of the channel together with its readers- and
 * messages-subfolder and then opens up their permissions.
 * Errors of mkdir and chmod are ignored.
 * @param {string} channelName
 * @param {{channelBase: string, readers: string, messages: string}} [paths] defaults to getPaths(channelName)
 * @return {Promise<void>}
 */
async function ensureFoldersExist(channelName, paths) {
  paths = paths || getPaths(channelName);

  await ensureBaseFolderExists();

  // the channel folder must exist before its two subfolders can be created
  await mkdir(paths.channelBase).catch(() => null);
  await Promise.all([
    mkdir(paths.readers).catch(() => null),
    mkdir(paths.messages).catch(() => null)
  ]);

  // set permissions so other users can use the same channel
  const chmodValue = '777';
  await Promise.all([
    chmod(paths.channelBase, chmodValue),
    chmod(paths.readers, chmodValue),
    chmod(paths.messages, chmodValue)
  ]).catch(() => null);
}
/**
 * removes the tmp-folder
 * @return {Promise<true>}
 * @throws {Error} (as rejection) when TMP_FOLDER_BASE is empty or the root folder
 */
async function clearNodeFolder() {
  // safety net so we never delete something outside of our own tmp-folder
  if (!TMP_FOLDER_BASE || TMP_FOLDER_BASE === '' || TMP_FOLDER_BASE === '/') {
    throw new Error('BroadcastChannel.clearNodeFolder(): path is wrong');
  }

  // reset before AND after the removal, so the base folder gets re-created on next use
  ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
  await removeDir(TMP_FOLDER_BASE);
  ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;

  return true;
}
/**
 * Returns the path of the socket on which the given reader listens,
 * already passed through cleanPipeName().
 * @param {string} channelName
 * @param {string} readerUuid
 * @param {{readers: string}} [paths] defaults to getPaths(channelName)
 * @return {string}
 */
function socketPath(channelName, readerUuid, paths) {
  const usePaths = paths || getPaths(channelName);
  const rawPath = path.join(usePaths.readers, readerUuid + '.s');
  return cleanPipeName(rawPath);
}
/**
 * Returns the path of the info-file (<readerUuid>.json inside the readers-folder)
 * which announces that the socket of the given reader exists.
 * @param {string} channelName
 * @param {string} readerUuid
 * @param {{readers: string}} [paths] defaults to getPaths(channelName)
 * @return {string}
 */
function socketInfoPath(channelName, readerUuid, paths) {
  const usePaths = paths || getPaths(channelName);
  return path.join(usePaths.readers, readerUuid + '.json');
}
/**
 * Because it is not possible to get all socket-files in a folder
 * when running on windows,
 * we have to set a normal file so other readers know our socket exists.
 * The file contains the current time as JSON: {"time": <microSeconds()>}.
 * @param {string} channelName
 * @param {string} readerUuid
 * @param {{readers: string}} [paths]
 * @return {Promise<string>} resolves with the path of the written info-file
 */
function createSocketInfoFile(channelName, readerUuid, paths) {
  const pathToFile = socketInfoPath(channelName, readerUuid, paths);
  const content = JSON.stringify({
    time: microSeconds()
  });
  return writeFile(pathToFile, content).then(() => pathToFile);
}
/**
 * returns the amount of channel-folders in the tmp-directory
 * (the number of entries found directly inside TMP_FOLDER_BASE)
 * @return {Promise<number>}
 */
async function countChannelFolders() {
  // readdir would fail if the base folder was never created
  await ensureBaseFolderExists();
  const folders = await readdir(TMP_FOLDER_BASE);
  return folders.length;
}
/**
 * Decides which error is reported when the socket-server could not be opened.
 * With fewer than 30 channel-folders in the tmp-folder the original error is
 * passed through. Otherwise a new Error is built that hints at
 * clearNodeFolder() and carries the enumerable properties of the original
 * error as JSON.
 * @param {Error} originalError
 * @return {Promise<Error>}
 */
async function connectionError(originalError) {
  const count = await countChannelFolders();

  // we only show the augmented message if there are more than 30 channels
  if (count < 30) {
    return originalError;
  }

  // copy the own enumerable properties, because an Error itself stringifies to {}
  const addObj = {};
  Object.entries(originalError).forEach(([key, value]) => {
    addObj[key] = value;
  });

  const text = 'BroadcastChannel.create(): error: ' + 'This might happen if you have created to many channels, ' + 'like when you use BroadcastChannel in unit-tests.' + 'Try using BroadcastChannel.clearNodeFolder() to clear the tmp-folder before each test.' + 'See https://github.com/pubkey/broadcast-channel#clear-tmp-folder';
  return new Error(text + ': ' + JSON.stringify(addObj, null, 2));
}
/**
 * creates the socket-file and subscribes to it
 * Every chunk that arrives on a connection of the server is emitted
 * as string via the 'data' event of the returned emitter.
 * @param {string} channelName
 * @param {string} readerUuid
 * @param {{readers: string}} [paths]
 * @return {Promise<{path: string, emitter: EventEmitter, server: any}>}
 *         rejects with the result of connectionError() when the server emits 'error'
 */
async function createSocketEventEmitter(channelName, readerUuid, paths) {
  const pathToSocket = socketPath(channelName, readerUuid, paths);
  const emitter = new events.EventEmitter();

  const server = net.createServer((stream) => {
    stream.on('end', () => {});
    stream.on('data', (msg) => {
      emitter.emit('data', msg.toString());
    });
  });

  // wait until the server listens (or failed to do so)
  await new Promise((resolve, reject) => {
    server.on('error', async (err) => {
      const useErr = await connectionError(err);
      reject(useErr);
    });

    server.listen(pathToSocket, async (err, res) => {
      if (err) {
        const useErr = await connectionError(err);
        reject(useErr);
      } else {
        resolve(res);
      }
    });
  });

  return {
    path: pathToSocket,
    emitter: emitter,
    server: server
  };
}
/**
 * Opens a client-socket to the socket of the given reader.
 * @param {string} channelName
 * @param {string} readerUuid
 * @return {Promise<net.Socket>} resolves with the connected socket,
 *         rejects with the error emitted by the socket
 */
async function openClientConnection(channelName, readerUuid) {
  const pathToSocket = socketPath(channelName, readerUuid);
  const client = new net.Socket();

  return new Promise((res, rej) => {
    client.connect(pathToSocket, () => res(client));
    client.on('error', (err) => rej(err));
  });
}
/**
 * writes the new message to the file-system
 * so other readers can find it
 * The file is named <time>_<readerUuid>_<token>.json and lies in the messages-folder.
 * @param {string} channelName
 * @param {string} readerUuid uuid of the writing channel
 * @param {any} messageJson payload, stored under the key "data"
 * @param {{messages: string}} [paths] defaults to getPaths(channelName)
 * @return {Promise<{time: number, uuid: string, token: string, path: string}>}
 */
function writeMessage(channelName, readerUuid, messageJson, paths) {
  paths = paths || getPaths(channelName);

  const time = microSeconds();
  const writeObject = {
    uuid: readerUuid,
    time: time,
    data: messageJson
  };

  // the token keeps file names apart when time and uuid are equal
  const token = randomToken();
  const fileName = time + '_' + readerUuid + '_' + token + '.json';
  const msgPath = path.join(paths.messages, fileName);

  return writeFile(msgPath, JSON.stringify(writeObject)).then(() => ({
    time: time,
    uuid: readerUuid,
    token: token,
    path: msgPath
  }));
}
/**
 * returns the uuids of all readers
 * The uuids are taken from the names of the .json info-files in the readers-folder.
 * @param {string} channelName
 * @param {{readers: string}} [paths] defaults to getPaths(channelName)
 * @return {Promise<string[]>}
 */
async function getReadersUuids(channelName, paths) {
  paths = paths || getPaths(channelName);
  const files = await readdir(paths.readers);

  return files
    .map((file) => file.split('.'))
    .filter((split) => split[1] === 'json') // do not scan .socket-files
    .map((split) => split[0]);
}
/**
 * Builds the path of the file of a message:
 * <messages-folder>/<time>_<writerUuid>_<token>.json
 * @param {string} channelName
 * @param {number} time
 * @param {string} token
 * @param {string} writerUuid
 * @return {Promise<string>}
 */
async function messagePath(channelName, time, token, writerUuid) {
  const fileName = time + '_' + writerUuid + '_' + token + '.json';
  return path.join(getPaths(channelName).messages, fileName);
}
/**
 * Lists all messages that currently lie in the messages-folder.
 * The meta-data is parsed out of the file names (<time>_<senderUuid>_<token>.json),
 * the files themselves are not read.
 * @param {string} channelName
 * @param {{messages: string}} [paths] defaults to getPaths(channelName)
 * @return {Promise<{path: string, time: number, senderUuid: string, token: string}[]>}
 */
async function getAllMessages(channelName, paths) {
  paths = paths || getPaths(channelName);
  const messagesPath = paths.messages;
  const files = await readdir(messagesPath);

  return files.map((file) => {
    const fileName = file.split('.')[0];
    const parts = fileName.split('_');
    return {
      path: path.join(messagesPath, file),
      // explicit radix, so the time is always read as decimal number
      time: parseInt(parts[0], 10),
      senderUuid: parts[1],
      token: parts[2]
    };
  });
}
/**
 * Converts the short ping-object that is sent over the socket
 * ({t: time, u: senderUuid, to: token}) into the same shape
 * that getAllMessages() returns, without touching the file-system.
 * @param {string} channelName
 * @param {{t: number, u: string, to: string}} msgObj
 * @param {{messages: string}} [paths] defaults to getPaths(channelName)
 * @return {{path: string, time: number, senderUuid: string, token: string}}
 */
function getSingleMessage(channelName, msgObj, paths) {
  const usePaths = paths || getPaths(channelName);
  const fileName = msgObj.t + '_' + msgObj.u + '_' + msgObj.to + '.json';

  return {
    path: path.join(usePaths.messages, fileName),
    time: msgObj.t,
    senderUuid: msgObj.u,
    token: msgObj.to
  };
}
/**
 * Reads the file of a message and parses its JSON content.
 * @param {{path: string}} messageObj
 * @return {Promise<any>} the parsed content of the file
 */
function readMessage(messageObj) {
  return readFile(messageObj.path, 'utf8').then((content) => JSON.parse(content));
}
/**
 * Deletes the files of all given messages that are older than the ttl.
 * Errors while deleting are ignored.
 * @param {{time: number, path: string}[]} messageObjects
 *        `time` is divided by 1000 before it is compared with Date.now()
 * @param {number} ttl maximum age, in the unit of Date.now() (milliseconds)
 * @return {Promise<void>}
 */
async function cleanOldMessages(messageObjects, ttl) {
  const olderThen = Date.now() - ttl;

  const deletions = messageObjects
    .filter((obj) => obj.time / 1000 < olderThen)
    .map((obj) => unlink(obj.path).catch(() => null));
  await Promise.all(deletions);
}
// name of this method, exported as `type`
const type = 'node';
/**
 * creates a new channelState
 * Creates the folders of the channel, opens the own socket, writes the
 * info-file and connects to the sockets of the other readers.
 * @param {string} channelName
 * @param {Object} [options] passed through fillOptionsWithDefaults()
 * @return {Promise<any>} the channelState
 */
async function create(channelName, options = {}) {
  options = fillOptionsWithDefaults(options);
  const time = microSeconds();
  const paths = getPaths(channelName);

  // started here, awaited further down
  const ensureFolderExistsPromise = ensureFoldersExist(channelName, paths);
  const uuid = randomToken();

  const state = {
    time: time,
    channelName: channelName,
    options: options,
    uuid: uuid,
    paths: paths,
    // contains all messages that have been emitted before
    emittedMessagesIds: new ObliviousSet(options.node.ttl * 2),
    messagesCallbackTime: null,
    messagesCallback: null,
    // ensures we do not read messages in parallel
    writeBlockPromise: Promise.resolve(),
    otherReaderClients: {},
    // ensure if process crashes, everything is cleaned up
    removeUnload: unload.add(() => close(state)),
    closed: false
  };

  // register the state, so other channels of this process can use the fast path
  if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = [];
  OTHER_INSTANCES[channelName].push(state);

  await ensureFolderExistsPromise;

  const [socketEE, infoFilePath] = await Promise.all([
    createSocketEventEmitter(channelName, uuid, paths),
    createSocketInfoFile(channelName, uuid, paths),
    refreshReaderClients(state)
  ]);
  state.socketEE = socketEE;
  state.infoFilePath = infoFilePath;

  // when new message comes in, we read it and emit it
  socketEE.emitter.on('data', (data) => {
    // if the socket is used fast, it may appear that multiple messages are flushed at once
    // so we have to split them before
    const singleOnes = data.split('|');
    singleOnes
      .filter((single) => single !== '')
      .forEach((single) => {
        try {
          const obj = JSON.parse(single);
          handleMessagePing(state, obj);
        } catch (err) {
          throw new Error('could not parse data: ' + single);
        }
      });
  });

  return state;
}
/**
 * Checks if a message must be emitted to the listener of the given state.
 * Side effect: when the message passes all checks, its token is added to
 * state.emittedMessagesIds, so the same message does not pass a second time.
 * @param {{senderUuid: string, token: string, time: number}} msgObj
 * @param {Object} state the channelState of the possible receiver
 * @return {boolean} true if the message must be emitted
 */
function _filterMessage(msgObj, state) {
  // not send by own
  if (msgObj.senderUuid === state.uuid) {
    return false;
  }
  // not already emitted
  if (state.emittedMessagesIds.has(msgObj.token)) {
    return false;
  }
  // no listener
  if (!state.messagesCallback) {
    return false;
  }
  // not older then onMessageCallback
  if (msgObj.time < state.messagesCallbackTime) {
    return false;
  }
  // msgObj is older then channel
  if (msgObj.time < state.time) {
    return false;
  }

  state.emittedMessagesIds.add(msgObj.token);
  return true;
}
/**
 * when the socket pings, so that we know new messages came,
 * run this
 * Without msgObj, all messages of the messages-folder are checked,
 * otherwise only the single message that the ping describes.
 * The messages that pass _filterMessage() are read from disk and their
 * `data` is handed to state.messagesCallback, ordered by time.
 * @param {Object} state the channelState
 * @param {{t: number, u: string, to: string}} [msgObj] ping-object of a single message
 * @return {Promise<void>}
 */
async function handleMessagePing(state, msgObj) {
  // if no listener, do not do anything
  if (!state.messagesCallback) {
    return;
  }

  let messages;
  if (!msgObj) {
    // get all
    messages = await getAllMessages(state.channelName, state.paths);
  } else {
    // get single message
    messages = [getSingleMessage(state.channelName, msgObj, state.paths)];
  }

  const useMessages = messages
    .filter((candidate) => _filterMessage(candidate, state))
    .sort((msgObjA, msgObjB) => msgObjA.time - msgObjB.time); // sort by time

  // if no listener or message, do not do anything
  if (!useMessages.length || !state.messagesCallback) {
    return;
  }

  // read contents
  await Promise.all(
    useMessages.map((message) => readMessage(message).then((content) => message.content = content))
  );

  useMessages.forEach((message) => {
    state.emittedMessagesIds.add(message.token);

    // the listener might have been removed while the files were read
    if (state.messagesCallback) {
      // emit to subscribers
      state.messagesCallback(message.content.data);
    }
  });
}
/**
 * ensures that the channelState is connected with all other readers
 * Clients of readers whose info-file is gone are destroyed and removed,
 * for every new reader a client-connection is opened. Errors on both
 * paths are ignored, because readers can appear and vanish at any time.
 * @param {Object} channelState
 * @return {Promise} resolves when the connections to the new readers were attempted
 */
function refreshReaderClients(channelState) {
  return getReadersUuids(channelState.channelName, channelState.paths).then((otherReaders) => {
    const knownReaders = new Set(otherReaders);

    // remove subscriptions to closed readers
    Object.keys(channelState.otherReaderClients)
      .filter((readerUuid) => !knownReaders.has(readerUuid))
      .forEach(async (readerUuid) => {
        try {
          await channelState.otherReaderClients[readerUuid].destroy();
        } catch (err) {
          // ignored: the client gets dropped anyway
        }
        delete channelState.otherReaderClients[readerUuid];
      });

    // add new readers
    return Promise.all(
      otherReaders
        .filter((readerUuid) => readerUuid !== channelState.uuid) // not own
        .filter((readerUuid) => !channelState.otherReaderClients[readerUuid]) // not already has client
        .map(async (readerUuid) => {
          try {
            if (channelState.closed) return;
            try {
              const client = await openClientConnection(channelState.channelName, readerUuid);
              channelState.otherReaderClients[readerUuid] = client;
            } catch (err) {
              // this might throw if the other channel is closed at the same time when this one is running refresh
              // so we do not throw an error
            }
          } catch (err) {
            // this can throw when the cleanup of another channel was interrupted
            // or the socket-file does not exist yet
          }
        })
    );
  });
}
/**
 * post a message to the other readers
 * The message is written to the messages-folder, afterwards every connected
 * reader gets a short ping over its socket that names the new message.
 * All posts of one channel are chained on channelState.writeBlockPromise,
 * so they are processed one after another.
 * @param {Object} channelState
 * @param {any} messageJson
 * @return {Promise} resolves when the ping was written to all writable clients
 */
function postMessage(channelState, messageJson) {
  // the file-write starts directly, only the pings wait for the previous post
  const writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths);

  channelState.writeBlockPromise = channelState.writeBlockPromise.then(async () => {
    // wait one tick to let the buffer flush
    await new Promise((res) => setTimeout(res, 0));

    const [msgObj] = await Promise.all([writePromise, refreshReaderClients(channelState)]);
    emitOverFastPath(channelState, msgObj, messageJson);

    // the trailing | separates the pings when several get flushed at once
    const pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|';

    const writeToReadersPromise = Promise.all(
      Object.values(channelState.otherReaderClients)
        .filter((client) => client.writable) // client might have closed in between
        .map((client) => new Promise((res) => {
          client.write(pingStr, res);
        }))
    );

    /**
     * clean up old messages
     * to not waste resources on cleaning up,
     * only if random-int matches, we clean up old messages
     */
    if (randomInt(0, 20) === 0) {
      /* await */
      getAllMessages(channelState.channelName, channelState.paths)
        .then((allMessages) => cleanOldMessages(allMessages, channelState.options.node.ttl))
        // best-effort and not awaited: a failure must not end up as unhandled rejection
        .catch(() => null);
    }

    return writeToReadersPromise;
  });

  return channelState.writeBlockPromise;
}
/**
 * When multiple BroadcastChannels with the same name
 * are created in a single node-process, we can access them directly and emit messages.
 * This might not happen often in production
 * but will speed up things when this module is used in unit-tests.
 * Does nothing when state.options.node.useFastPath is falsy.
 * @param {Object} state channelState of the sender
 * @param {{time: number, uuid: string, token: string}} msgObj result of writeMessage()
 * @param {any} messageJson the payload handed to the listeners
 */
function emitOverFastPath(state, msgObj, messageJson) {
  if (!state.options.node.useFastPath) {
    return; // disabled
  }

  // same shape as the objects of getAllMessages(), as expected by _filterMessage()
  const checkObj = {
    time: msgObj.time,
    senderUuid: msgObj.uuid,
    token: msgObj.token
  };

  OTHER_INSTANCES[state.channelName]
    .filter((otherState) => otherState !== state)
    .filter((otherState) => _filterMessage(checkObj, otherState))
    .forEach((otherState) => {
      otherState.messagesCallback(messageJson);
    });
}
/**
 * Sets the listener of the channel and directly checks for messages
 * that can be emitted to it.
 * @param {Object} channelState
 * @param {Function|null} fn called with the data of each new message
 * @param {number} [time] stored as messagesCallbackTime, messages older than
 *        this are not emitted; defaults to microSeconds()
 */
function onMessage(channelState, fn, time = microSeconds()) {
  channelState.messagesCallbackTime = time;
  channelState.messagesCallback = fn;
  handleMessagePing(channelState);
}
/**
 * closes the channel
 * Removes the state from OTHER_INSTANCES, destroys the clients to the other
 * readers, deletes the info-file and, after a delay of 200ms, closes the own server.
 * @param {Object} channelState
 * @return {Promise} resolves after the delayed shutdown;
 *         resolves directly when the channel was already closed
 */
function close(channelState) {
  // always hand back a promise, also when there is nothing left to do
  if (channelState.closed) {
    return Promise.resolve();
  }
  channelState.closed = true;
  channelState.emittedMessagesIds.clear();
  OTHER_INSTANCES[channelState.channelName] = OTHER_INSTANCES[channelState.channelName]
    .filter((o) => o !== channelState);

  if (channelState.removeUnload) {
    channelState.removeUnload.remove();
  }

  return new Promise((res) => {
    if (channelState.socketEE) {
      channelState.socketEE.emitter.removeAllListeners();
    }

    Object.values(channelState.otherReaderClients).forEach((client) => client.destroy());

    if (channelState.infoFilePath) {
      try {
        fs.unlinkSync(channelState.infoFilePath);
      } catch (err) {
        // best effort: the file might already be gone
      }
    }

    /**
     * the server get closed lazy because others might still write on it
     * and have not found out that the infoFile was deleted
     */
    setTimeout(() => {
      // socketEE is only set at the end of create(), so it can be missing
      // when close() runs earlier (for example from the unload-handler)
      if (channelState.socketEE) {
        channelState.socketEE.server.close();
      }
      res();
    }, 200);
  });
}
/**
 * Tells if this method can be used in the current environment.
 * @return {boolean} the value exported by the 'detect-node' module
 */
function canBeUsed() {
  return isNode;
}
/**
 * on node we use a relatively high averageResponseTime,
 * because the file-io might be in use.
 * Also it is more important that the leader-election is reliable,
 * than to have a fast election.
 * @return {number} always 200
 */
function averageResponseTime() {
  return 200;
}
/**
 * Returns the value of nano-time's microseconds() as integer.
 * @return {number}
 */
function microSeconds() {
  // explicit radix, so the value is always read as decimal number
  return parseInt(micro.microseconds(), 10);
}
module.exports = {
|
|
TMP_FOLDER_BASE: TMP_FOLDER_BASE,
|
|
cleanPipeName: cleanPipeName,
|
|
getPaths: getPaths,
|
|
ensureFoldersExist: ensureFoldersExist,
|
|
clearNodeFolder: clearNodeFolder,
|
|
socketPath: socketPath,
|
|
socketInfoPath: socketInfoPath,
|
|
createSocketInfoFile: createSocketInfoFile,
|
|
countChannelFolders: countChannelFolders,
|
|
createSocketEventEmitter: createSocketEventEmitter,
|
|
openClientConnection: openClientConnection,
|
|
writeMessage: writeMessage,
|
|
getReadersUuids: getReadersUuids,
|
|
messagePath: messagePath,
|
|
getAllMessages: getAllMessages,
|
|
getSingleMessage: getSingleMessage,
|
|
readMessage: readMessage,
|
|
cleanOldMessages: cleanOldMessages,
|
|
type: type,
|
|
create: create,
|
|
_filterMessage: _filterMessage,
|
|
handleMessagePing: handleMessagePing,
|
|
refreshReaderClients: refreshReaderClients,
|
|
postMessage: postMessage,
|
|
emitOverFastPath: emitOverFastPath,
|
|
onMessage: onMessage,
|
|
close: close,
|
|
canBeUsed: canBeUsed,
|
|
averageResponseTime: averageResponseTime,
|
|
microSeconds: microSeconds
|
|
}; |