package utils

import (
	"context"
	"encoding/json"
	"hash/fnv"
	"strings"
	"sync/atomic"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/tracelog"

	"github.com/influxdata/telegraf"
)

func TagListToJSON(tagList []*telegraf.Tag) []byte {
	tags := make(map[string]string, len(tagList))
	for _, tag := range tagList {
		tags[tag.Key] = tag.Value
	}
	//nolint:errcheck // unable to propagate error
	bs, _ := json.Marshal(tags)
	return bs
}

func FieldListToJSON(fieldList []*telegraf.Field) ([]byte, error) {
	fields := make(map[string]interface{}, len(fieldList))
	for _, field := range fieldList {
		fields[field.Key] = field.Value
	}
	return json.Marshal(fields)
}

// QuoteIdentifier returns a sanitized string safe to use in SQL as an identifier
func QuoteIdentifier(name string) string {
	return pgx.Identifier{name}.Sanitize()
}

// QuoteLiteral returns a sanitized string safe to use in sql as a string literal
func QuoteLiteral(name string) string {
	return "'" + strings.Replace(name, "'", "''", -1) + "'"
}

// FullTableName returns a sanitized table name with its schema (if supplied)
func FullTableName(schema, name string) pgx.Identifier {
	if schema != "" {
		return pgx.Identifier{schema, name}
	}

	return pgx.Identifier{name}
}

// PGXLogger makes telegraf.Logger compatible with pgx.Logger
type PGXLogger struct {
	telegraf.Logger
}

func (l PGXLogger) Log(_ context.Context, level tracelog.LogLevel, msg string, data map[string]interface{}) {
	switch level {
	case tracelog.LogLevelError:
		l.Errorf("PG %s - %+v", msg, data)
	case tracelog.LogLevelWarn:
		l.Warnf("PG %s - %+v", msg, data)
	case tracelog.LogLevelInfo, tracelog.LogLevelNone:
		l.Infof("PG %s - %+v", msg, data)
	case tracelog.LogLevelDebug, tracelog.LogLevelTrace:
		l.Debugf("PG %s - %+v", msg, data)
	default:
		l.Debugf("PG %s - %+v", msg, data)
	}
}

func GetTagID(metric telegraf.Metric) int64 {
	hash := fnv.New64a()
	for _, tag := range metric.TagList() {
		hash.Write([]byte(tag.Key))
		hash.Write([]byte{0})
		hash.Write([]byte(tag.Value))
		hash.Write([]byte{0})
	}
	// Convert to int64 as postgres does not support uint64
	return int64(hash.Sum64())
}

// WaitGroup is similar to sync.WaitGroup, but allows interruptible waiting (e.g. a timeout).
type WaitGroup struct {
	count int32
	done  chan struct{}
}

func NewWaitGroup() *WaitGroup {
	return &WaitGroup{
		done: make(chan struct{}),
	}
}

func (wg *WaitGroup) Add(i int32) {
	select {
	case <-wg.done:
		panic("use of an already-done WaitGroup")
	default:
	}
	atomic.AddInt32(&wg.count, i)
}

func (wg *WaitGroup) Done() {
	i := atomic.AddInt32(&wg.count, -1)
	if i == 0 {
		close(wg.done)
	}
	if i < 0 {
		panic("too many Done() calls")
	}
}

func (wg *WaitGroup) C() <-chan struct{} {
	return wg.done
}
