Update kafka producer

This commit is contained in:
Andrea Cavalli 2022-09-07 15:29:46 +02:00
parent a77442bae5
commit 202a90846b
1 changed files with 3 additions and 1 deletions

View File

@ -26,7 +26,9 @@ public abstract class KafkaProducer<K> {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getChannelName().getSerializerClass());
SenderOptions<Integer, K> senderOptions = SenderOptions.create(props);