const { Kafka, logLevel } = require('kafkajs')

// Standalone debug producer: connects to the broker, logs producer
// instrumentation events and sends one small message to TOPIC every
// MESSAGE_PRODUCING_INTERVAL milliseconds until a signal is received.
//
// config as used in PHOENIX-API services
// see: https://kafka.js.org/docs/configuration
const KAFKA_BROKER_HOST = 'kafka.confluent.svc.cluster.local'

// FIXME(security): credentials must not live in source control. The
// environment variables take precedence; the literal fallbacks are kept only
// so existing invocations keep working and should be rotated and removed.
const KAFKA_SASL_USERNAME = process.env.KAFKA_SASL_USERNAME || 'mzc-ciot-phoenix-api'
const KAFKA_SASL_PASSWORD = process.env.KAFKA_SASL_PASSWORD || '2YOpkuy95eD9SAsnuOoGSEhRBoMt9fatDLBUV2RcHOsX'
const KAFKA_SASL_MECHANISM = 'PLAIN'

const KAFKA_RETRY_MAX_TIME = 10000
const KAFKA_RETRY_INITIAL_TIME = 1000
const KAFKA_RETRY_FACTOR = 0.2
const KAFKA_RETRY_MULTIPLIER = 2
const KAFKA_RETRY_RETRIES = 5

// Not referenced by this producer script; kept for parity with the service config.
const KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL = 500

// undefined: presumably lets kafkajs apply its own default - confirm.
const KAFKA_PRODUCER_METADATA_MAX_AGE = undefined
const KAFKA_PRODUCER_AUTO_TOPIC_CREATION = true
const KAFKA_PRODUCER_TRANSACTION_TIMEOUT = 120000
const KAFKA_PRODUCER_ACKS = -1
const KAFKA_PRODUCER_TIMEOUT = 10000

const KAFKA_AUTH_TIMEOUT = 10000
const KAFKA_REAUTH_THRESHOLD = 10000

// Not referenced by this producer script; kept for parity with the service config.
const KAFKA_SOCKET_TIMEOUT = 10000

const MESSAGE_PRODUCING_INTERVAL = 121000
const CLIENT_ID = 'icarus-debug-producer'
const TOPIC = 'test-topic-icarus-debug'

// Handle of the periodic send timer; set by main(), cleared on shutdown.
let interval = null
// Guards against running the shutdown sequence more than once.
let isShuttingDown = false

/**
 * Builds the retry options shared by the client and the producer.
 * A fresh object is returned on every call so the two never share state.
 *
 * @returns {object} retry options in the shape expected by kafkajs
 */
function buildRetryOptions () {
  return {
    initialRetryTime: KAFKA_RETRY_INITIAL_TIME,
    retries: KAFKA_RETRY_RETRIES,
    factor: KAFKA_RETRY_FACTOR,
    multiplier: KAFKA_RETRY_MULTIPLIER,
    maxRetryTime: KAFKA_RETRY_MAX_TIME,
  }
}

const kafka = new Kafka({
  clientId: CLIENT_ID,
  logLevel: logLevel.DEBUG,
  brokers: [KAFKA_BROKER_HOST],
  // NOTE(review): rejectUnauthorized: false disables TLS certificate
  // verification; acceptable only for debugging against a trusted cluster.
  ssl: { rejectUnauthorized: false },
  authenticationTimeout: KAFKA_AUTH_TIMEOUT,
  reauthenticationThreshold: KAFKA_REAUTH_THRESHOLD,
  retry: buildRetryOptions(),
  sasl: {
    mechanism: KAFKA_SASL_MECHANISM,
    username: KAFKA_SASL_USERNAME,
    password: KAFKA_SASL_PASSWORD,
  },
})

const producer = kafka.producer({
  retry: buildRetryOptions(),
  metadataMaxAge: KAFKA_PRODUCER_METADATA_MAX_AGE,
  allowAutoTopicCreation: KAFKA_PRODUCER_AUTO_TOPIC_CREATION,
  transactionTimeout: KAFKA_PRODUCER_TRANSACTION_TIMEOUT,
})

/**
 * Sends a single message to TOPIC and logs (rather than throws) any failure.
 *
 * The function is invoked from a timer, where nothing awaits the returned
 * promise; a rejection would therefore surface as an unhandled rejection.
 *
 * @param {object} message - message object passed to producer.send
 * @returns {Promise<void>}
 */
async function sendMessage (message) {
  console.log('sending', message)
  try {
    await producer.send({
      topic: TOPIC,
      messages: [message],
      timeout: KAFKA_PRODUCER_TIMEOUT,
      acks: KAFKA_PRODUCER_ACKS,
    })
  } catch (err) {
    console.error('sending failed', message, err)
  }
}

/**
 * Registers event loggers on the producer, connects it and starts the
 * periodic send timer.
 *
 * @returns {Promise<void>} resolves once the producer is connected and the
 *   timer is armed; rejects if connecting fails
 */
async function main () {
  let i = 1

  const {
    REQUEST,
    DISCONNECT,
    CONNECT,
    REQUEST_TIMEOUT,
    REQUEST_QUEUE_SIZE
  } = producer.events

  producer.on(REQUEST, e => console.log(`REQUEST at ${e.timestamp}`))
  producer.on(DISCONNECT, e => console.log(`DISCONNECT at ${e.timestamp}`))
  producer.on(CONNECT, e => console.log(`CONNECT at ${e.timestamp}`))
  producer.on(REQUEST_TIMEOUT, e => console.log(`REQUEST_TIMEOUT at ${e.timestamp}`))
  producer.on(REQUEST_QUEUE_SIZE, e => console.log(`REQUEST_QUEUE_SIZE at ${e.timestamp}`))

  console.log('connecting')
  await producer.connect()
  console.log('connected')

  interval = setInterval(() => {
    // NOTE(review): `idx` is not among the message fields documented by
    // KafkaJS (key/value/headers/partition/timestamp), so it likely only
    // shows up in the local log line - confirm.
    const message = { idx: i++, value: (new Date()).toTimeString() }
    // sendMessage never rejects, so the promise can safely be left unawaited.
    sendMessage(message)
  }, MESSAGE_PRODUCING_INTERVAL)
}

/**
 * Stops the send timer, disconnects the producer and exits the process.
 *
 * The process exits even when the disconnect fails, and repeated calls
 * (e.g. several signals in a row) run the shutdown sequence only once.
 *
 * @param {string} sig - name of the signal that triggered the shutdown
 */
function terminator (sig) {
  if (typeof sig !== 'string') {
    console.log('Node server stopped.')
    return
  }
  if (isShuttingDown) {
    return
  }
  isShuttingDown = true

  clearInterval(interval)
  producer.disconnect()
    .catch(err => console.error('producer disconnect failed', err))
    .then(function () {
      console.log('Received %s - terminating server app ...', sig)
      console.log('Node server stopped.')
      process.exit(1)
    })
}

// catching signals and do something before exit
// NOTE(review): the Node.js documentation warns that JS listeners for SIGBUS,
// SIGFPE, SIGSEGV and SIGILL are unsafe when the signal is raised by a real
// fault; the list is kept unchanged here - consider trimming it.
const SHUTDOWN_SIGNALS = [
  'SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT',
  'SIGBUS', 'SIGFPE', 'SIGUSR1', 'SIGSEGV', 'SIGUSR2', 'SIGTERM'
]

SHUTDOWN_SIGNALS.forEach(function (sig) {
  process.on(sig, function () {
    terminator(sig)
    console.log('signal: ' + sig)
  })
})

// A failed startup (e.g. the broker is unreachable) must not end up as an
// unhandled rejection: log it, release the connection and exit non-zero.
main().catch(function (err) {
  console.error('producer startup failed', err)
  clearInterval(interval)
  producer.disconnect()
    .catch(disconnectErr => console.error('producer disconnect failed', disconnectErr))
    .then(function () {
      process.exit(1)
    })
})