// multiprocess.js
  // Debug logger for this appender, under the 'log4js:multiprocess' namespace.
  const debug = require('debug')('log4js:multiprocess');
  const net = require('net');
  const LoggingEvent = require('../LoggingEvent');

  // Terminator written by a worker after every serialised event; the master
  // splits the incoming stream on it.
  const END_MSG = '__LOG4JS__';
  /**
   * Creates the master-side appender: a TCP server that receives serialised
   * log events from worker processes and hands them to `actualAppender`.
   *
   * The server listens on `config.loggerPort` (falling back to 5000) and
   * `config.loggerHost` (falling back to 'localhost'). Every message on the
   * wire is a serialised event followed by END_MSG.
   *
   * @param {object} config - appender configuration (`loggerPort`, `loggerHost`).
   * @param {Function} actualAppender - appender that receives every event,
   *   both remote (from workers) and local (passed to the returned function).
   * @param {object} levels - level definitions; `levels.ERROR` is used for the
   *   event that reports a worker socket error.
   * @returns {Function} an appender for local events, carrying a
   *   `shutdown(cb)` method that closes the server.
   */
  function logServer(config, actualAppender, levels) {
    /**
     * Turns one serialised message into a logging event and tags it with the
     * address and port of the socket it arrived on.
     */
    function deserializeLoggingEvent(clientSocket, msg) {
      debug('(master) deserialising log event');
      const loggingEvent = LoggingEvent.deserialise(msg);
      loggingEvent.remoteAddress = clientSocket.remoteAddress;
      loggingEvent.remotePort = clientSocket.remotePort;
      return loggingEvent;
    }

    const server = net.createServer((clientSocket) => {
      debug('(master) connection received');
      clientSocket.setEncoding('utf8');

      // Data received so far that has not yet been closed off by END_MSG.
      let logMessage = '';

      function logTheMessage(msg) {
        debug('(master) deserialising log event and sending to actual appender');
        actualAppender(deserializeLoggingEvent(clientSocket, msg));
      }

      /**
       * Appends a chunk to the pending data and dispatches every complete
       * message it now contains. Also registered for 'end', where it is
       * called without a chunk.
       */
      function chunkReceived(chunk) {
        debug('(master) chunk of data received');
        logMessage += chunk || '';

        // One chunk may hold many messages. A loop (rather than one recursive
        // call per message) keeps the stack depth constant for large chunks.
        let endIndex = logMessage.indexOf(END_MSG);
        while (endIndex > -1) {
          logTheMessage(logMessage.slice(0, endIndex));
          // Drop the message only after it has been handed over.
          logMessage = logMessage.slice(endIndex + END_MSG.length);
          endIndex = logMessage.indexOf(END_MSG);
        }
      }

      /** Reports a socket error to the actual appender as an ERROR event. */
      function handleError(error) {
        const loggingEvent = {
          startTime: new Date(),
          categoryName: 'log4js',
          level: levels.ERROR,
          data: ['A worker log process hung up unexpectedly', error],
          remoteAddress: clientSocket.remoteAddress,
          remotePort: clientSocket.remotePort,
        };
        actualAppender(loggingEvent);
      }

      clientSocket.on('data', chunkReceived);
      clientSocket.on('end', chunkReceived);
      clientSocket.on('error', handleError);
    });

    server.listen(
      config.loggerPort || 5000,
      config.loggerHost || 'localhost',
      (e) => {
        // NOTE(review): per the Node docs the listen callback is registered
        // as a 'listening' listener, so `e` is expected to be undefined here;
        // bind failures would arrive on the server's 'error' event - confirm.
        debug('(master) master server listening, error was ', e);
        // allow the process to exit, if this is the only socket active
        server.unref();
      }
    );

    /** Passes an event logged in the master process straight through. */
    function app(event) {
      debug('(master) log event sent directly to actual appender (local event)');
      return actualAppender(event);
    }

    app.shutdown = function (cb) {
      debug('(master) master shutdown called, closing server');
      server.close(cb);
    };

    return app;
  }
  /**
   * Creates the worker-side appender: it sends every log event over a TCP
   * socket to `config.loggerHost` (falling back to 'localhost') on
   * `config.loggerPort` (falling back to 5000). Events logged while the socket
   * is not writable are held in an in-memory buffer.
   *
   * @param {object} config - appender configuration (`loggerPort`, `loggerHost`).
   * @returns {Function} an appender taking a logging event (which must provide
   *   `serialise()`), carrying a `shutdown(cb)` method.
   */
  function workerAppender(config) {
    let canWrite = false;
    const buffer = [];
    let socket;
    // Number of 100ms waits shutdown will still grant a non-empty buffer.
    let shutdownAttempts = 3;

    /** Writes one event followed by the END_MSG terminator. */
    function write(loggingEvent) {
      debug('(worker) Writing log event to socket');
      socket.write(loggingEvent.serialise(), 'utf8');
      socket.write(END_MSG, 'utf8');
    }

    /** Writes buffered events in order, stopping at the first falsy entry. */
    function emptyBuffer() {
      debug('(worker) emptying worker buffer');
      let evt = buffer.shift();
      while (evt) {
        write(evt);
        evt = buffer.shift();
      }
    }

    /** Opens the connection and wires up the socket's event handlers. */
    function createSocket() {
      const host = config.loggerHost || 'localhost';
      const port = config.loggerPort || 5000;
      debug(`(worker) worker appender creating socket to ${host}:${port}`);
      socket = net.createConnection(port, host);

      socket.on('connect', () => {
        debug('(worker) worker socket connected');
        emptyBuffer();
        canWrite = true;
      });
      socket.on('timeout', socket.end.bind(socket));
      socket.on('error', (e) => {
        debug('connection error', e);
        canWrite = false;
        // NOTE(review): this writes the buffered events to the socket that
        // just failed, which presumably discards them instead of keeping them
        // for the next connection - confirm this is intended.
        emptyBuffer();
      });
      // Reconnect whenever the socket closes; shutdown removes this listener.
      socket.on('close', createSocket);
    }

    createSocket();

    function log(loggingEvent) {
      if (canWrite) {
        write(loggingEvent);
      } else {
        debug(
          '(worker) worker buffering log event because it cannot write at the moment'
        );
        buffer.push(loggingEvent);
      }
    }

    log.shutdown = function (cb) {
      debug('(worker) worker shutdown called');
      if (buffer.length && shutdownAttempts) {
        debug('(worker) worker buffer has items, waiting 100ms to empty');
        shutdownAttempts -= 1;
        setTimeout(() => {
          log.shutdown(cb);
        }, 100);
      } else {
        // Stop the automatic reconnect before ending the socket.
        socket.removeAllListeners('close');
        socket.end(cb);
      }
    };

    return log;
  }
  /**
   * Builds the appender for the configured mode.
   *
   * @param {object} config - appender configuration; `config.mode === 'master'`
   *   selects the log server, any other value selects the worker appender.
   * @param {Function} appender - actual appender, used in master mode only.
   * @param {object} levels - level definitions, used in master mode only.
   * @returns {Function} the master or worker appender function.
   */
  function createAppender(config, appender, levels) {
    if (config.mode === 'master') {
      debug('Creating master appender');
      return logServer(config, appender, levels);
    }

    debug('Creating worker appender');
    return workerAppender(config);
  }
  /**
   * Entry point used to configure this appender.
   *
   * In master mode `config.appender` must name an appender that `findAppender`
   * can resolve; that appender receives all events. In any other mode no
   * lookup is performed.
   *
   * @param {object} config - appender configuration (`mode`, `appender`, plus
   *   the options read when the appender is created).
   * @param {object} layouts - not used by this appender.
   * @param {Function} findAppender - resolves an appender name to an appender;
   *   a falsy result is treated as "not defined".
   * @param {object} levels - level definitions passed on to the appender.
   * @returns {Function} the configured appender function.
   * @throws {Error} in master mode when `config.appender` is missing or cannot
   *   be resolved.
   */
  function configure(config, layouts, findAppender, levels) {
    let appender;
    debug(`configure with mode = ${config.mode}`);

    if (config.mode === 'master') {
      if (!config.appender) {
        // Pass the object as a format argument: interpolating it into the
        // string would only ever print "[object Object]".
        debug('no appender found in config %o', config);
        throw new Error('multiprocess master must have an "appender" defined');
      }

      debug(`actual appender is ${config.appender}`);
      appender = findAppender(config.appender);
      if (!appender) {
        debug(`actual appender "${config.appender}" not found`);
        throw new Error(
          `multiprocess master appender "${config.appender}" not defined`
        );
      }
    }

    return createAppender(config, appender, levels);
  }
  // `configure` is the only function this module exports.
  module.exports.configure = configure;