/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.telemetry.internals;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.telemetry.internals.LastValueTracker;
import org.apache.kafka.common.telemetry.internals.MetricKey;
import org.apache.kafka.common.telemetry.internals.MetricNamingStrategy;
import org.apache.kafka.common.telemetry.internals.MetricsCollector;
import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MetricsCollector} that keeps a registry of {@link KafkaMetric} instances and, on
 * every {@link #collect(MetricsEmitter)} call, turns their current values into
 * {@link SinglePointMetric} data points that are handed to a {@link MetricsEmitter}.
 *
 * <p>Measurable metrics backed by {@link WindowedCount} or {@link CumulativeSum} are emitted as
 * sums (either cumulative or as a delta against the previously collected value, depending on
 * {@link MetricsEmitter#shouldEmitDeltaMetrics()}). All other measurable metrics, and
 * non-measurable metrics whose value is a {@link Number}, are emitted as gauges.
 *
 * <p>All bookkeeping lives in the private {@code StateLedger}, which stores its maps in
 * {@link ConcurrentHashMap} instances.
 */
public class KafkaMetricsCollector implements MetricsCollector {
    private static final Logger log = LoggerFactory.getLogger(KafkaMetricsCollector.class);
    private final StateLedger ledger;
    private final Time time;
    private final MetricNamingStrategy<MetricName> metricNamingStrategy;
    private final Set<String> excludeLabels;

    /**
     * Creates a collector that reads timestamps from {@link Time#SYSTEM}.
     *
     * @param metricNamingStrategy maps a {@link MetricName} to the {@link MetricKey} under which
     *                             the metric is tracked and emitted
     * @param excludeLabels        label set forwarded unchanged to every {@link SinglePointMetric}
     *                             factory call
     */
    public KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy, Set<String> excludeLabels) {
        this(metricNamingStrategy, Time.SYSTEM, excludeLabels);
    }

    /**
     * Creates a collector with an explicit time source.
     *
     * @param metricNamingStrategy maps a {@link MetricName} to a {@link MetricKey}
     * @param time                 source of the millisecond timestamps used for data points and
     *                             for recording when a metric was added
     * @param excludeLabels        label set forwarded unchanged to every {@link SinglePointMetric}
     *                             factory call
     */
    KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy, Time time, Set<String> excludeLabels) {
        this.metricNamingStrategy = metricNamingStrategy;
        this.time = time;
        this.ledger = new StateLedger();
        this.excludeLabels = excludeLabels;
    }

    /**
     * Registers an initial batch of metrics.
     *
     * @param metrics metrics to start tracking; each is stored under the key produced by the
     *                naming strategy
     */
    public void init(List<KafkaMetric> metrics) {
        this.ledger.init(metrics);
    }

    /**
     * Registers a new metric or replaces the one already tracked under the same key.
     *
     * @param metric the added or updated metric
     */
    public void metricChange(KafkaMetric metric) {
        this.ledger.metricChange(metric);
    }

    /**
     * Stops tracking a metric and drops all state recorded for its key.
     *
     * @param metric the metric being removed
     */
    public void metricRemoval(KafkaMetric metric) {
        this.ledger.metricRemoval(metric);
    }

    /**
     * Clears the recorded "added" timestamps and the delta tracker. The set of tracked metrics
     * itself is left untouched.
     */
    public void metricsReset() {
        this.ledger.metricsStateReset();
    }

    /**
     * Returns the keys of all currently tracked metrics.
     *
     * @return the key set of the underlying metric map (a live view, not a copy)
     */
    Set<MetricKey> getTrackedMetrics() {
        return this.ledger.metricMap.keySet();
    }

    /**
     * Collects every tracked metric and forwards the resulting data points to the emitter.
     * A failure while processing one metric is logged and does not stop the remaining metrics
     * from being processed.
     *
     * @param metricsEmitter destination for the collected data points
     */
    @Override
    public void collect(MetricsEmitter metricsEmitter) {
        for (Map.Entry<MetricKey, KafkaMetric> entry : this.ledger.getMetrics()) {
            MetricKey metricKey = entry.getKey();
            KafkaMetric metric = entry.getValue();
            try {
                this.collectMetric(metricsEmitter, metricKey, metric);
            } catch (Exception e) {
                // Log and carry on so that one bad metric does not abort the whole pass.
                log.error("Error processing Kafka metric {}", metricKey, e);
            }
        }
    }

    /**
     * Reads the current value of a single metric and emits it as a sum or a gauge.
     *
     * <p>If reading the value throws, a warning is logged and nothing is emitted. Non-measurable
     * metrics whose value is not a {@link Number} are skipped with a debug log entry.
     *
     * @param metricsEmitter destination for the data point
     * @param metricKey      key under which the metric is tracked
     * @param metric         the metric to read
     */
    protected void collectMetric(MetricsEmitter metricsEmitter, MetricKey metricKey, KafkaMetric metric) {
        Object metricValue;
        try {
            metricValue = metric.metricValue();
        } catch (Exception e) {
            log.warn("Failed to retrieve metric value {}", metricKey.name(), e);
            return;
        }
        Instant now = Instant.ofEpochMilli(this.time.milliseconds());
        if (metric.isMeasurable()) {
            Measurable measurable = metric.measurable();
            // The value of a measurable metric is treated as a Double here; a different runtime
            // type would surface as a ClassCastException handled by the caller.
            Double value = (Double) metricValue;
            if (measurable instanceof WindowedCount || measurable instanceof CumulativeSum) {
                this.collectSum(metricKey, value, metricsEmitter, now);
            } else {
                this.collectGauge(metricKey, value, metricsEmitter, now);
            }
        } else if (metricValue instanceof Number) {
            this.collectGauge(metricKey, (Number) metricValue, metricsEmitter, now);
        } else {
            log.debug("Skipping non-measurable gauge metric {}", metricKey.name());
        }
    }

    /**
     * Emits a sum data point, either as a delta against the previously collected value or as
     * the cumulative value, depending on {@link MetricsEmitter#shouldEmitDeltaMetrics()}.
     * Nothing is emitted (and no delta state is touched) when the emitter rejects the key.
     */
    private void collectSum(MetricKey metricKey, double value, MetricsEmitter metricsEmitter, Instant timestamp) {
        if (!metricsEmitter.shouldEmitMetric(metricKey)) {
            return;
        }
        if (metricsEmitter.shouldEmitDeltaMetrics()) {
            LastValueTracker.InstantAndValue<Double> instantAndValue = this.ledger.delta(metricKey, timestamp, value);
            metricsEmitter.emitMetric(SinglePointMetric.deltaSum(metricKey, instantAndValue.getValue(), true, timestamp,
                    instantAndValue.getIntervalStart(), this.excludeLabels));
        } else {
            metricsEmitter.emitMetric(SinglePointMetric.sum(metricKey, value, true, timestamp,
                    this.ledger.instantAdded(metricKey), this.excludeLabels));
        }
    }

    /**
     * Emits a gauge data point for the given value, unless the emitter rejects the key.
     */
    private void collectGauge(MetricKey metricKey, Number value, MetricsEmitter metricsEmitter, Instant timestamp) {
        if (!metricsEmitter.shouldEmitMetric(metricKey)) {
            return;
        }
        metricsEmitter.emitMetric(SinglePointMetric.gauge(metricKey, value, timestamp, this.excludeLabels));
    }

    /**
     * Holds the collector's mutable state: the tracked metrics, the instant each key was first
     * seen or last changed, and the last values used to compute deltas.
     */
    private class StateLedger {
        /** Tracked metrics, keyed by the {@link MetricKey} derived from their name. */
        private final Map<MetricKey, KafkaMetric> metricMap = new ConcurrentHashMap<>();
        /** Last value handed to {@link #delta} for each key. */
        private final LastValueTracker<Double> doubleDeltas = new LastValueTracker<>();
        /** Instant recorded for each key by {@link #metricChange} or lazily by {@link #instantAdded}. */
        private final Map<MetricKey, Instant> metricAdded = new ConcurrentHashMap<>();

        private StateLedger() {
        }

        /**
         * Returns the instant recorded for the key, recording the current time first if the key
         * has no entry yet.
         */
        private Instant instantAdded(MetricKey metricKey) {
            return this.metricAdded.computeIfAbsent(metricKey,
                    x -> Instant.ofEpochMilli(KafkaMetricsCollector.this.time.milliseconds()));
        }

        /**
         * Starts tracking every metric in the list. No "added" instant is recorded here; it is
         * filled in lazily by {@link #instantAdded}.
         */
        private void init(List<KafkaMetric> metrics) {
            log.info("initializing Kafka metrics collector");
            for (KafkaMetric m : metrics) {
                this.metricMap.put(KafkaMetricsCollector.this.metricNamingStrategy.metricKey(m.metricName()), m);
            }
        }

        /**
         * Stores the metric under its key, discards any delta state already held for that key
         * (with a warning), and records the current time as the key's "added" instant.
         */
        private void metricChange(KafkaMetric metric) {
            MetricKey metricKey = KafkaMetricsCollector.this.metricNamingStrategy.metricKey(metric.metricName());
            this.metricMap.put(metricKey, metric);
            if (this.doubleDeltas.contains(metricKey)) {
                // A fresh Exception is passed as the trailing argument so that the log entry
                // carries the stack trace of the registration call.
                log.warn("Registering a new metric {} which already has a last value tracked. Removing metric from delta register.", metric.metricName(), new Exception());
                this.doubleDeltas.remove(metricKey);
            }
            this.metricAdded.put(metricKey, Instant.ofEpochMilli(KafkaMetricsCollector.this.time.milliseconds()));
        }

        /**
         * Removes the metric and every piece of state recorded for its key.
         */
        private void metricRemoval(KafkaMetric metric) {
            log.debug("removing kafka metric : {}", metric.metricName());
            MetricKey metricKey = KafkaMetricsCollector.this.metricNamingStrategy.metricKey(metric.metricName());
            this.metricMap.remove(metricKey);
            this.doubleDeltas.remove(metricKey);
            this.metricAdded.remove(metricKey);
        }

        /**
         * Returns the entries of the tracked-metric map (a view of the map, not a snapshot).
         */
        private Iterable<? extends Map.Entry<MetricKey, KafkaMetric>> getMetrics() {
            return this.metricMap.entrySet();
        }

        /**
         * Stores {@code value} as the latest value for the key and returns the change since the
         * value stored before it.
         *
         * <p>When the tracker returns a previous entry, the result carries that entry's interval
         * start and {@code value - previous}. Otherwise the result carries the key's "added"
         * instant and {@code value} itself.
         *
         * @param metricKey key of the metric being collected
         * @param now       instant stored alongside the new value
         * @param value     newly collected value
         * @return interval start and delta for the emitted data point
         */
        private LastValueTracker.InstantAndValue<Double> delta(MetricKey metricKey, Instant now, Double value) {
            Optional<LastValueTracker.InstantAndValue<Double>> lastValue = this.doubleDeltas.getAndSet(metricKey, now, value);
            // orElseGet keeps the fallback (map lookup plus allocation) off the common path
            // where a previous value exists.
            return lastValue
                    .map(last -> new LastValueTracker.InstantAndValue<Double>(last.getIntervalStart(), value - last.getValue()))
                    .orElseGet(() -> new LastValueTracker.InstantAndValue<Double>(this.instantAdded(metricKey), value));
        }

        /**
         * Clears the "added" instants and resets the delta tracker; tracked metrics are kept.
         */
        private void metricsStateReset() {
            this.metricAdded.clear();
            this.doubleDeltas.reset();
        }
    }
}

