//go:generate ../../../tools/readme_config_includer/generator
package quantile

import (
	_ "embed"
	"fmt"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/aggregators"
)

//go:embed sample.conf
var sampleConfig string

type Quantile struct {
	Quantiles     []float64       `toml:"quantiles"`
	Compression   float64         `toml:"compression"`
	AlgorithmType string          `toml:"algorithm"`
	Log           telegraf.Logger `toml:"-"`

	newAlgorithm newAlgorithmFunc
	cache        map[uint64]aggregate

	suffixes []string
}

type aggregate struct {
	name   string
	fields map[string]algorithm
	tags   map[string]string
}

type newAlgorithmFunc func(compression float64) (algorithm, error)

func (*Quantile) SampleConfig() string {
	return sampleConfig
}

func (q *Quantile) Init() error {
	switch q.AlgorithmType {
	case "t-digest", "":
		q.newAlgorithm = newTDigest
	case "exact R7":
		q.newAlgorithm = newExactR7
	case "exact R8":
		q.newAlgorithm = newExactR8
	default:
		return fmt.Errorf("unknown algorithm type %q", q.AlgorithmType)
	}
	if _, err := q.newAlgorithm(q.Compression); err != nil {
		return fmt.Errorf("cannot create %q algorithm: %w", q.AlgorithmType, err)
	}

	if len(q.Quantiles) == 0 {
		q.Quantiles = []float64{0.25, 0.5, 0.75}
	}

	duplicates := make(map[float64]bool)
	q.suffixes = make([]string, 0, len(q.Quantiles))
	for _, qtl := range q.Quantiles {
		if qtl < 0.0 || qtl > 1.0 {
			return fmt.Errorf("quantile %v out of range", qtl)
		}
		if _, found := duplicates[qtl]; found {
			return fmt.Errorf("duplicate quantile %v", qtl)
		}
		duplicates[qtl] = true
		q.suffixes = append(q.suffixes, fmt.Sprintf("_%03d", int(qtl*100.0)))
	}

	q.Reset()

	return nil
}

func (q *Quantile) Add(in telegraf.Metric) {
	id := in.HashID()
	if cached, ok := q.cache[id]; ok {
		fields := in.Fields()
		for k, algo := range cached.fields {
			if field, ok := fields[k]; ok {
				if v, isconvertible := convert(field); isconvertible {
					err := algo.Add(v)
					if err != nil {
						q.Log.Errorf("adding cached field %s: %v", k, err)
					}
				}
			}
		}
		return
	}

	// New metric, setup cache and init algorithm
	a := aggregate{
		name:   in.Name(),
		tags:   in.Tags(),
		fields: make(map[string]algorithm),
	}
	for k, field := range in.Fields() {
		if v, isconvertible := convert(field); isconvertible {
			algo, err := q.newAlgorithm(q.Compression)
			if err != nil {
				q.Log.Errorf("generating algorithm %s: %v", k, err)
			}
			err = algo.Add(v)
			if err != nil {
				q.Log.Errorf("adding field %s: %v", k, err)
			}
			a.fields[k] = algo
		}
	}
	q.cache[id] = a
}

func (q *Quantile) Push(acc telegraf.Accumulator) {
	for _, aggregate := range q.cache {
		fields := make(map[string]interface{}, len(aggregate.fields)*len(q.Quantiles))
		for k, algo := range aggregate.fields {
			for i, qtl := range q.Quantiles {
				fields[k+q.suffixes[i]] = algo.Quantile(qtl)
			}
		}
		acc.AddFields(aggregate.name, fields, aggregate.tags)
	}
}

func (q *Quantile) Reset() {
	q.cache = make(map[uint64]aggregate)
}

func convert(in interface{}) (float64, bool) {
	switch v := in.(type) {
	case float64:
		return v, true
	case int64:
		return float64(v), true
	case uint64:
		return float64(v), true
	default:
		return 0, false
	}
}

func init() {
	aggregators.Add("quantile", func() telegraf.Aggregator {
		return &Quantile{Compression: 100}
	})
}
