Master de II. ULL. 1er cuatrimestre
Lea el Capítulo 4 “Connecting Robust Microservices” de Node.JS The Right Way y resuelva los problemas en las secciones
  The zmq-filer-rep.js program uses fs.readFile() to serve up file contents. However, it doesn’t handle error cases.
    Later in the same program we listen for the Unix signal SIGINT to detect the user’s Ctrl-C in the terminal.
    What happens if the program ends in some other way, like SIGTERM (the termination signal)?
    Hint: you can listen for the uncaughtException event on the process object.
    In the master process of the cluster program we listened for online events and logged a message when the workers came up.
    Hint: use kill [pid] from the command line.
    How would you change the zmq-filer-rep-cluster.js program to fork a new worker whenever one dies?
    The master process should:
    Create a PUSH socket and bind it to an IPC endpoint. This socket will be for sending jobs to the workers.
    Create a PULL socket and bind it to a different IPC endpoint. This socket will receive messages from the workers.
    Listen for messages on the PULL socket: if the message is a ready message, increment the ready counter, or if the message is a result message, output it to the console.
    Once the workers are ready, send the job messages out through the push socket.
    Each worker should:
    Create a PULL socket and connect it to the master’s PUSH endpoint.
    Create a PUSH socket and connect it to the master’s PULL endpoint.
    Listen for job messages on the PULL socket, and respond by sending a result message out on the PUSH socket.
    Send a ready message out on the PUSH socket.
    Make sure your result messages include at least the process ID of the worker. 
This way you can inspect the console output and confirm that the workload is being balanced among the worker processes.
See the directory connecting-robust-microservices-chapter-4/microservices in our repo ULL-MII-CA-1819/nodejs-the-right-way.
En el paradigma de paralelismo conocido como Farm o Granja de Procesadores, la tarea que se quiere realizar es dividida en un conjunto de sub-tareas bastante mayor que el número de procesadores disponibles.
Una ventaja que tiene este paradigma es que consigue equilibrar la carga de trabajo entre las máquinas.
En el siguiente código mostramos cómo usar los sockets ROUTER y DEALER de 0MQ junto con los clusters de Node.js para crear un borrador de una granja de trabajadores:
Fichero connecting-robust-microservices-chapter-4/microservices/dealer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
'use strict';
const PORT = require("./port.js");
const ins = require("./ins.js");
const cluster = require("cluster");
const zmq = require("zeromq");
const numWorkers = require("os").cpus().length;
/**
 * Picks a pseudo-random whole number by scaling Math.random() over the
 * width of the interval, shifting it by `min` and rounding down.
 *
 * For integer arguments with min < max the result lies in [min, max).
 *
 * @param {number} min - lower bound of the interval
 * @param {number} max - upper bound of the interval
 * @returns {number} the floored random value
 */
const randomBetween = (min, max) => {
  const span = max - min;
  return Math.floor(min + Math.random() * span);
};
/**
 * Body of a forked worker process.
 *
 * Opens a DEALER socket identified by the `identity` environment variable,
 * connects it to the broker on PORT and then loops: send 'ready', wait for a
 * reply, simulate work with a random delay below 500 ms, send 'ready' again.
 * A reply whose first frame is 'stop' ends the loop: the worker prints how
 * many tasks it completed, detaches its handler and closes the socket.
 */
function workerTask() {
  const socket = zmq.socket('dealer');
  socket.identity = process.env.identity;
  console.log(`identity ${socket.identity} process ${process.pid} port = ${PORT}`);
  socket.connect(`tcp://localhost:${PORT}`);

  // Number of workloads received so far (the final 'stop' is not counted).
  let completed = 0;

  // Every request sent to the broker is the single frame 'ready'.
  function requestWork() {
    socket.send(['ready']);
  }

  // Handles one reply from the broker; only the first frame is inspected.
  function handleReply(...frames) {
    const [payload] = frames;
    const workload = payload.toString('utf8');

    if (workload !== 'stop') {
      completed += 1;
      // Simulate some work before asking for more.
      setTimeout(requestWork, randomBetween(0, 500));
      return;
    }

    console.log(`Completed: ${completed} tasks (${socket.identity} ${process.pid})`);
    // removeListener is inherited from EventEmitter; detach before closing.
    socket.removeListener('message', handleReply);
    socket.close();
  }

  socket.on('message', handleReply);

  // Tell the broker we're ready for work.
  requestWork();
}
/**
 * Body of the cluster master: acts as the broker of the worker farm.
 *
 * Binds a ROUTER socket on PORT and forks `numWorkers` workers, each given an
 * `identity` environment variable ("worker0", "worker1", ...). For the first
 * 5000 ms every incoming message is answered with 'more work'; afterwards
 * each incoming message is answered with 'stop'. Once `numWorkers` stop
 * replies have been sent, the socket is closed and the cluster disconnected.
 */
function main() {
  const router = zmq.socket('router');
  router.bind(`tcp://*:${PORT}`);

  // Moment after which workers are told to stop instead of given more work.
  const deadline = Date.now() + 5000;
  let stoppedWorkers = 0;

  const shutdown = () => {
    router.close();
    cluster.disconnect();
  };

  router.on('message', (...frames) => {
    // The first frame is used as the reply address for this peer.
    const [identity] = frames;

    if (Date.now() < deadline) {
      router.send([identity, 'more work']);
      return;
    }

    router.send([identity, 'stop']);
    stoppedWorkers += 1;
    if (stoppedWorkers === numWorkers) {
      // Deferred with setImmediate rather than run inside this handler.
      setImmediate(shutdown);
    }
  });

  for (let index = 0; index < numWorkers; index += 1) {
    cluster.fork({ identity: `worker${index}` });
  }
}
// Forked processes run the worker loop; the cluster master runs the broker.
if (!cluster.isMaster) {
  workerTask();
} else {
  main();
}
Observa que pese a que el worker envía solamente [ 'ready' ]:
1
2
3
4
  const sendMessage = () => dealer.send(['ready']);
  ...
    // Simulate some work
    setTimeout(sendMessage, randomBetween(0, 500));
En el master recibimos como primer elemento la identity del worker:
1
2
3
  broker.on('message', function (...args) {
    // console.log("Inside Master. args = "+ins(args.map(x => x.toString())));
    const identity = args[0]
Consultemos la documentación de 0MQ:
The zmq_socket() man page describes it thus:
When receiving messages a ZMQ_ROUTER socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers. When sending messages a ZMQ_ROUTER socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to.
Cuando ejecuto este programa, obtengo una salida parecida a esta:
1
2
3
4
5
6
7
8
9
[~/.../microservices(master)]$ node dealer.js 
identity worker0 process 56820 port = 60300
identity worker2 process 56822 port = 60300
identity worker3 process 56823 port = 60300
identity worker1 process 56821 port = 60300
Completed: 24 tasks (worker3 56823)
Completed: 22 tasks (worker2 56822)
Completed: 19 tasks (worker1 56821)
Completed: 18 tasks (worker0 56820)
Usando 0MQ y paralelismo de granja, compute en paralelo una aproximación al número π aprovechando la siguiente fórmula:
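\[\int_{0}^{1} \frac{4}{1+x^2}\, dx = 4 \arctan(x) \Big|_{0}^{1} = 4 \left( \frac{\pi}{4} - 0 \right) = \pi\]
Para computar π aproxime la integral mediante sumas de áreas de rectángulos:
\[\pi \approx \frac{1}{N} \sum_{i=0}^{N-1} \frac{4}{1+x_i^2}, \qquad x_i = \frac{i + 0.5}{N}\]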

Escriba un chat de línea de comandos - con rooms - usando 0MQ.
En el servidor:
1
2
3
4
5
6
7
8
9
     publisher.send( ["room-1", // topic
                        JSON.stringify(
                          {
                            type: "message",
                            from: user,
                            content: content
                          }
                        )
                      ]
En el cliente:
1
2
3
4
5
subscriber.on("message", (room, data) => {
  console.log(room.toString());
  const message = JSON.parse(data);
  ...
});
Fichero local/src/javascript/learning/readline-examples/small-cli.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  const readline = require('readline');

  // Line-oriented interface over stdin/stdout showing the 'DSI> ' prompt.
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout,
    prompt: 'DSI> '
  });

  // Prints a farewell and terminates the process with exit status 0.
  const bye = () => {
    console.log('Have a great day!');
    process.exit(0);
  };

  // Command table keyed by the exact (trimmed) text the user types.
  // `default` is the fallback for unrecognised input and receives the raw line.
  const methods = {
    hello: () => console.log("world"),
    exit: () => bye(),
    default: (line) => console.log(`Say what? I might have heard '${line.trim()}'`),
  };

  rl.prompt();
  rl.on('line', (line) => {
    const choice = line.trim();
    // Only own entries of the table are commands. The `in` operator also
    // matched inherited names such as `toString` or `__proto__` (the latter
    // crashed with "not a function"), and typing `default` invoked the
    // fallback without its argument, which threw on `line.trim()`.
    const isCommand = choice !== 'default' &&
      Object.prototype.hasOwnProperty.call(methods, choice);
    if (isCommand) methods[choice]();
    else methods.default(line);
    rl.prompt();
  }).on('close', bye); // register the handler itself: `() => bye` returned it without calling it