-
-
Notifications
You must be signed in to change notification settings - Fork 233
/
Copy pathindex.js
87 lines (72 loc) · 2.65 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
const cluster = require('cluster')
const Aedes = require('aedes')
const { createServer } = require('net')
const { cpus } = require('os')
const MONGO_URL = 'mongodb://127.0.0.1/aedes-clusters'
const mq = process.env.MQ === 'redis'
? require('mqemitter-redis')({
port: process.env.REDIS_PORT || 6379
})
: require('mqemitter-mongodb')({
url: MONGO_URL
})
const persistence = process.env.PERSISTENCE === 'redis'
? require('aedes-persistence-redis')({
port: process.env.REDIS_PORT || 6379
})
: require('aedes-persistence-mongodb')({
url: MONGO_URL
})
function startAedes () {
const port = 1883
const aedes = Aedes({
id: 'BROKER_' + cluster.worker.id,
mq,
persistence
})
const server = createServer(aedes.handle)
server.listen(port, '0.0.0.0', function () {
console.log('Aedes listening on port:', port)
aedes.publish({ topic: 'aedes/hello', payload: "I'm broker " + aedes.id })
})
server.on('error', function (err) {
console.log('Server error', err)
process.exit(1)
})
aedes.on('subscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', aedes.id)
})
aedes.on('unsubscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', aedes.id)
})
// fired when a client connects
aedes.on('client', function (client) {
console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id)
})
// fired when a client disconnects
aedes.on('clientDisconnect', function (client) {
console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id)
})
// fired when a message is published
aedes.on('publish', async function (packet, client) {
console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + aedes.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', aedes.id)
})
}
if (cluster.isMaster) {
const numWorkers = cpus().length
for (let i = 0; i < numWorkers; i++) {
cluster.fork()
}
cluster.on('online', function (worker) {
console.log('Worker ' + worker.process.pid + ' is online')
})
cluster.on('exit', function (worker, code, signal) {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal)
console.log('Starting a new worker')
cluster.fork()
})
} else {
startAedes()
}