/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchCollector;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.WakeupTrigger;
import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.CurrentLagEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public class AsyncKafkaConsumer<K, V>
implements ConsumerDelegate<K, V> {
    private static final long NO_CURRENT_THREAD = -1L;
    private final ApplicationEventHandler applicationEventHandler;
    private final Time time;
    private final AtomicReference<Optional<ConsumerGroupMetadata>> groupMetadata = new AtomicReference(Optional.empty());
    private final AsyncConsumerMetrics kafkaConsumerMetrics;
    private Logger log;
    private final String clientId;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final BackgroundEventHandler backgroundEventHandler;
    private final BackgroundEventProcessor backgroundEventProcessor;
    private final CompletableEventReaper backgroundEventReaper;
    private final Deserializers<K, V> deserializers;
    private final FetchBuffer fetchBuffer;
    private final FetchCollector<K, V> fetchCollector;
    private final ConsumerInterceptors<K, V> interceptors;
    private final IsolationLevel isolationLevel;
    private final SubscriptionState subscriptions;
    private final AtomicReference<Set<TopicPartition>> groupAssignmentSnapshot = new AtomicReference(Collections.emptySet());
    private final ConsumerMetadata metadata;
    private final Metrics metrics;
    private final long retryBackoffMs;
    private final int requestTimeoutMs;
    private final Duration defaultApiTimeoutMs;
    private final boolean autoCommitEnabled;
    private volatile boolean closed = false;
    private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
    private boolean cachedSubscriptionHasAllFetchPositions;
    private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
    private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
    private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> lastPendingAsyncCommit = null;
    private final AtomicLong currentThread = new AtomicLong(-1L);
    private final AtomicInteger refCount = new AtomicInteger(0);
    private final MemberStateListener memberStateListener = new MemberStateListener(){

        @Override
        public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
            AsyncKafkaConsumer.this.updateGroupMetadata(memberEpoch, memberId);
        }

        @Override
        public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
            AsyncKafkaConsumer.this.setGroupAssignmentSnapshot(partitions);
        }
    };

    AsyncKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(config, keyDeserializer, valueDeserializer, Time.SYSTEM, ApplicationEventHandler::new, CompletableEventReaper::new, FetchCollector::new, ConsumerMetadata::new, new LinkedBlockingQueue<BackgroundEvent>());
    }

    AsyncKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Time time, ApplicationEventHandlerFactory applicationEventHandlerFactory, CompletableEventReaperFactory backgroundEventReaperFactory, FetchCollectorFactory<K, V> fetchCollectorFactory, ConsumerMetadataFactory metadataFactory, LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
        try {
            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONSUMER);
            this.clientId = config.getString("client.id");
            this.autoCommitEnabled = config.getBoolean("enable.auto.commit");
            LogContext logContext = ConsumerUtils.createLogContext(config, groupRebalanceConfig);
            this.backgroundEventQueue = backgroundEventQueue;
            this.log = logContext.logger(this.getClass());
            this.log.debug("Initializing the Kafka consumer");
            this.defaultApiTimeoutMs = Duration.ofMillis(config.getInt("default.api.timeout.ms").intValue());
            this.time = time;
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, (AbstractConfig)config);
            this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(this.clientId, config);
            this.clientTelemetryReporter.ifPresent(reporters::add);
            this.metrics = ConsumerUtils.createMetrics(config, time, reporters);
            this.kafkaConsumerMetrics = new AsyncConsumerMetrics(this.metrics);
            this.retryBackoffMs = config.getLong("retry.backoff.ms");
            this.requestTimeoutMs = config.getInt("request.timeout.ms");
            List interceptorList = ConsumerUtils.configuredConsumerInterceptors(config);
            this.interceptors = new ConsumerInterceptors(interceptorList);
            this.deserializers = new Deserializers<K, V>(config, keyDeserializer, valueDeserializer);
            this.subscriptions = ConsumerUtils.createSubscriptionState(config, logContext);
            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(this.metrics.reporters(), interceptorList, Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
            this.metadata = metadataFactory.build(config, this.subscriptions, logContext, clusterResourceListeners);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
            this.metadata.bootstrap(addresses);
            FetchMetricsManager fetchMetricsManager = ConsumerUtils.createFetchMetricsManager(this.metrics);
            FetchConfig fetchConfig = new FetchConfig(config);
            this.isolationLevel = fetchConfig.isolationLevel;
            ApiVersions apiVersions = new ApiVersions();
            LinkedBlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<ApplicationEvent>();
            this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, this.kafkaConsumerMetrics);
            this.fetchBuffer = new FetchBuffer(logContext);
            Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(time, logContext, this.metadata, config, apiVersions, this.metrics, fetchMetricsManager.throttleTimeSensor(), this.clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), this.backgroundEventHandler, false, this.kafkaConsumerMetrics);
            this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(this.interceptors);
            this.groupMetadata.set(this.initializeGroupMetadata(config, groupRebalanceConfig));
            Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time, logContext, this.backgroundEventHandler, this.metadata, this.subscriptions, this.fetchBuffer, config, groupRebalanceConfig, apiVersions, fetchMetricsManager, networkClientDelegateSupplier, this.clientTelemetryReporter, this.metrics, this.offsetCommitCallbackInvoker, this.memberStateListener);
            Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, this.metadata, this.subscriptions, requestManagersSupplier);
            this.applicationEventHandler = applicationEventHandlerFactory.build(logContext, time, applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, this.kafkaConsumerMetrics);
            this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(logContext, this.subscriptions, time, new RebalanceCallbackMetricsManager(this.metrics));
            this.backgroundEventProcessor = new BackgroundEventProcessor();
            this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
            this.fetchCollector = fetchCollectorFactory.build(logContext, this.metadata, this.subscriptions, fetchConfig, this.deserializers, fetchMetricsManager, time);
            if (this.groupMetadata.get().isPresent() && GroupProtocol.of(config.getString("group.protocol")) == GroupProtocol.CONSUMER) {
                config.ignore("group.remote.assignor");
            }
            config.logUnused();
            AppInfoParser.registerAppInfo("kafka.consumer", this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka consumer initialized");
        }
        catch (Throwable t) {
            if (this.log != null) {
                this.close(Duration.ZERO, true);
            }
            throw new KafkaException("Failed to construct kafka consumer", t);
        }
    }

    AsyncKafkaConsumer(LogContext logContext, String clientId, Deserializers<K, V> deserializers, FetchBuffer fetchBuffer, FetchCollector<K, V> fetchCollector, ConsumerInterceptors<K, V> interceptors, Time time, ApplicationEventHandler applicationEventHandler, BlockingQueue<BackgroundEvent> backgroundEventQueue, CompletableEventReaper backgroundEventReaper, ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, Metrics metrics, SubscriptionState subscriptions, ConsumerMetadata metadata, long retryBackoffMs, int requestTimeoutMs, int defaultApiTimeoutMs, String groupId, boolean autoCommitEnabled) {
        this.log = logContext.logger(this.getClass());
        this.subscriptions = subscriptions;
        this.clientId = clientId;
        this.fetchBuffer = fetchBuffer;
        this.fetchCollector = fetchCollector;
        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        this.interceptors = Objects.requireNonNull(interceptors);
        this.time = time;
        this.backgroundEventQueue = backgroundEventQueue;
        this.rebalanceListenerInvoker = rebalanceListenerInvoker;
        this.backgroundEventProcessor = new BackgroundEventProcessor();
        this.backgroundEventReaper = backgroundEventReaper;
        this.metrics = metrics;
        this.groupMetadata.set(this.initializeGroupMetadata(groupId, Optional.empty()));
        this.metadata = metadata;
        this.retryBackoffMs = retryBackoffMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
        this.deserializers = deserializers;
        this.applicationEventHandler = applicationEventHandler;
        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
        this.clientTelemetryReporter = Optional.empty();
        this.autoCommitEnabled = autoCommitEnabled;
        this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
        this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, this.kafkaConsumerMetrics);
    }

    AsyncKafkaConsumer(LogContext logContext, Time time, ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, KafkaClient client, SubscriptionState subscriptions, ConsumerMetadata metadata) {
        this.log = logContext.logger(this.getClass());
        this.subscriptions = subscriptions;
        this.clientId = config.getString("client.id");
        this.autoCommitEnabled = config.getBoolean("enable.auto.commit");
        this.fetchBuffer = new FetchBuffer(logContext);
        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        this.interceptors = new ConsumerInterceptors(Collections.emptyList());
        this.time = time;
        this.metrics = new Metrics(time);
        this.metadata = metadata;
        this.retryBackoffMs = config.getLong("retry.backoff.ms");
        this.requestTimeoutMs = config.getInt("request.timeout.ms");
        this.defaultApiTimeoutMs = Duration.ofMillis(config.getInt("default.api.timeout.ms").intValue());
        this.deserializers = new Deserializers<K, V>(keyDeserializer, valueDeserializer);
        this.clientTelemetryReporter = Optional.empty();
        ConsumerMetrics metricsRegistry = new ConsumerMetrics("consumer");
        FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(this.metrics, metricsRegistry.fetcherMetrics);
        this.fetchCollector = new FetchCollector<K, V>(logContext, metadata, subscriptions, new FetchConfig(config), this.deserializers, fetchMetricsManager, time);
        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(this.metrics);
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONSUMER);
        this.groupMetadata.set(this.initializeGroupMetadata(config, groupRebalanceConfig));
        LinkedBlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<ApplicationEvent>();
        this.backgroundEventQueue = new LinkedBlockingQueue<BackgroundEvent>();
        this.backgroundEventHandler = new BackgroundEventHandler(this.backgroundEventQueue, time, this.kafkaConsumerMetrics);
        this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(logContext, subscriptions, time, new RebalanceCallbackMetricsManager(this.metrics));
        ApiVersions apiVersions = new ApiVersions();
        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(time, config, logContext, client, metadata, this.backgroundEventHandler, false, this.kafkaConsumerMetrics);
        this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(this.interceptors);
        Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time, logContext, this.backgroundEventHandler, metadata, subscriptions, this.fetchBuffer, config, groupRebalanceConfig, apiVersions, fetchMetricsManager, networkClientDelegateSupplier, this.clientTelemetryReporter, this.metrics, this.offsetCommitCallbackInvoker, this.memberStateListener);
        Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, subscriptions, requestManagersSupplier);
        this.applicationEventHandler = new ApplicationEventHandler(logContext, time, applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, this.kafkaConsumerMetrics);
        this.backgroundEventProcessor = new BackgroundEventProcessor();
        this.backgroundEventReaper = new CompletableEventReaper(logContext);
    }

    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig) {
        Optional<ConsumerGroupMetadata> groupMetadata = this.initializeGroupMetadata(groupRebalanceConfig.groupId, groupRebalanceConfig.groupInstanceId);
        if (groupMetadata.isEmpty()) {
            config.ignore("auto.commit.interval.ms");
            config.ignore("internal.throw.on.fetch.stable.offset.unsupported");
        }
        return groupMetadata;
    }

    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(String groupId, Optional<String> groupInstanceId) {
        if (groupId != null) {
            if (groupId.isEmpty()) {
                throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace.");
            }
            return Optional.of(this.initializeConsumerGroupMetadata(groupId, groupInstanceId));
        }
        return Optional.empty();
    }

    private ConsumerGroupMetadata initializeConsumerGroupMetadata(String groupId, Optional<String> groupInstanceId) {
        return new ConsumerGroupMetadata(groupId, -1, "", groupInstanceId);
    }

    private void updateGroupMetadata(Optional<Integer> memberEpoch, String memberId) {
        memberEpoch.ifPresent(epoch -> this.groupMetadata.updateAndGet(oldGroupMetadataOptional -> oldGroupMetadataOptional.map(oldGroupMetadata -> new ConsumerGroupMetadata(oldGroupMetadata.groupId(), memberEpoch.orElse(oldGroupMetadata.generationId()), memberId, oldGroupMetadata.groupInstanceId()))));
    }

    void setGroupAssignmentSnapshot(Set<TopicPartition> partitions) {
        this.groupAssignmentSnapshot.set(Collections.unmodifiableSet(partitions));
    }

    @Override
    public void registerMetricForSubscription(KafkaMetric metric) {
        if (!this.metrics().containsKey(metric.metricName())) {
            this.clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(metric));
        } else {
            this.log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", (Object)metric.metricName());
        }
    }

    @Override
    public void unregisterMetricFromSubscription(KafkaMetric metric) {
        if (!this.metrics().containsKey(metric.metricName())) {
            this.clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(metric));
        } else {
            this.log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", (Object)metric.metricName());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        Timer timer = this.time.timer(timeout);
        this.acquireAndEnsureOpen();
        try {
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            do {
                PollEvent event = new PollEvent(timer.currentTimeMs());
                this.applicationEventHandler.add(event);
                ConsumerUtils.getResult(event.reconcileAndAutoCommit(), this.defaultApiTimeoutMs.toMillis());
                this.wakeupTrigger.maybeTriggerWakeup();
                this.updateAssignmentMetadataIfNeeded(timer);
                Fetch<K, V> fetch = this.pollForFetches(timer);
                if (fetch.isEmpty()) continue;
                this.sendPrefetches(timer);
                if (fetch.records().isEmpty()) {
                    this.log.trace("Returning empty records from `poll()` since the consumer's position has advanced for at least one topic partition");
                }
                ConsumerRecords<K, V> consumerRecords = this.interceptors.onConsume(new ConsumerRecords<K, V>(fetch.records(), fetch.nextOffsets()));
                return consumerRecords;
            } while (timer.notExpired());
            ConsumerRecords consumerRecords = ConsumerRecords.empty();
            return consumerRecords;
        }
        finally {
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
            this.release();
        }
    }

    @Override
    public void commitSync() {
        this.commitSync(this.defaultApiTimeoutMs);
    }

    @Override
    public void commitAsync() {
        this.commitAsync(null);
    }

    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        this.commitAsync(Optional.empty(), callback);
    }

    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.commitAsync(Optional.of(offsets), callback);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, OffsetCommitCallback callback) {
        this.acquireAndEnsureOpen();
        try {
            AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
            this.lastPendingAsyncCommit = this.commit(asyncCommitEvent).whenComplete((committedOffsets, throwable) -> {
                if (throwable == null) {
                    this.offsetCommitCallbackInvoker.enqueueInterceptorInvocation((Map<TopicPartition, OffsetAndMetadata>)committedOffsets);
                }
                if (callback == null) {
                    if (throwable != null) {
                        this.log.error("Offset commit with offsets {} failed", committedOffsets, throwable);
                    }
                    return;
                }
                this.offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, (Map<TopicPartition, OffsetAndMetadata>)committedOffsets, (Exception)throwable);
            });
        }
        finally {
            this.release();
        }
    }

    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(CommitEvent commitEvent) {
        this.maybeThrowInvalidGroupIdException();
        this.offsetCommitCallbackInvoker.executeCallbacks();
        if (commitEvent.offsets().isPresent() && commitEvent.offsets().get().isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        this.applicationEventHandler.add(commitEvent);
        ConsumerUtils.getResult(commitEvent.offsetsReady(), this.defaultApiTimeoutMs.toMillis());
        return commitEvent.future();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seek(TopicPartition partition, long offset) {
        if (offset < 0L) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        this.acquireAndEnsureOpen();
        try {
            this.log.info("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
            SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(this.defaultApiTimeoutDeadlineMs(), partition, offset, Optional.empty());
            this.applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
        long offset = offsetAndMetadata.offset();
        if (offset < 0L) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        this.acquireAndEnsureOpen();
        try {
            if (offsetAndMetadata.leaderEpoch().isPresent()) {
                this.log.info("Seeking to offset {} for partition {} with epoch {}", new Object[]{offset, partition, offsetAndMetadata.leaderEpoch().get()});
            } else {
                this.log.info("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
            }
            this.applicationEventHandler.addAndGet(new SeekUnvalidatedEvent(this.defaultApiTimeoutDeadlineMs(), partition, offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch()));
        }
        finally {
            this.release();
        }
    }

    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        this.seek(partitions, AutoOffsetResetStrategy.EARLIEST);
    }

    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) {
        this.seek(partitions, AutoOffsetResetStrategy.LATEST);
    }

    private void seek(Collection<TopicPartition> partitions, AutoOffsetResetStrategy offsetResetStrategy) {
        if (partitions == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        this.acquireAndEnsureOpen();
        try {
            this.applicationEventHandler.addAndGet(new ResetOffsetEvent(partitions, offsetResetStrategy, this.defaultApiTimeoutDeadlineMs()));
        }
        finally {
            this.release();
        }
    }

    @Override
    public long position(TopicPartition partition) {
        return this.position(partition, this.defaultApiTimeoutMs);
    }

    @Override
    public long position(TopicPartition partition, Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            if (!this.subscriptions.isAssigned(partition)) {
                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
            }
            Timer timer = this.time.timer(timeout);
            do {
                SubscriptionState.FetchPosition position;
                if ((position = this.subscriptions.validPosition(partition)) != null) {
                    long l = position.offset;
                    return l;
                }
                this.updateFetchPositions(timer);
                timer.update();
                this.wakeupTrigger.maybeTriggerWakeup();
            } while (timer.notExpired());
            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position for partition " + String.valueOf(partition) + " could be determined");
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        return this.committed(partitions, this.defaultApiTimeoutMs);
    }

    /*
     * Loose catch block
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
        this.acquireAndEnsureOpen();
        long start = this.time.nanoseconds();
        try {
            this.maybeThrowInvalidGroupIdException();
            if (partitions.isEmpty()) {
                Map<TopicPartition, OffsetAndMetadata> map = Collections.emptyMap();
                return map;
            }
            FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(partitions, CompletableEvent.calculateDeadlineMs(this.time, timeout));
            this.wakeupTrigger.setActiveTask(event.future());
            try {
                Map<TopicPartition, OffsetAndMetadata> map = this.applicationEventHandler.addAndGet(event);
                return map;
            }
            catch (TimeoutException e) {
                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last committed offset for partitions " + String.valueOf(partitions) + " could be determined. Try tuning default.api.timeout.ms larger to relax the threshold.");
            }
            finally {
                this.wakeupTrigger.clearTask();
            }
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            this.kafkaConsumerMetrics.recordCommitted(this.time.nanoseconds() - start);
            this.release();
        }
    }

    private void maybeThrowInvalidGroupIdException() {
        if (this.groupMetadata.get().isEmpty()) {
            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.");
        }
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return this.partitionsFor(topic, this.defaultApiTimeoutMs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            Cluster cluster = this.metadata.fetch();
            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
            if (!parts.isEmpty()) {
                List<PartitionInfo> list = parts;
                return list;
            }
            if (timeout.toMillis() == 0L) {
                throw new TimeoutException();
            }
            TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(topic, CompletableEvent.calculateDeadlineMs(this.time, timeout));
            this.wakeupTrigger.setActiveTask(topicMetadataEvent.future());
            try {
                Map<String, List<PartitionInfo>> topicMetadata = this.applicationEventHandler.addAndGet(topicMetadataEvent);
                List<PartitionInfo> list = topicMetadata.getOrDefault(topic, Collections.emptyList());
                this.wakeupTrigger.clearTask();
                return list;
            }
            catch (Throwable throwable) {
                this.wakeupTrigger.clearTask();
                throw throwable;
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        return this.listTopics(this.defaultApiTimeoutMs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            if (timeout.toMillis() == 0L) {
                throw new TimeoutException();
            }
            AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(CompletableEvent.calculateDeadlineMs(this.time, timeout));
            this.wakeupTrigger.setActiveTask(topicMetadataEvent.future());
            try {
                Map<String, List<PartitionInfo>> map = this.applicationEventHandler.addAndGet(topicMetadataEvent);
                this.wakeupTrigger.clearTask();
                return map;
            }
            catch (Throwable throwable) {
                this.wakeupTrigger.clearTask();
                throw throwable;
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public Set<TopicPartition> paused() {
        this.acquireAndEnsureOpen();
        try {
            Set<TopicPartition> set = Collections.unmodifiableSet(this.subscriptions.pausedPartitions());
            return set;
        }
        finally {
            this.release();
        }
    }

    @Override
    public void pause(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            Objects.requireNonNull(partitions, "The partitions to pause must be nonnull");
            if (!partitions.isEmpty()) {
                this.applicationEventHandler.addAndGet(new PausePartitionsEvent(partitions, this.defaultApiTimeoutDeadlineMs()));
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public void resume(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            Objects.requireNonNull(partitions, "The partitions to resume must be nonnull");
            if (!partitions.isEmpty()) {
                this.applicationEventHandler.addAndGet(new ResumePartitionsEvent(partitions, this.defaultApiTimeoutDeadlineMs()));
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        return this.offsetsForTimes(timestampsToSearch, this.defaultApiTimeoutMs);
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            Objects.requireNonNull(timestampsToSearch, "Timestamps to search cannot be null");
            for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
                if (entry.getValue() >= 0L) continue;
                throw new IllegalArgumentException("The target time for partition " + String.valueOf(entry.getKey()) + " is " + String.valueOf(entry.getValue()) + ". The target time cannot be negative.");
            }
            if (timestampsToSearch.isEmpty()) {
                Map map = Collections.emptyMap();
                return map;
            }
            ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(timestampsToSearch, CompletableEvent.calculateDeadlineMs(this.time, timeout), true);
            if (timeout.toMillis() == 0L) {
                Map.Entry<TopicPartition, Long> entry;
                this.applicationEventHandler.add(listOffsetsEvent);
                entry = listOffsetsEvent.emptyResults();
                return entry;
            }
            Map<TopicPartition, OffsetAndTimestampInternal> offsets = this.applicationEventHandler.addAndGet(listOffsetsEvent);
            HashMap<TopicPartition, OffsetAndTimestamp> results = new HashMap<TopicPartition, OffsetAndTimestamp>(offsets.size());
            offsets.forEach((k, v) -> results.put((TopicPartition)k, v != null ? v.buildOffsetAndTimestamp() : null));
            HashMap<TopicPartition, OffsetAndTimestamp> hashMap = results;
            return hashMap;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        return this.beginningOffsets(partitions, this.defaultApiTimeoutMs);
    }

    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        return this.beginningOrEndOffset(partitions, -2L, timeout);
    }

    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        return this.endOffsets(partitions, this.defaultApiTimeoutMs);
    }

    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        return this.beginningOrEndOffset(partitions, -1L, timeout);
    }

    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions, long timestamp, Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            Objects.requireNonNull(partitions, "Partitions cannot be null");
            if (partitions.isEmpty()) {
                Map<TopicPartition, Long> map = Collections.emptyMap();
                return map;
            }
            Map<TopicPartition, Long> timestampToSearch = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> timestamp));
            ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(timestampToSearch, CompletableEvent.calculateDeadlineMs(this.time, timeout), false);
            if (timeout.isZero()) {
                this.applicationEventHandler.add(listOffsetsEvent);
                Map<TopicPartition, Long> map = listOffsetsEvent.emptyResults();
                return map;
            }
            Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap = this.applicationEventHandler.addAndGet(listOffsetsEvent);
            Map<TopicPartition, Long> map = offsetAndTimestampMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> ((OffsetAndTimestampInternal)entry.getValue()).offset()));
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public OptionalLong currentLag(TopicPartition topicPartition) {
        this.acquireAndEnsureOpen();
        try {
            OptionalLong optionalLong = this.applicationEventHandler.addAndGet(new CurrentLagEvent(topicPartition, this.isolationLevel, this.defaultApiTimeoutDeadlineMs()));
            return optionalLong;
        }
        finally {
            this.release();
        }
    }

    @Override
    public ConsumerGroupMetadata groupMetadata() {
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            ConsumerGroupMetadata consumerGroupMetadata = this.groupMetadata.get().get();
            return consumerGroupMetadata;
        }
        finally {
            this.release();
        }
    }

    @Override
    public void enforceRebalance() {
        this.log.warn("Operation not supported in new consumer group protocol");
    }

    @Override
    public void enforceRebalance(String reason) {
        this.log.warn("Operation not supported in new consumer group protocol");
    }

    @Override
    public void close() {
        this.close(Duration.ofMillis(30000L));
    }

    @Override
    public void close(Duration timeout) {
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        this.acquire();
        try {
            if (!this.closed) {
                this.close(timeout, false);
            }
        }
        finally {
            this.closed = true;
            this.release();
        }
    }

    private void close(Duration timeout, boolean swallowException) {
        this.log.trace("Closing the Kafka consumer");
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        this.wakeupTrigger.disableWakeups();
        Timer closeTimer = this.createTimerForCloseRequests(timeout);
        this.clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
        closeTimer.update();
        Utils.swallow(this.log, Level.ERROR, "Failed to auto-commit offsets", () -> this.autoCommitOnClose(closeTimer), firstException);
        Utils.swallow(this.log, Level.ERROR, "Failed to stop finding coordinator", this::stopFindCoordinatorOnClose, firstException);
        Utils.swallow(this.log, Level.ERROR, "Failed to release group assignment", this::runRebalanceCallbacksOnClose, firstException);
        Utils.swallow(this.log, Level.ERROR, "Failed to leave group while closing consumer", () -> this.leaveGroupOnClose(closeTimer), firstException);
        Utils.swallow(this.log, Level.ERROR, "Failed invoking asynchronous commit callbacks while closing consumer", () -> this.awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException);
        if (this.applicationEventHandler != null) {
            Utils.closeQuietly(() -> this.applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
        }
        closeTimer.update();
        if (this.backgroundEventReaper != null && this.backgroundEventQueue != null) {
            this.backgroundEventReaper.reap(this.backgroundEventQueue);
        }
        Utils.closeQuietly(this.interceptors, "consumer interceptors", firstException);
        Utils.closeQuietly((AutoCloseable)this.kafkaConsumerMetrics, "kafka consumer metrics", firstException);
        Utils.closeQuietly((AutoCloseable)this.metrics, "consumer metrics", firstException);
        Utils.closeQuietly(this.deserializers, "consumer deserializers", firstException);
        this.clientTelemetryReporter.ifPresent(reporter -> Utils.closeQuietly((AutoCloseable)reporter, "async consumer telemetry reporter", firstException));
        AppInfoParser.unregisterAppInfo("kafka.consumer", this.clientId, this.metrics);
        this.log.debug("Kafka consumer has been closed");
        Throwable exception = firstException.get();
        if (exception != null && !swallowException) {
            if (exception instanceof InterruptException) {
                throw (InterruptException)exception;
            }
            throw new KafkaException("Failed to close kafka consumer", exception);
        }
    }

    private Timer createTimerForCloseRequests(Duration timeout) {
        Time time = this.time == null ? Time.SYSTEM : this.time;
        return time.timer(Math.min(timeout.toMillis(), (long)this.requestTimeoutMs));
    }

    private void autoCommitOnClose(Timer timer) {
        if (this.groupMetadata.get().isEmpty()) {
            return;
        }
        if (this.autoCommitEnabled) {
            this.commitSyncAllConsumed(timer);
        }
        this.applicationEventHandler.add(new CommitOnCloseEvent());
    }

    private void runRebalanceCallbacksOnClose() {
        if (this.groupMetadata.get().isEmpty()) {
            return;
        }
        int memberEpoch = this.groupMetadata.get().get().generationId();
        Set<TopicPartition> assignedPartitions = this.groupAssignmentSnapshot.get();
        if (assignedPartitions.isEmpty()) {
            return;
        }
        TreeSet<TopicPartition> droppedPartitions = new TreeSet<TopicPartition>(AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR);
        droppedPartitions.addAll(assignedPartitions);
        Exception error = memberEpoch > 0 ? this.rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions) : this.rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
        if (error != null) {
            throw ConsumerUtils.maybeWrapAsKafkaException(error);
        }
    }

    private void leaveGroupOnClose(Timer timer) {
        if (this.groupMetadata.get().isEmpty()) {
            return;
        }
        this.log.debug("Leaving the consumer group during consumer close");
        try {
            this.applicationEventHandler.addAndGet(new LeaveGroupOnCloseEvent(CompletableEvent.calculateDeadlineMs(timer)));
            this.log.info("Completed leaving the group");
        }
        catch (TimeoutException e) {
            this.log.warn("Consumer attempted to leave the group but couldn't complete it within {} ms. It will proceed to close.", (Object)timer.timeoutMs());
        }
        finally {
            timer.update();
        }
    }

    private void stopFindCoordinatorOnClose() {
        if (this.groupMetadata.get().isEmpty()) {
            return;
        }
        this.log.debug("Stop finding coordinator during consumer close");
        this.applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
    }

    void commitSyncAllConsumed(Timer timer) {
        this.log.debug("Sending synchronous auto-commit on closing");
        try {
            this.commitSync(Duration.ofMillis(timer.remainingMs()));
        }
        catch (Exception e) {
            this.log.warn("Synchronous auto-commit failed", (Throwable)e);
        }
        timer.update();
    }

    @Override
    public void wakeup() {
        this.wakeupTrigger.wakeup();
    }

    @Override
    public void commitSync(Duration timeout) {
        this.commitSync(Optional.empty(), timeout);
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.commitSync(Optional.of(offsets), this.defaultApiTimeoutMs);
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
        this.commitSync(Optional.of(offsets), timeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, Duration timeout) {
        this.acquireAndEnsureOpen();
        long commitStart = this.time.nanoseconds();
        try {
            SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, CompletableEvent.calculateDeadlineMs(this.time, timeout));
            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitFuture = this.commit(syncCommitEvent);
            Timer requestTimer = this.time.timer(timeout.toMillis());
            this.awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer, true);
            this.wakeupTrigger.setActiveTask(commitFuture);
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = ConsumerUtils.getResult(commitFuture, requestTimer);
            this.interceptors.onCommit(committedOffsets);
        }
        finally {
            this.wakeupTrigger.clearTask();
            this.kafkaConsumerMetrics.recordCommitSync(this.time.nanoseconds() - commitStart);
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean enableWakeup) {
        if (this.lastPendingAsyncCommit == null) {
            return;
        }
        try {
            CompletableFuture futureToAwait = new CompletableFuture();
            this.lastPendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null));
            if (enableWakeup) {
                this.wakeupTrigger.setActiveTask(futureToAwait);
            }
            ConsumerUtils.getResult(futureToAwait, timer);
            this.lastPendingAsyncCommit = null;
        }
        finally {
            if (enableWakeup) {
                this.wakeupTrigger.clearTask();
            }
            timer.update();
        }
        this.offsetCommitCallbackInvoker.executeCallbacks();
    }

    @Override
    public Uuid clientInstanceId(Duration timeout) {
        if (this.clientTelemetryReporter.isEmpty()) {
            throw new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.");
        }
        return ClientTelemetryUtils.fetchClientInstanceId(this.clientTelemetryReporter.get(), timeout);
    }

    @Override
    public Set<TopicPartition> assignment() {
        this.acquireAndEnsureOpen();
        try {
            Set<TopicPartition> set = Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
            return set;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Set<String> subscription() {
        this.acquireAndEnsureOpen();
        try {
            Set<String> set = Collections.unmodifiableSet(this.subscriptions.subscription());
            return set;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void assign(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            if (partitions == null) {
                throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
            }
            if (partitions.isEmpty()) {
                this.unsubscribe();
                return;
            }
            for (TopicPartition tp : partitions) {
                String topic = tp != null ? tp.topic() : null;
                if (!Utils.isBlank(topic)) continue;
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
            }
            HashSet<TopicPartition> currentTopicPartitions = new HashSet<TopicPartition>();
            for (TopicPartition tp : this.subscriptions.assignedPartitions()) {
                if (!partitions.contains(tp)) continue;
                currentTopicPartitions.add(tp);
            }
            this.fetchBuffer.retainAll(currentTopicPartitions);
            this.applicationEventHandler.addAndGet(new AssignmentChangeEvent(this.time.milliseconds(), this.defaultApiTimeoutDeadlineMs(), partitions));
        }
        finally {
            this.release();
        }
    }

    @Override
    public void unsubscribe() {
        this.acquireAndEnsureOpen();
        try {
            this.fetchBuffer.retainAll(Collections.emptySet());
            Timer timer = this.time.timer(this.defaultApiTimeoutMs);
            UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(CompletableEvent.calculateDeadlineMs(timer));
            this.applicationEventHandler.add(unsubscribeEvent);
            this.log.info("Unsubscribing all topics or patterns and assigned partitions {}", this.subscriptions.assignedPartitions());
            try {
                this.processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException);
                this.log.info("Unsubscribed all topics or patterns and assigned partitions");
            }
            catch (TimeoutException e2) {
                this.log.error("Failed while waiting for the unsubscribe event to complete");
            }
            this.resetGroupMetadata();
        }
        catch (Exception e3) {
            this.log.error("Unsubscribe failed", (Throwable)e3);
            throw e3;
        }
        finally {
            this.release();
        }
    }

    private void resetGroupMetadata() {
        this.groupMetadata.updateAndGet(oldGroupMetadataOptional -> oldGroupMetadataOptional.map(oldGroupMetadata -> this.initializeConsumerGroupMetadata(oldGroupMetadata.groupId(), oldGroupMetadata.groupInstanceId())));
    }

    WakeupTrigger wakeupTrigger() {
        return this.wakeupTrigger;
    }

    private Fetch<K, V> pollForFetches(Timer timer) {
        long pollTimeout = this.isCommittedOffsetsManagementEnabled() ? Math.min(this.applicationEventHandler.maximumTimeToWait(), timer.remainingMs()) : timer.remainingMs();
        Fetch<K, V> fetch = this.collectFetch();
        if (!fetch.isEmpty()) {
            return fetch;
        }
        this.sendFetches(timer);
        if (!this.cachedSubscriptionHasAllFetchPositions && pollTimeout > this.retryBackoffMs) {
            pollTimeout = this.retryBackoffMs;
        }
        this.log.trace("Polling for fetches with timeout {}", (Object)pollTimeout);
        Timer pollTimer = this.time.timer(pollTimeout);
        this.wakeupTrigger.setFetchAction(this.fetchBuffer);
        try {
            this.fetchBuffer.awaitNotEmpty(pollTimer);
        }
        catch (InterruptException e) {
            this.log.trace("Interrupt during fetch", (Throwable)e);
            throw e;
        }
        finally {
            timer.update(pollTimer.currentTimeMs());
            this.wakeupTrigger.clearTask();
        }
        return this.collectFetch();
    }

    private Fetch<K, V> collectFetch() {
        Fetch<K, V> fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        this.applicationEventHandler.wakeupNetworkThread();
        return fetch;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean updateFetchPositions(Timer timer) {
        this.cachedSubscriptionHasAllFetchPositions = false;
        try {
            CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(CompletableEvent.calculateDeadlineMs(timer));
            this.wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
            this.cachedSubscriptionHasAllFetchPositions = this.applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
        }
        catch (TimeoutException e) {
            boolean bl = false;
            return bl;
        }
        finally {
            this.wakeupTrigger.clearTask();
        }
        return true;
    }

    private boolean isCommittedOffsetsManagementEnabled() {
        return this.groupMetadata.get().isPresent();
    }

    private void sendFetches(Timer timer) {
        try {
            this.applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(CompletableEvent.calculateDeadlineMs(timer)));
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }

    private void sendPrefetches(Timer timer) {
        try {
            this.applicationEventHandler.add(new CreateFetchRequestsEvent(CompletableEvent.calculateDeadlineMs(timer)));
        }
        catch (Throwable t) {
            this.log.warn("An unexpected error occurred while pre-fetching data in Consumer.poll(), but was suppressed", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
        this.offsetCommitCallbackInvoker.executeCallbacks();
        if (this.subscriptions.hasPatternSubscription()) {
            try {
                this.applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(CompletableEvent.calculateDeadlineMs(timer)));
            }
            catch (TimeoutException e) {
                boolean bl = false;
                return bl;
            }
            finally {
                timer.update();
            }
        }
        this.processBackgroundEvents();
        return this.updateFetchPositions(timer);
    }

    @Override
    public void subscribe(Collection<String> topics) {
        this.subscribeInternal(topics, Optional.empty());
    }

    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        this.subscribeInternal(topics, Optional.of(listener));
    }

    @Override
    public void subscribe(Pattern pattern) {
        this.subscribeInternal(pattern, Optional.empty());
    }

    @Override
    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        this.subscribeToRegex(pattern, Optional.of(listener));
    }

    @Override
    public void subscribe(SubscriptionPattern pattern) {
        this.subscribeToRegex(pattern, Optional.empty());
    }

    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        this.subscribeInternal(pattern, Optional.of(listener));
    }

    private void acquireAndEnsureOpen() {
        this.acquire();
        if (this.closed) {
            this.release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        Thread thread = Thread.currentThread();
        long threadId = thread.getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. currentThread(name: " + thread.getName() + ", id: " + threadId + ") otherThread(id: " + this.currentThread.get() + ")");
        }
        this.refCount.incrementAndGet();
    }

    private void release() {
        if (this.refCount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

    private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            if (pattern == null || pattern.toString().isEmpty()) {
                throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty"));
            }
            this.log.info("Subscribed to pattern: '{}'", (Object)pattern);
            this.applicationEventHandler.addAndGet(new TopicPatternSubscriptionChangeEvent(pattern, listener, this.defaultApiTimeoutDeadlineMs()));
        }
        finally {
            this.release();
        }
    }

    private void subscribeToRegex(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            this.throwIfSubscriptionPatternIsInvalid(pattern);
            this.log.info("Subscribing to regular expression {}", (Object)pattern);
            this.applicationEventHandler.addAndGet(new TopicRe2JPatternSubscriptionChangeEvent(pattern, listener, CompletableEvent.calculateDeadlineMs(this.time.timer(this.defaultApiTimeoutMs))));
        }
        finally {
            this.release();
        }
    }

    private void throwIfSubscriptionPatternIsInvalid(SubscriptionPattern subscriptionPattern) {
        if (subscriptionPattern == null) {
            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
        }
        if (subscriptionPattern.pattern().isEmpty()) {
            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be empty");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            if (topics == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            }
            if (topics.isEmpty()) {
                this.unsubscribe();
            } else {
                for (String topic : topics) {
                    if (!Utils.isBlank(topic)) continue;
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }
                HashSet<TopicPartition> currentTopicPartitions = new HashSet<TopicPartition>();
                for (TopicPartition tp : this.subscriptions.assignedPartitions()) {
                    if (!topics.contains(tp.topic())) continue;
                    currentTopicPartitions.add(tp);
                }
                this.fetchBuffer.retainAll(currentTopicPartitions);
                this.log.info("Subscribed to topic(s): {}", (Object)String.join((CharSequence)", ", topics));
                this.applicationEventHandler.addAndGet(new TopicSubscriptionChangeEvent(new HashSet<String>(topics), listener, this.defaultApiTimeoutDeadlineMs()));
            }
        }
        finally {
            this.release();
        }
    }

    boolean processBackgroundEvents() {
        AtomicReference<KafkaException> firstError = new AtomicReference<KafkaException>();
        List<BackgroundEvent> events = this.backgroundEventHandler.drainEvents();
        if (!events.isEmpty()) {
            long startMs = this.time.milliseconds();
            for (BackgroundEvent event : events) {
                this.kafkaConsumerMetrics.recordBackgroundEventQueueTime(this.time.milliseconds() - event.enqueuedMs());
                try {
                    if (event instanceof CompletableEvent) {
                        this.backgroundEventReaper.add((CompletableEvent)((Object)event));
                    }
                    this.backgroundEventProcessor.process(event);
                }
                catch (Throwable t) {
                    KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
                    if (firstError.compareAndSet(null, e)) continue;
                    this.log.warn("An error occurred when processing the background event: {}", (Object)e.getMessage(), (Object)e);
                }
            }
            this.kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(this.time.milliseconds() - startMs);
        }
        this.backgroundEventReaper.reap(this.time.milliseconds());
        if (firstError.get() != null) {
            throw (KafkaException)firstError.get();
        }
        return !events.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    <T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
        do {
            boolean hadEvents;
            block11: {
                hadEvents = false;
                try {
                    hadEvents = this.processBackgroundEvents();
                }
                catch (Exception e) {
                    if (ignoreErrorEventException.test(e)) break block11;
                    throw e;
                }
            }
            try {
                if (future.isDone()) {
                    T e = ConsumerUtils.getResult(future);
                    return e;
                }
                if (!hadEvents) {
                    Timer pollInterval = this.time.timer(100L);
                    T t = ConsumerUtils.getResult(future, pollInterval);
                    return t;
                }
            }
            catch (TimeoutException timeoutException) {
            }
            finally {
                timer.update();
            }
        } while (timer.notExpired());
        throw new TimeoutException("Operation timed out before completion");
    }

    static ConsumerRebalanceListenerCallbackCompletedEvent invokeRebalanceCallbacks(ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, ConsumerRebalanceListenerMethodName methodName, SortedSet<TopicPartition> partitions, CompletableFuture<Void> future) {
        Exception e;
        try {
            switch (methodName) {
                case ON_PARTITIONS_REVOKED: {
                    e = rebalanceListenerInvoker.invokePartitionsRevoked(partitions);
                    break;
                }
                case ON_PARTITIONS_ASSIGNED: {
                    e = rebalanceListenerInvoker.invokePartitionsAssigned(partitions);
                    break;
                }
                case ON_PARTITIONS_LOST: {
                    e = rebalanceListenerInvoker.invokePartitionsLost(partitions);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("The method " + methodName.fullyQualifiedMethodName() + " to invoke was not expected");
                }
            }
        }
        catch (InterruptException | WakeupException ex) {
            e = ex;
        }
        Optional<KafkaException> error = e != null ? Optional.of(ConsumerUtils.maybeWrapAsKafkaException(e, "User rebalance callback throws an error")) : Optional.empty();
        return new ConsumerRebalanceListenerCallbackCompletedEvent(methodName, future, error);
    }

    @Override
    public String clientId() {
        return this.clientId;
    }

    @Override
    public Metrics metricsRegistry() {
        return this.metrics;
    }

    @Override
    public AsyncConsumerMetrics kafkaConsumerMetrics() {
        return this.kafkaConsumerMetrics;
    }

    SubscriptionState subscriptions() {
        return this.subscriptions;
    }

    private long defaultApiTimeoutDeadlineMs() {
        return CompletableEvent.calculateDeadlineMs(this.time, this.defaultApiTimeoutMs);
    }

    static interface ApplicationEventHandlerFactory {
        public ApplicationEventHandler build(LogContext var1, Time var2, BlockingQueue<ApplicationEvent> var3, CompletableEventReaper var4, Supplier<ApplicationEventProcessor> var5, Supplier<NetworkClientDelegate> var6, Supplier<RequestManagers> var7, AsyncConsumerMetrics var8);
    }

    static interface CompletableEventReaperFactory {
        public CompletableEventReaper build(LogContext var1);
    }

    static interface FetchCollectorFactory<K, V> {
        public FetchCollector<K, V> build(LogContext var1, ConsumerMetadata var2, SubscriptionState var3, FetchConfig var4, Deserializers<K, V> var5, FetchMetricsManager var6, Time var7);
    }

    static interface ConsumerMetadataFactory {
        public ConsumerMetadata build(ConsumerConfig var1, SubscriptionState var2, LogContext var3, ClusterResourceListeners var4);
    }

    private class BackgroundEventProcessor
    implements EventProcessor<BackgroundEvent> {
        private BackgroundEventProcessor() {
        }

        @Override
        public void process(BackgroundEvent event) {
            switch (event.type()) {
                case ERROR: {
                    this.process((ErrorEvent)event);
                    break;
                }
                case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED: {
                    this.process((ConsumerRebalanceListenerCallbackNeededEvent)event);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Background event type " + String.valueOf((Object)event.type()) + " was not expected");
                }
            }
        }

        @Override
        private void process(ErrorEvent event) {
            throw event.error();
        }

        @Override
        private void process(ConsumerRebalanceListenerCallbackNeededEvent event) {
            ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.this.rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future());
            AsyncKafkaConsumer.this.applicationEventHandler.add(invokedEvent);
            if (invokedEvent.error().isPresent()) {
                throw invokedEvent.error().get();
            }
        }
    }
}

