Master de II. ULL. 1er cuatrimestre
Lea el Capítulo 4 “Connecting Robust Microservices” de Node.js The Right Way y resuelva los problemas en las secciones
yzmq-filer-rep.js
program uses fs.readFile()
to serve up file contents. However it doesn’t handle error cases.
SIGINT
to detect the user’s Ctrl-C
in the terminal
SIGTERM
(the termination signal)?uncaughtException
event on the process objectonline
events and logged a message when the workers came up.kill [pid]
from the command linezmq-filer-rep-cluster.js
program to fork a new worker whenever one dies?PUSH
socket and bind it to an IPC endpoint. This socket will be for sending jobs to the workersPULL
socket and bind it to a different IPC endpoint. This socket will receive messages from the workersready
message, increment the ready counter, orresult
message, output it to the consolejob
messages out through the push socketPULL
socket and connect it to the master's PUSH
endpointPUSH
socket and connect it to the master's PULL
endpointjob
messages on the PULL
socket, and respond by sending a result
message out on the PUSH
socketready
message out on the PUSH
socket.result
messages include at least the process ID of the worker.
This way you can inspect the console output and confirm that the workload is being balanced among the worker processes.connecting-robust-microservices-chapter-4/microservices
in our repo ULL-MII-CA-1819/nodejs-the-right-way
En el paradigma de paralelismo conocido como Farm o Granja de Procesadores la tarea que se quiere realizar es dividida en un subconjunto de sub-tareas bastante mayor que el número de procesadores disponibles.
Una ventaja que tiene este paradigma es que consigue equilibrar la carga de trabajo entre las máquinas.
En el siguiente código mostramos cómo usar los sockets ROUTER y DEALER de 0MQ junto con los clusters de Node.js para crear un borrador de una granja de trabajadores:
Fichero connecting-robust-microservices-chapter-4/microservices/dealer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
'use strict';
const PORT = require("./port.js");
const ins = require("./ins.js");
const cluster = require("cluster");
const zmq = require("zeromq");
const numWorkers = require("os").cpus().length;
/**
 * Picks a pseudo-random number by scaling Math.random() onto the span
 * between the two bounds and flooring the result.
 * For integer bounds with min < max the result is an integer in [min, max).
 *
 * @param {number} min - Lower bound (inclusive).
 * @param {number} max - Upper bound (exclusive).
 * @returns {number} The floored random value.
 */
const randomBetween = (min, max) => {
  const span = max - min;
  return Math.floor(min + Math.random() * span);
};
/**
 * Worker side of the farm.
 *
 * Creates a DEALER socket whose identity comes from the "identity"
 * environment variable, connects it to the broker on PORT and announces
 * itself with a 'ready' message. Every non-'stop' message received counts as
 * one task: after a simulated delay the worker sends 'ready' again. A 'stop'
 * message makes the worker report its task count and close the socket.
 */
function workerTask() {
  const socket = zmq.socket('dealer');
  socket.identity = process.env["identity"];
  console.log(`identity ${socket.identity} process ${process.pid} port = ${PORT}`);
  socket.connect(`tcp://localhost:${PORT}`);

  let completed = 0;

  // Asks the broker for (more) work.
  const requestWork = () => socket.send(['ready']);

  // Handles workload messages coming from the broker until 'stop' arrives.
  function handleMessage(...frames) {
    const workload = frames[0].toString('utf8');

    if (workload !== 'stop') {
      completed++;
      // Simulate some work before asking for the next task.
      setTimeout(requestWork, randomBetween(0, 500));
      return;
    }

    console.log(`Completed: ${completed} tasks (${socket.identity} ${process.pid})`);
    // removeListener is inherited from EventEmitter:
    // https://nodejs.org/api/events.html#events_emitter_removelistener_eventname_listener
    socket.removeListener('message', handleMessage);
    socket.close();
  }

  socket.on('message', handleMessage);

  // Tell the broker we're ready for work.
  requestWork();
}
/**
 * Master side of the farm.
 *
 * Binds a ROUTER socket on PORT and forks numWorkers worker processes, each
 * with its own "identity" environment variable. The first frame of every
 * incoming message is used as the destination of the reply: 'more work'
 * during the first 5000 ms, 'stop' afterwards. Once 'stop' has been sent
 * numWorkers times the socket is closed and the cluster disconnected.
 */
function main() {
  const router = zmq.socket('router');
  //router.bindSync('tcp://*:5671');
  router.bind(`tcp://*:${PORT}`);

  const deadline = Date.now() + 5000;
  let stoppedWorkers = 0;

  // Releases the socket and lets the forked workers exit.
  const shutdown = () => {
    router.close();
    cluster.disconnect();
  };

  router.on('message', (...frames) => {
    const peer = frames[0];

    if (Date.now() < deadline) {
      router.send([peer, 'more work']);
      return;
    }

    router.send([peer, 'stop']);
    stoppedWorkers += 1;
    if (stoppedWorkers === numWorkers) {
      // Deferred with setImmediate, see
      // https://nodejs.org/api/timers.html#timers_setimmediate_callback_args
      setImmediate(shutdown);
    }
  });

  for (let index = 0; index < numWorkers; index += 1) {
    cluster.fork({ identity: `worker${index}` });
  }
}
// The same file runs as broker in the master process and as worker in forks.
if (cluster.isMaster) {
  main();
} else {
  workerTask();
}
Observa que pese a que el worker envía solamente [ 'ready' ]
:
1
2
3
4
const sendMessage = () => dealer.send(['ready']);
...
// Simulate some work
setTimeout(sendMessage, randomBetween(0, 500));
En el master recibimos como primer elemento la identity del worker:
1
2
3
broker.on('message', function (...args) {
// console.log("Inside Master. args = "+ins(args.map(x => x.toString())));
const identity = args[0]
Consultemos la documentación de 0MQ:
The zmq_socket()
man page describes it thus:
When receiving messages a
ZMQ_ROUTER
socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers. When sending messages a ZMQ_ROUTER
socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to.
Cuando ejecuto este programa, obtengo una salida parecida a esta:
1
2
3
4
5
6
7
8
9
[~/.../microservices(master)]$ node dealer.js
identity worker0 process 56820 port = 60300
identity worker2 process 56822 port = 60300
identity worker3 process 56823 port = 60300
identity worker1 process 56821 port = 60300
Completed: 24 tasks (worker3 56823)
Completed: 22 tasks (worker2 56822)
Completed: 19 tasks (worker1 56821)
Completed: 18 tasks (worker0 56820)
Usando 0MQ y paralelismo de granja, compute en paralelo una aproximación al número π aprovechando la siguiente fórmula:
\[\int_{0}^{1} \frac{4}{1+x^2}\, dx = 4 \arctan(x) \Big|_{0}^{1} = 4 \left( \frac{\pi}{4} - 0 \right) = \pi\]
Para computar π, aproxime la integral mediante sumas de áreas de rectángulos:
Escriba un chat de línea de comandos - con rooms - usando 0MQ.
En el servidor:
1
2
3
4
5
6
7
8
9
// Send a two-frame message: the first frame is the topic (the room name),
// the second is the JSON-encoded chat message.
publisher.send([
  "room-1", // topic
  JSON.stringify({
    type: "message",
    from: user,
    content: content
  })
]);
En el cliente:
1
2
3
4
5
subscriber.on("message", (room, data) => {
console.log(room.toString());
const message = JSON.parse(data);
...
});
Fichero local/src/javascript/learning/readline-examples/small-cli.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Minimal interactive command-line loop built on Node's readline module:
// prompts with 'DSI> ', dispatches each typed line to a command handler and
// says goodbye when the user exits or the input stream is closed.
const readline = require('readline');

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
  prompt: 'DSI> '
});

// Prints the farewell message and terminates the process successfully.
const bye = () => {
  console.log('Have a great day!');
  process.exit(0);
};

// Command table: keys are the commands the user may type; `default` is the
// fallback used for anything else. Every handler receives the raw line.
const methods = {
  hello: () => console.log("world"),
  exit: () => bye(),
  default: (line) => console.log(`Say what? I might have heard '${line.trim()}'`),
};

rl.prompt();

rl.on('line', (line) => {
  const choice = line.trim();
  // Own-property lookup only: the `in` operator would also match inherited
  // names such as "toString" or "__proto__" and call something that is not a
  // command (or not even a function).
  const isCommand = Object.prototype.hasOwnProperty.call(methods, choice);
  const handler = isCommand ? methods[choice] : methods.default;
  // Always pass the line so that typing "default" cannot crash the fallback.
  handler(line);
  rl.prompt();
}).on('close', bye); // pass the handler itself; `() => bye` never called it