//go:generate ../../../tools/readme_config_includer/generator
package graphite

import (
	"crypto/tls"
	_ "embed"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	common_tls "github.com/influxdata/telegraf/plugins/common/tls"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers/graphite"
)

// sampleConfig is the plugin's example configuration, embedded from
// sample.conf and returned by SampleConfig.
//
//go:embed sample.conf
var sampleConfig string

// ErrNotConnected is returned by send (and therefore by Write) when the batch
// could not be written to any of the configured servers.
var ErrNotConnected = errors.New("could not write to any server in cluster")

// connection tracks the state of one configured graphite server.
type connection struct {
	conn      net.Conn // socket returned by the last successful dial (nil before that)
	name      string   // server address as configured, dialed as-is over "tcp"
	connected bool     // whether conn is currently considered usable for writes
}

// Graphite is the output plugin that serializes metrics with the graphite
// serializer and writes them over TCP (optionally TLS) to one of the
// configured servers.
type Graphite struct {
	// Serializer options, passed through to graphite.Serializer in Init.
	GraphiteTagSupport      bool   `toml:"graphite_tag_support"`
	GraphiteTagSanitizeMode string `toml:"graphite_tag_sanitize_mode"`
	GraphiteSeparator       string `toml:"graphite_separator"`
	GraphiteStrictRegex     string `toml:"graphite_strict_sanitize_regex"`
	// Servers lists the addresses to write to; Init falls back to
	// "localhost:2003" when it is empty. LocalAddr optionally pins the source
	// address of outgoing connections. Prefix, Template and Templates are also
	// handed to the serializer. Timeout is used both as the dial timeout and
	// as the per-write deadline.
	Servers   []string        `toml:"servers"`
	LocalAddr string          `toml:"local_address"`
	Prefix    string          `toml:"prefix"`
	Template  string          `toml:"template"`
	Templates []string        `toml:"templates"`
	Timeout   config.Duration `toml:"timeout"`
	Log       telegraf.Logger `toml:"-"`
	common_tls.ClientConfig

	// connections holds one entry per Servers element, in the same order;
	// serializer is built by Init.
	connections []connection
	serializer  *graphite.Serializer
}

// SampleConfig returns the example plugin configuration embedded from
// sample.conf.
func (*Graphite) SampleConfig() string {
	conf := sampleConfig
	return conf
}

// Init builds the graphite serializer from the plugin options and prepares
// one connection entry per configured server. No server is dialed here; all
// entries start out disconnected and are dialed by Connect. When no servers
// are configured, "localhost:2003" is used.
func (g *Graphite) Init() error {
	serializer := &graphite.Serializer{
		Prefix:          g.Prefix,
		Template:        g.Template,
		Templates:       g.Templates,
		StrictRegex:     g.GraphiteStrictRegex,
		TagSupport:      g.GraphiteTagSupport,
		TagSanitizeMode: g.GraphiteTagSanitizeMode,
		Separator:       g.GraphiteSeparator,
	}
	if err := serializer.Init(); err != nil {
		return err
	}
	g.serializer = serializer

	if len(g.Servers) == 0 {
		g.Servers = append(g.Servers, "localhost:2003")
	}

	conns := make([]connection, len(g.Servers))
	for i, addr := range g.Servers {
		conns[i] = connection{name: addr}
	}
	g.connections = conns

	return nil
}

// Connect dials every server that is not currently marked as connected and
// leaves already-connected servers untouched.
//
// Connect only returns an error for configuration problems: an invalid TLS
// setup, or a local_address that cannot be parsed or resolved. Dial failures
// for individual servers are logged at debug level and left for a later call
// to retry. If no server ends up usable, send reports ErrNotConnected.
func (g *Graphite) Connect() error {
	// Set tls config
	tlsConfig, err := g.ClientConfig.TLSConfig()
	if err != nil {
		return err
	}

	var newConnection bool
	var connectedServers int
	var failedServers []string
	// The local address is the same for every server. Resolve it lazily, at
	// most once per call, and only if there is something to (re)dial. Write
	// calls Connect on every flush, so an unconditional DNS lookup would be
	// wasted work.
	var localAddr *net.TCPAddr
	for i := range g.connections {
		server := &g.connections[i]
		if server.connected {
			connectedServers++
			continue
		}
		newConnection = true

		// Dialer with timeout
		d := net.Dialer{Timeout: time.Duration(g.Timeout)}
		if g.LocalAddr != "" {
			if localAddr == nil {
				addr, err := g.resolveLocalAddr()
				if err != nil {
					return err
				}
				localAddr = addr
			}
			d.LocalAddr = localAddr
		}

		// A disconnected entry may still hold a socket from an earlier failure
		// that nobody closed. Release it before replacing it, otherwise the fd
		// leaks. Closing an already-closed conn only yields an error, which is
		// irrelevant here.
		if server.conn != nil {
			_ = server.conn.Close()
		}

		// Get secure connection if tls config is set
		var conn net.Conn
		if tlsConfig != nil {
			conn, err = tls.DialWithDialer(&d, "tcp", server.name, tlsConfig)
		} else {
			conn, err = d.Dial("tcp", server.name)
		}
		if err != nil {
			g.Log.Debugf("Failed to establish connection: %v", err)
			failedServers = append(failedServers, server.name)
			continue
		}

		server.conn = conn
		server.connected = true
		connectedServers++
	}

	if newConnection {
		g.Log.Debugf("Successful connections: %d of %d", connectedServers, len(g.connections))
	}
	if len(failedServers) > 0 {
		g.Log.Debugf("Failed servers: %d (%s)", len(failedServers), strings.Join(failedServers, ", "))
	}

	return nil
}

// resolveLocalAddr converts the local_address setting ("host" or "host:port")
// into the TCP address used as the source of outgoing connections. Without a
// port, port 0 is used.
func (g *Graphite) resolveLocalAddr() (*net.TCPAddr, error) {
	// Resolve the local address into IP address and the given port if any
	addr, sPort, err := net.SplitHostPort(g.LocalAddr)
	if err != nil {
		if !strings.Contains(err.Error(), "missing port") {
			return nil, fmt.Errorf("invalid local address: %w", err)
		}
		// No port given: the whole setting is the host part.
		addr = g.LocalAddr
	}
	local, err := net.ResolveIPAddr("ip", addr)
	if err != nil {
		return nil, fmt.Errorf("cannot resolve local address: %w", err)
	}

	var port int
	if sPort != "" {
		p, err := strconv.ParseUint(sPort, 10, 16)
		if err != nil {
			return nil, fmt.Errorf("invalid port: %w", err)
		}
		port = int(p)
	}

	return &net.TCPAddr{IP: local.IP, Port: port, Zone: local.Zone}, nil
}

// Close shuts down every server connection and marks all servers as
// disconnected, so a later Connect would dial them again.
//
// Servers that never connected have a nil conn and are skipped; calling a
// method on a nil net.Conn would panic. Close errors are deliberately ignored:
// a connection may already have been closed by send/checkEOF, and there is
// nothing useful to do about a failed close at shutdown. Close always returns
// nil.
func (g *Graphite) Close() error {
	// Iterate by index so the state reset applies to the stored entries, not
	// to a loop-local copy.
	for i := range g.connections {
		c := &g.connections[i]
		if c.conn != nil {
			_ = c.conn.Close()
		}
		c.conn = nil
		c.connected = false
	}
	return nil
}

// checkEOF probes conn for a connection the remote side has already closed
// (the socket sits in CLOSE_WAIT) before any data is written to it.
//
// Writing to such a connection does not fail in Go: the write and flush
// succeed locally while the peer answers with TCP RST packets. A short read
// that returns io.EOF is how the dead connection is detected up front. Props
// to Tv via the authors of carbon-relay-ng for this trick.
//
// A nil return means the connection looks usable: a read timed out, or the
// remote sent data. A non-nil return means the connection is unusable and
// Close has already been attempted on it. The error returned is the reason
// the connection is unusable, never the (usually nil) result of Close. Close
// failures are only logged at debug level.
func (g *Graphite) checkEOF(conn net.Conn) error {
	// closeConn closes conn best-effort; only an actual failure is logged.
	closeConn := func() {
		if err := conn.Close(); err != nil {
			g.Log.Debugf("Failed to close the connection: %v", err)
		}
	}

	if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
		g.Log.Debugf(
			"Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly",
			err,
			conn.RemoteAddr().String(),
		)
		closeConn()
		return fmt.Errorf("setting read deadline: %w", err)
	}

	b := make([]byte, 1024)
	num, err := conn.Read(b)
	if errors.Is(err, io.EOF) {
		g.Log.Debugf("Conn %s is closed. closing conn explicitly", conn.RemoteAddr().String())
		closeConn()
		return fmt.Errorf("connection closed by remote: %w", err)
	}
	// just in case i misunderstand something or the remote behaves badly
	if num != 0 {
		g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn.RemoteAddr(), b[:num])
	}
	// No error (data was received) or a read timeout: the connection is alive.
	if err == nil {
		return nil
	}
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return nil
	}

	// Any other read error means the connection cannot be trusted: close it.
	g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected.  closing conn. error: %s", conn.RemoteAddr(), err)
	closeConn()
	return fmt.Errorf("probing connection: %w", err)
}

// Write serializes metrics into a single batch and sends it to one of the
// configured servers.
//
// A metric that fails to serialize is logged and skipped; the remaining data
// is still sent. When the first send finds no usable server (ErrNotConnected),
// every disconnected server is dialed once more and the send is retried. Any
// other send result, success included, is returned as-is.
func (g *Graphite) Write(metrics []telegraf.Metric) error {
	var batch []byte
	for _, m := range metrics {
		data, serr := g.serializer.Serialize(m)
		if serr != nil {
			g.Log.Errorf("Error serializing some metrics to graphite: %s", serr.Error())
		}
		batch = append(batch, data...)
	}

	// Bring up any server that is currently down before the first attempt.
	if err := g.Connect(); err != nil {
		return fmt.Errorf("failed to reconnect: %w", err)
	}

	// Only "no server reachable" is worth a second attempt.
	err := g.send(batch)
	if !errors.Is(err, ErrNotConnected) {
		return err
	}

	var pending []string
	for _, c := range g.connections {
		if !c.connected {
			pending = append(pending, c.name)
		}
	}
	if len(pending) != 0 {
		g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(pending, ","))
		if err := g.Connect(); err != nil {
			return fmt.Errorf("failed to reconnect: %w", err)
		}
	}

	return g.send(batch)
}

// send writes batch to a single connected server. Servers are tried in a
// random order (rand.Perm) to spread load, and send stops at the first
// successful write.
//
// A server that fails along the way is marked as not connected so the next
// Connect call dials it again. Failures include: the write deadline cannot be
// set, checkEOF reports a dead connection, or the write itself fails. If no
// server accepted the batch, ErrNotConnected is returned.
func (g *Graphite) send(batch []byte) error {
	// disconnect closes the socket of server n (best-effort) and flags the
	// server for re-dialing. The close matters: Connect overwrites the conn
	// field, so a socket left open here would leak.
	disconnect := func(n int) {
		c := &g.connections[n]
		if c.conn != nil {
			if err := c.conn.Close(); err != nil {
				g.Log.Debugf("Failed to close connection to %q: %v", c.name, err)
			}
		}
		c.connected = false
	}

	p := rand.Perm(len(g.connections))
	for i, n := range p {
		server := g.connections[n]

		// Skip unconnected servers
		if !server.connected {
			continue
		}

		if g.Timeout > 0 {
			deadline := time.Now().Add(time.Duration(g.Timeout))
			if err := server.conn.SetWriteDeadline(deadline); err != nil {
				g.Log.Warnf("failed to set write deadline for %q: %v", server.name, err)
				disconnect(n)
				continue
			}
		}

		// Check the connection state. checkEOF has already attempted to close
		// the conn when it reports a problem, so only flag the server here.
		if err := g.checkEOF(server.conn); err != nil {
			g.connections[n].connected = false
			continue
		}

		if _, err := server.conn.Write(batch); err != nil {
			g.Log.Errorf("Writing to %q failed: %v", server.name, err)
			if i < len(p)-1 {
				g.Log.Info("Trying next server...")
			}
			disconnect(n)
			continue
		}

		// Sending the data was successful.
		return nil
	}

	// If we end here, none of the writes were successful
	return ErrNotConnected
}

func init() {
	outputs.Add("graphite", func() telegraf.Output {
		return &Graphite{Timeout: config.Duration(2 * time.Second)}
	})
}
