commit 2ae8a87319579ab5538f669585ae84e3100a69c1 Author: s.golasch Date: Tue Aug 1 11:41:47 2023 +0200 first commit diff --git a/index.js b/index.js new file mode 100644 index 0000000..0fc3488 --- /dev/null +++ b/index.js @@ -0,0 +1,113 @@ +const { Kafka, logLevel } = require('kafkajs') + +// config as used in PHOENIX-API services +// see: https://kafka.js.org/docs/configuration +const KAFKA_BROKER_HOST ='kafka.confluent.svc.cluster.local' +const KAFKA_SASL_USERNAME ='mzc-ciot-phoenix-api' +const KAFKA_SASL_PASSWORD ='2YOpkuy95eD9SAsnuOoGSEhRBoMt9fatDLBUV2RcHOsX' +const KAFKA_SASL_MECHANISM ='PLAIN' +const KAFKA_RETRY_MAX_TIME = 10000 +const KAFKA_RETRY_INITIAL_TIME = 1000 +const KAFKA_RETRY_FACTOR = 0.2 +const KAFKA_RETRY_MULTIPLIER = 2 +const KAFKA_RETRY_RETRIES = 5 +const KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL = 500 +const KAFKA_PRODUCER_METADATA_MAX_AGE = undefined +const KAFKA_PRODUCER_AUTO_TOPIC_CREATION = true +const KAFKA_PRODUCER_TRANSACTION_TIMEOUT = 120000 +const KAFKA_PRODUCER_ACKS = -1 +const KAFKA_PRODUCER_TIMEOUT = 10000 +const KAFKA_AUTH_TIMEOUT = 10000 +const KAFKA_REAUTH_THRESHOLD = 10000 +const KAFKA_SOCKET_TIMEOUT = 10000 + +const MESSAGE_PRODUCING_INTERVAL = 121000 +const CLIENT_ID = 'icarus-debug-producer' +const TOPIC = 'test-topic-icarus-debug' + +let interval = null + +const kafka = new Kafka({ + clientId: CLIENT_ID, + logLevel: logLevel.DEBUG, + brokers: [KAFKA_BROKER_HOST], + ssl: { rejectUnauthorized: false }, + authenticationTimeout: KAFKA_AUTH_TIMEOUT, + reauthenticationThreshold: KAFKA_REAUTH_THRESHOLD, + retry: { + initialRetryTime: KAFKA_RETRY_INITIAL_TIME, + retries: KAFKA_RETRY_RETRIES, + factor: KAFKA_RETRY_FACTOR, + multiplier: KAFKA_RETRY_MULTIPLIER, + maxRetryTime: KAFKA_RETRY_MAX_TIME, + }, + sasl: { + mechanism: KAFKA_SASL_MECHANISM, + username: KAFKA_SASL_USERNAME, + password: KAFKA_SASL_PASSWORD, + }, +}) + +const producer = kafka.producer({ + retry: { + initialRetryTime: KAFKA_RETRY_INITIAL_TIME, + retries: KAFKA_RETRY_RETRIES, + factor: KAFKA_RETRY_FACTOR, + multiplier: KAFKA_RETRY_MULTIPLIER, + maxRetryTime: KAFKA_RETRY_MAX_TIME, + }, + metadataMaxAge: KAFKA_PRODUCER_METADATA_MAX_AGE, + allowAutoTopicCreation: KAFKA_PRODUCER_AUTO_TOPIC_CREATION, + transactionTimeout: KAFKA_PRODUCER_TRANSACTION_TIMEOUT, +}) + +async function main () { + let i = 1 + const { REQUEST, DISCONNECT, CONNECT, REQUEST_TIMEOUT, REQUEST_QUEUE_SIZE } = producer.events + + producer.on(REQUEST, e => console.log(`REQUEST at ${e.timestamp}`)) + producer.on(DISCONNECT, e => console.log(`DISCONNECT at ${e.timestamp}`)) + producer.on(CONNECT, e => console.log(`CONNECT at ${e.timestamp}`)) + producer.on(REQUEST_TIMEOUT, e => console.log(`REQUEST_TIMEOUT at ${e.timestamp}`)) + producer.on(REQUEST_QUEUE_SIZE, e => console.log(`REQUEST_QUEUE_SIZE at ${e.timestamp}`)) + + console.log('connecting') + await producer.connect() + console.log('connected') + + + interval = setInterval(async () => { + const message = { idx: i++, value: (new Date()).toTimeString() } + console.log('sending', message) + await producer.send({ + topic: TOPIC, + messages: [message], + timeout: KAFKA_PRODUCER_TIMEOUT, + acks: KAFKA_PRODUCER_ACKS, + }) + }, MESSAGE_PRODUCING_INTERVAL) +} + +function terminator(sig) { + if (typeof sig === 'string') { + clearInterval(interval) + // call your async task here and then call process.exit() after async task is done + producer.disconnect().then(function() { + console.log('Received %s - terminating server app ...', sig) + process.exit(1) + }) + } + console.log('Node server stopped.') +} + +// catching signals and do something before exit +['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT', + 'SIGBUS', 'SIGFPE', 'SIGUSR1', 'SIGSEGV', 'SIGUSR2', 'SIGTERM' +].forEach(function (sig) { + process.on(sig, function () { + terminator(sig) + console.log('signal: ' + sig) + }) +}) + +main() \ No newline at end of file diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..c56ce99 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,30 @@ +{ + "name": "kafka-producer", + "version": "1.0.0", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "version": "1.0.0", + "license": "ISC", + "dependencies": { + "kafkajs": "1.15.0" + } + }, + "node_modules/kafkajs": { + "version": "1.15.0", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-1.15.0.tgz", + "integrity": "sha512-yjPyEnQCkPxAuQLIJnY5dI+xnmmgXmhuOQ1GVxClG5KTOV/rJcW1qA3UfvyEJKTp/RTSqQnUR3HJsKFvHyTpNg==", + "engines": { + "node": ">=10.13.0" + } + } + }, + "dependencies": { + "kafkajs": { + "version": "1.15.0", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-1.15.0.tgz", + "integrity": "sha512-yjPyEnQCkPxAuQLIJnY5dI+xnmmgXmhuOQ1GVxClG5KTOV/rJcW1qA3UfvyEJKTp/RTSqQnUR3HJsKFvHyTpNg==" + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..0da0fe1 --- /dev/null +++ b/package.json @@ -0,0 +1,15 @@ +{ + "name": "kafka-producer", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "kafkajs": "1.15.0" + } +}