进程通信
Node运行在单线程下,但这并不意味着无法利用多核/多机下多进程的优势。事实上,Node最初从设计上就考虑了分布式网络场景。
# 创建进程
通信方式与进程产生方式有关,node有四种创建进程的方式:
spawn()exec()execFile()fork()
# 通信方式
# 通过 stdin/stdout 传递 json
const { spawn } = require('child_process');
child = spawn('node', ['./stdio-child.js']);
child.stdout.setEncoding('utf8');
// 父进程-发
child.stdin.write(JSON.stringify({
type: 'handshake',
payload: '你好吖'
}));
// 父进程-收
child.stdout.on('data', function (chunk) {
let data = chunk.toString();
let message = JSON.parse(data);
console.log(`${message.type} ${message.payload}`);
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
子进程与之类似:
// ./stdio-child.js
// 子进程-收
process.stdin.on('data', (chunk) => {
let data = chunk.toString();
let message = JSON.parse(data);
switch (message.type) {
case 'handshake':
// 子进程-发
process.stdout.write(JSON.stringify({
type: 'message',
payload: message.payload + ' : hoho'
}));
break;
default:
break;
}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 原生 IPC 支持
如spawn()及fork()的例子,进程之间可以借助内置的IPC机制通信 父进程:
process.on('message')收child.send()发 子进程:process.on('message')收process.send()发 限制同上,同样要有一方能够拿到另一方的handle才行
# sockets
借助网络来完成进程间通信,不仅能跨进程,还能跨机器 node-ipc就采用这种方案:
// server
const ipc=require('../../../node-ipc');
ipc.config.id = 'world';
ipc.config.retry= 1500;
ipc.config.maxConnections=1;
ipc.serveNet(
function(){
ipc.server.on(
'message',
function(data,socket){
ipc.log('got a message : ', data);
ipc.server.emit(
socket,
'message',
data+' world!'
);
}
);
ipc.server.on(
'socket.disconnected',
function(data,socket){
console.log('DISCONNECTED\n\n',arguments);
}
);
}
);
ipc.server.on(
'error',
function(err){
ipc.log('Got an ERROR!',err);
}
);
ipc.server.start();
// client
const ipc=require('node-ipc');
ipc.config.id = 'hello';
ipc.config.retry= 1500;
ipc.connectToNet(
'world',
function(){
ipc.of.world.on(
'connect',
function(){
ipc.log('## connected to world ##', ipc.config.delay);
ipc.of.world.emit(
'message',
'hello'
);
}
);
ipc.of.world.on(
'disconnect',
function(){
ipc.log('disconnected from world');
}
);
ipc.of.world.on(
'message',
function(data){
ipc.log('got a message from world : ', data);
}
);
}
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
当然,单机场景下通过网络来完成进程间通信有些浪费性能,但网络通信的优势在于跨环境的兼容性与更进一步的RPC场景
# message queue
父子进程都通过外部消息机制来通信,跨进程的能力取决于MQ支持 即进程间不直接通信,而是通过中间层(MQ),加一个控制层就能获得更多灵活性和优势:
- 稳定性:消息机制提供了强大的稳定性保证,比如确认送达(消息回执ACK),失败重发/防止多发等等
- 优先级控制:允许调整消息响应次序
- 离线能力:消息可以被缓存
- 事务性消息处理:把关联消息组合成事务,保证其送达顺序及完整性 比较受欢迎 smrchy/rsmq (opens new window) 如:
// init
RedisSMQ = require("rsmq");
rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );
// create queue
rsmq.createQueue({qname:"myqueue"}, function (err, resp) {
if (resp===1) {
console.log("queue created")
}
});
// send message
rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}
});
// receive message
rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) {
if (resp.id) {
console.log("Message received.", resp)
}
else {
console.log("No messages for me...")
}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
会起一个Redis server,基本原理如下:消息的收/发/缓存/持久化依靠Redis提供的能力,在此基础上实现完整的队列机制
# Redis
基本思路与message queue类似: Redis自带Pub/Sub机制(即发布-订阅模式),适用于简单的通信场景,比如一对一或一对多并且不关注消息可靠性的场景 另外,Redis有list结构,可以用作消息队列,以此提高消息可靠性。一般做法是生产者LPUSH消息,消费者BRPOP消息。适用于要求消息可靠性的简单通信场景,但缺点是消息不具状态,且没有ACK机制,无法满足复杂的通信需求
# 总结
Node进程间通信有4种方式:
- 通过stdin/stdout传递json:最直接的方式,适用于能够拿到“子”进程handle的场景,适用于关联进程之间通信,无法跨机器
- Node原生IPC支持:最native(地道?)的方式,比上一种“正规”一些,具有同样的局限性
- 通过sockets:最通用的方式,有良好的跨环境能力,但存在网络的性能损耗
- 借助message queue:最强大的方式,既然要通信,场景还复杂,不妨扩展出一层消息中间件,漂亮地解决各种通信问题
上次更新: 2022/08/31, 09:41:27