113 lines
3.6 KiB
JavaScript
113 lines
3.6 KiB
JavaScript
const { Kafka, logLevel } = require('kafkajs')
|
|
|
|
// config as used in PHOENIX-API services
|
|
// see: https://kafka.js.org/docs/configuration
|
|
const KAFKA_BROKER_HOST ='kafka.confluent.svc.cluster.local'
|
|
const KAFKA_SASL_USERNAME ='mzc-ciot-phoenix-api'
|
|
const KAFKA_SASL_PASSWORD ='2YOpkuy95eD9SAsnuOoGSEhRBoMt9fatDLBUV2RcHOsX'
|
|
const KAFKA_SASL_MECHANISM ='PLAIN'
|
|
const KAFKA_RETRY_MAX_TIME = 10000
|
|
const KAFKA_RETRY_INITIAL_TIME = 1000
|
|
const KAFKA_RETRY_FACTOR = 0.2
|
|
const KAFKA_RETRY_MULTIPLIER = 2
|
|
const KAFKA_RETRY_RETRIES = 5
|
|
const KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL = 500
|
|
const KAFKA_PRODUCER_METADATA_MAX_AGE = undefined
|
|
const KAFKA_PRODUCER_AUTO_TOPIC_CREATION = true
|
|
const KAFKA_PRODUCER_TRANSACTION_TIMEOUT = 120000
|
|
const KAFKA_PRODUCER_ACKS = -1
|
|
const KAFKA_PRODUCER_TIMEOUT = 10000
|
|
const KAFKA_AUTH_TIMEOUT = 10000
|
|
const KAFKA_REAUTH_THRESHOLD = 10000
|
|
const KAFKA_SOCKET_TIMEOUT = 10000
|
|
|
|
const MESSAGE_PRODUCING_INTERVAL = 121000
|
|
const CLIENT_ID = 'icarus-debug-producer'
|
|
const TOPIC = 'test-topic-icarus-debug'
|
|
|
|
let interval = null
|
|
|
|
const kafka = new Kafka({
|
|
clientId: CLIENT_ID,
|
|
logLevel: logLevel.DEBUG,
|
|
brokers: [KAFKA_BROKER_HOST],
|
|
ssl: { rejectUnauthorized: false },
|
|
authenticationTimeout: KAFKA_AUTH_TIMEOUT,
|
|
reauthenticationThreshold: KAFKA_REAUTH_THRESHOLD,
|
|
retry: {
|
|
initialRetryTime: KAFKA_RETRY_INITIAL_TIME,
|
|
retries: KAFKA_RETRY_RETRIES,
|
|
factor: KAFKA_RETRY_FACTOR,
|
|
multiplier: KAFKA_RETRY_MULTIPLIER,
|
|
maxRetryTime: KAFKA_RETRY_MAX_TIME,
|
|
},
|
|
sasl: {
|
|
mechanism: KAFKA_SASL_MECHANISM,
|
|
username: KAFKA_SASL_USERNAME,
|
|
password: KAFKA_SASL_PASSWORD,
|
|
},
|
|
})
|
|
|
|
const producer = kafka.producer({
|
|
retry: {
|
|
initialRetryTime: KAFKA_RETRY_INITIAL_TIME,
|
|
retries: KAFKA_RETRY_RETRIES,
|
|
factor: KAFKA_RETRY_FACTOR,
|
|
multiplier: KAFKA_RETRY_MULTIPLIER,
|
|
maxRetryTime: KAFKA_RETRY_MAX_TIME,
|
|
},
|
|
metadataMaxAge: KAFKA_PRODUCER_METADATA_MAX_AGE,
|
|
allowAutoTopicCreation: KAFKA_PRODUCER_AUTO_TOPIC_CREATION,
|
|
transactionTimeout: KAFKA_PRODUCER_TRANSACTION_TIMEOUT,
|
|
})
|
|
|
|
async function main () {
|
|
let i = 1
|
|
const { REQUEST, DISCONNECT, CONNECT, REQUEST_TIMEOUT, REQUEST_QUEUE_SIZE } = producer.events
|
|
|
|
producer.on(REQUEST, e => console.log(`REQUEST at ${e.timestamp}`))
|
|
producer.on(DISCONNECT, e => console.log(`DISCONNECT at ${e.timestamp}`))
|
|
producer.on(CONNECT, e => console.log(`CONNECT at ${e.timestamp}`))
|
|
producer.on(REQUEST_TIMEOUT, e => console.log(`REQUEST_TIMEOUT at ${e.timestamp}`))
|
|
producer.on(REQUEST_QUEUE_SIZE, e => console.log(`REQUEST_QUEUE_SIZE at ${e.timestamp}`))
|
|
|
|
console.log('connecting')
|
|
await producer.connect()
|
|
console.log('connected')
|
|
|
|
|
|
interval = setInterval(async () => {
|
|
const message = { idx: i++, value: (new Date()).toTimeString() }
|
|
console.log('sending', message)
|
|
await producer.send({
|
|
topic: TOPIC,
|
|
messages: [message],
|
|
timeout: KAFKA_PRODUCER_TIMEOUT,
|
|
acks: KAFKA_PRODUCER_ACKS,
|
|
})
|
|
}, MESSAGE_PRODUCING_INTERVAL)
|
|
}
|
|
|
|
function terminator(sig) {
|
|
if (typeof sig === 'string') {
|
|
clearInterval(interval)
|
|
// call your async task here and then call process.exit() after async task is done
|
|
producer.disconnect().then(function() {
|
|
console.log('Received %s - terminating server app ...', sig)
|
|
process.exit(1)
|
|
})
|
|
}
|
|
console.log('Node server stopped.')
|
|
}
|
|
|
|
// catching signals and do something before exit
|
|
['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT',
|
|
'SIGBUS', 'SIGFPE', 'SIGUSR1', 'SIGSEGV', 'SIGUSR2', 'SIGTERM'
|
|
].forEach(function (sig) {
|
|
process.on(sig, function () {
|
|
terminator(sig)
|
|
console.log('signal: ' + sig)
|
|
})
|
|
})
|
|
|
|
main() |