package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.errors.TimeoutException;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.6.0.jar:org/eclipse/hono/client/kafka/consumer/AbstractAtLeastOnceKafkaConsumer.class */
/**
 * Base class for a Kafka consumer that polls records in batches, passes each record (converted by
 * {@link #createMessage(KafkaConsumerRecord)}) to a message handler and commits the offsets only after
 * the whole batch has been handed to the handler. The next batch is polled after the commit succeeded.
 *
 * <p><b>Error handling</b>
 * <ul>
 * <li>If polling fails after the consumer has been started, the close handler is invoked with a
 * {@link KafkaConsumerPollException} and the consumer is closed. A failure of the very first poll is
 * only reported through the future returned by {@link #start()}.</li>
 * <li>If {@code createMessage} or the message handler throws an exception, the offset following the
 * failed record is committed, the close handler is invoked with that exception and the consumer is
 * closed. Remaining records of the batch are not processed.</li>
 * <li>If committing the offsets of a batch times out, the commit is retried once. Any other commit
 * failure (or a failed retry) causes the close handler to be invoked with a
 * {@link KafkaConsumerCommitException} and the consumer to be closed.</li>
 * </ul>
 *
 * @param <T> the type of message that is passed to the message handler
 */
public abstract class AbstractAtLeastOnceKafkaConsumer<T> implements Lifecycle {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractAtLeastOnceKafkaConsumer.class);
    private final KafkaConsumer<String, Buffer> kafkaConsumer;
    /** Topics to subscribe to; {@code null} if the subscription is defined by {@link #topicPattern}. */
    private final Set<String> topics;
    /** Pattern of topics to subscribe to; {@code null} if the subscription is defined by {@link #topics}. */
    private final Pattern topicPattern;
    private final Handler<T> messageHandler;
    private final Handler<Throwable> closeHandler;
    private final Duration pollTimeout;

    /**
     * Creates a consumer for a single topic.
     *
     * @param kafkaConsumer the Vert.x Kafka consumer to use
     * @param topic the topic to subscribe to
     * @param messageHandler the handler that is invoked for each message
     * @param closeHandler the handler that is invoked with the cause when the consumer is closed due to an error
     * @param pollTimeout the poll timeout in milliseconds
     * @throws NullPointerException if the consumer, the topic or any of the handlers is {@code null}
     */
    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, String topic, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, Set.of(Objects.requireNonNull(topic)), messageHandler, closeHandler, pollTimeout);
    }

    /**
     * Creates a consumer for a set of topics.
     *
     * @param kafkaConsumer the Vert.x Kafka consumer to use
     * @param topics the topics to subscribe to
     * @param messageHandler the handler that is invoked for each message
     * @param closeHandler the handler that is invoked with the cause when the consumer is closed due to an error
     * @param pollTimeout the poll timeout in milliseconds
     * @throws NullPointerException if the consumer, the topics or any of the handlers is {@code null}
     */
    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Set<String> topics, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, Objects.requireNonNull(topics), null, messageHandler, closeHandler, pollTimeout);
    }

    /**
     * Creates a consumer for all topics matching a pattern.
     *
     * @param kafkaConsumer the Vert.x Kafka consumer to use
     * @param topicPattern the pattern of the topics to subscribe to
     * @param messageHandler the handler that is invoked for each message
     * @param closeHandler the handler that is invoked with the cause when the consumer is closed due to an error
     * @param pollTimeout the poll timeout in milliseconds
     * @throws NullPointerException if the consumer, the pattern or any of the handlers is {@code null}
     */
    public AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Pattern topicPattern, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        this(kafkaConsumer, null, Objects.requireNonNull(topicPattern), messageHandler, closeHandler, pollTimeout);
    }

    // Exactly one of topics and topicPattern is non-null; the public constructors guarantee this.
    private AbstractAtLeastOnceKafkaConsumer(KafkaConsumer<String, Buffer> kafkaConsumer, Set<String> topics, Pattern topicPattern, Handler<T> messageHandler, Handler<Throwable> closeHandler, long pollTimeout) {
        Objects.requireNonNull(kafkaConsumer);
        Objects.requireNonNull(messageHandler);
        Objects.requireNonNull(closeHandler);
        this.kafkaConsumer = kafkaConsumer;
        this.messageHandler = messageHandler;
        this.closeHandler = closeHandler;
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.pollTimeout = Duration.ofMillis(pollTimeout);
    }

    /**
     * Creates the message that is passed to the message handler from a polled record.
     * <p>
     * An exception thrown by this method is treated like an exception thrown by the message handler.
     *
     * @param consumerRecord the record polled from Kafka
     * @return the message to pass to the message handler
     */
    protected abstract T createMessage(KafkaConsumerRecord<String, Buffer> consumerRecord);

    /**
     * Subscribes to the configured topic(s) and starts the poll loop.
     *
     * @return a future that succeeds once the subscription and the first poll succeeded. It fails with the
     *         subscription error, or with a {@link KafkaConsumerPollException} if the first poll failed.
     */
    @Override // org.eclipse.hono.util.Lifecycle
    public Future<Void> start() {
        final Promise<Void> subscribed = Promise.promise();
        if (topics != null) {
            kafkaConsumer.subscribe(topics, subscribed);
        } else {
            kafkaConsumer.subscribe(topicPattern, subscribed);
        }
        return subscribed.future().compose(ok -> {
            final Promise<KafkaConsumerRecords<String, Buffer>> firstPoll = Promise.promise();
            kafkaConsumer.poll(pollTimeout, firstPoll);
            return firstPoll.future()
                    .onSuccess(this::handleBatch)
                    .recover(cause -> Future.failedFuture(new KafkaConsumerPollException(cause)))
                    .mapEmpty();
        });
    }

    /**
     * Closes the underlying Kafka consumer.
     *
     * @return a future indicating the outcome of closing the consumer
     */
    @Override // org.eclipse.hono.util.Lifecycle
    public Future<Void> stop() {
        final Promise<Void> closed = Promise.promise();
        kafkaConsumer.close(closed);
        return closed.future();
    }

    /**
     * Processes a polled batch, commits its offsets and then polls and handles the next batch.
     * The loop ends as soon as one of the steps fails; each failing step closes the consumer itself.
     */
    private void handleBatch(final KafkaConsumerRecords<String, Buffer> records) {
        LOG.debug("Polled {} records", records.size());
        CompositeFuture.all(processBatch(records))
                .compose(processed -> commit(true))
                .compose(committed -> poll())
                .onSuccess(this::handleBatch);
    }

    /**
     * Polls the next batch. On failure the consumer is closed with a {@link KafkaConsumerPollException}.
     */
    private Future<KafkaConsumerRecords<String, Buffer>> poll() {
        final Promise<KafkaConsumerRecords<String, Buffer>> polled = Promise.promise();
        kafkaConsumer.poll(pollTimeout, polled);
        return polled.future().recover(cause -> {
            LOG.error("Error polling messages: {}", cause.toString());
            final KafkaConsumerPollException error = new KafkaConsumerPollException(cause);
            closeWithError(error);
            return Future.failedFuture(error);
        });
    }

    /**
     * Passes the records of a batch to the message handler in order, stopping at the first failure.
     *
     * @return the outcome per processed record; if a record failed, its failed future is the last element
     */
    @SuppressWarnings("rawtypes") // CompositeFuture.all(List) requires a raw List<Future>
    private List<Future> processBatch(final KafkaConsumerRecords<String, Buffer> records) {
        final int count = records.size();
        final List<Future> outcomes = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            final Future<Void> outcome = processRecord(records.recordAt(i));
            outcomes.add(outcome);
            if (outcome.failed()) {
                // The consumer is already being closed; the remaining records stay uncommitted
                // and will be delivered again to the next consumer.
                break;
            }
        }
        return outcomes;
    }

    /**
     * Converts a record and passes it to the message handler.
     * <p>
     * If that throws an exception, the offset following the record is committed, the close handler is
     * notified with the exception and the consumer is closed.
     *
     * @return an already completed future reflecting the outcome
     */
    private Future<Void> processRecord(final KafkaConsumerRecord<String, Buffer> consumerRecord) {
        try {
            messageHandler.handle(createMessage(consumerRecord));
            return Future.succeededFuture();
        } catch (final Exception e) {
            LOG.error("Error handling record, closing the consumer: ", e);
            commitCurrentOffset(consumerRecord);
            // notify the close handler as well, like the poll and commit failure paths do
            closeWithError(e);
            return Future.failedFuture(e);
        }
    }

    /**
     * Commits the offsets of the records returned by the last poll.
     *
     * @param retryOnTimeout {@code true} if the commit should be retried once when it times out
     * @return a future that fails with a {@link KafkaConsumerCommitException} if the offsets could not be
     *         committed; in that case the consumer has been closed
     */
    private Future<Void> commit(final boolean retryOnTimeout) {
        final Promise<Void> committed = Promise.promise();
        kafkaConsumer.commit(committed);
        return committed.future()
                .onSuccess(ok -> LOG.debug("Committed offsets"))
                .recover(cause -> {
                    LOG.error("Error committing offsets: {}", cause.toString());
                    if (retryOnTimeout && cause instanceof TimeoutException) {
                        LOG.debug("Committing offsets timed out. Maybe increase 'default.api.timeout.ms'?");
                        return commit(false);
                    }
                    final KafkaConsumerCommitException error = new KafkaConsumerCommitException(cause);
                    closeWithError(error);
                    return Future.failedFuture(error);
                });
    }

    /**
     * Commits the offset following the given record for the record's partition.
     * The outcome is only logged because the consumer is closed right afterwards anyway.
     */
    private void commitCurrentOffset(final KafkaConsumerRecord<String, Buffer> consumerRecord) {
        final TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        final OffsetAndMetadata nextOffset = new OffsetAndMetadata(consumerRecord.offset() + 1, "");
        kafkaConsumer.commit(Map.of(partition, nextOffset), result -> {
            if (result.failed()) {
                LOG.warn("Error committing offset of failed record", result.cause());
            }
        });
    }

    /**
     * Notifies the close handler about the given cause and then closes the consumer.
     */
    private void closeWithError(final Throwable cause) {
        LOG.error("Closing consumer with cause", cause);
        closeHandler.handle(cause);
        stop();
    }
}
