package models

import (
	"os"
	"testing"
	"time"

	"github.com/stretchr/testify/suite"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/testutil"
)

// BufferSuiteTest is a testify suite whose tests run against the buffer
// implementation selected by bufferType.
type BufferSuiteTest struct {
	suite.Suite
	// bufferType is the buffer strategy name passed to NewBuffer.
	bufferType string
	// bufferPath is the directory passed to NewBuffer; SetupTest only fills it
	// in (with a temporary directory) for the "disk_write_through" type.
	bufferPath string

	hasMaxCapacity bool // whether the buffer type being tested supports a maximum metric capacity
}

// SetupTest prepares the per-test state for the configured buffer type. The
// "memory" type (also selected by an empty type) is flagged as having a
// maximum capacity. The "disk_write_through" type gets a fresh temporary
// directory and is flagged as having no maximum capacity. Any other type
// leaves the suite state untouched.
func (s *BufferSuiteTest) SetupTest() {
	if s.bufferType == "" || s.bufferType == "memory" {
		s.hasMaxCapacity = true
		return
	}
	if s.bufferType != "disk_write_through" {
		return
	}

	dir, err := os.MkdirTemp("", "*-buffer-test")
	s.Require().NoError(err)
	s.bufferPath = dir
	s.hasMaxCapacity = false
}

// TearDownTest removes the directory recorded in bufferPath, if any, and
// clears the field so the next test starts without a stale path.
func (s *BufferSuiteTest) TearDownTest() {
	if s.bufferPath == "" {
		return
	}

	err := os.RemoveAll(s.bufferPath)
	s.NoError(err)
	s.bufferPath = ""
}

func TestMemoryBufferSuite(t *testing.T) {
	suite.Run(t, &BufferSuiteTest{bufferType: "memory"})
}

func TestDiskBufferSuite(t *testing.T) {
	suite.Run(t, &BufferSuiteTest{bufferType: "disk_write_through"})
}

// newTestBuffer creates a buffer of the suite's type with the given capacity
// and zeroes its added/written/rejected/dropped counters before returning it.
// The test is aborted if the buffer cannot be created.
func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer {
	s.T().Helper()

	created, err := NewBuffer("test", "123", "", capacity, s.bufferType, s.bufferPath, true)
	s.Require().NoError(err)

	// Start every test from zeroed counters.
	stats := created.Stats()
	stats.MetricsAdded.Set(0)
	stats.MetricsWritten.Set(0)
	stats.MetricsRejected.Set(0)
	stats.MetricsDropped.Set(0)

	return created
}

// TestBufferLenEmpty checks that a freshly created buffer reports length zero.
func (s *BufferSuiteTest) TestBufferLenEmpty() {
	b := s.newTestBuffer(5)
	defer b.Close()

	length := b.Len()
	s.Equal(0, length)
}

// TestBufferLenOne checks that the length is one after adding a single metric.
func (s *BufferSuiteTest) TestBufferLenOne() {
	b := s.newTestBuffer(5)
	defer b.Close()

	var (
		tags   = map[string]string{}
		fields = map[string]interface{}{"value": 42.0}
	)
	b.Add(metric.New("cpu", tags, fields, time.Unix(0, 0)))

	length := b.Len()
	s.Equal(1, length)
}

// TestBufferLenFull checks that adding five metrics to a capacity-5 buffer
// yields a length of five.
func (s *BufferSuiteTest) TestBufferLenFull() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	length := b.Len()
	s.Equal(5, length)
}

// TestBufferLenOverfill checks that adding six metrics to a capacity-5 buffer
// leaves the length at five. Skipped for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferLenOverfill() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample, sample)

	length := b.Len()
	s.Equal(5, length)
}

// TestBufferBatchLenZero checks that a transaction requested with batch size
// zero has an empty batch.
func (s *BufferSuiteTest) TestBufferBatchLenZero() {
	b := s.newTestBuffer(5)
	defer b.Close()

	txn := b.BeginTransaction(0)
	batch := txn.Batch
	s.Empty(batch)
}

// TestBufferBatchLenBufferEmpty checks that a transaction on an empty buffer
// has an empty batch even when a non-zero batch size is requested.
func (s *BufferSuiteTest) TestBufferBatchLenBufferEmpty() {
	b := s.newTestBuffer(5)
	defer b.Close()

	txn := b.BeginTransaction(2)
	batch := txn.Batch
	s.Empty(batch)
}

// TestBufferBatchLenUnderfill checks that requesting two metrics from a
// buffer holding one yields a batch of one.
func (s *BufferSuiteTest) TestBufferBatchLenUnderfill() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample)

	txn := b.BeginTransaction(2)
	s.Len(txn.Batch, 1)
}

// TestBufferBatchLenFill checks that requesting two metrics from a buffer
// holding three yields a batch of two.
func (s *BufferSuiteTest) TestBufferBatchLenFill() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample)

	txn := b.BeginTransaction(2)
	s.Len(txn.Batch, 2)
}

// TestBufferBatchLenExact checks that requesting two metrics from a buffer
// holding exactly two yields a batch of two.
func (s *BufferSuiteTest) TestBufferBatchLenExact() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample)

	txn := b.BeginTransaction(2)
	s.Len(txn.Batch, 2)
}

// TestBufferBatchLenLargerThanBuffer checks that requesting six metrics from
// a buffer holding five yields a batch of five.
func (s *BufferSuiteTest) TestBufferBatchLenLargerThanBuffer() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	txn := b.BeginTransaction(6)
	s.Len(txn.Batch, 5)
}

// TestBufferBatchWrap fills a capacity-5 buffer, accepts a batch of two, adds
// two more metrics and checks that the next batch of five is complete.
func (s *BufferSuiteTest) TestBufferBatchWrap() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	first := b.BeginTransaction(2)
	first.AcceptAll()
	b.EndTransaction(first)

	b.Add(sample, sample)

	second := b.BeginTransaction(5)
	s.Len(second.Batch, 5)
}

// TestBufferBatchLatest adds metrics stamped 1..3 to a capacity-4 buffer and
// checks that a batch of two contains the metrics stamped 1 and 2, in order.
func (s *BufferSuiteTest) TestBufferBatchLatest() {
	b := s.newTestBuffer(4)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 3; sec++ {
		b.Add(cpuAt(sec))
	}
	txn := b.BeginTransaction(2)

	expected := []telegraf.Metric{cpuAt(1), cpuAt(2)}
	testutil.RequireMetricsEqual(s.T(), expected, txn.Batch)
}

// TestBufferBatchLatestWrap adds metrics stamped 1..5 to a capacity-4 buffer
// and checks that a batch of two contains the metrics stamped 2 and 3.
// Skipped for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferBatchLatestWrap() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(4)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 5; sec++ {
		b.Add(cpuAt(sec))
	}
	txn := b.BeginTransaction(2)

	expected := []telegraf.Metric{cpuAt(2), cpuAt(3)}
	testutil.RequireMetricsEqual(s.T(), expected, txn.Batch)
}

// TestBufferMultipleBatch adds metrics stamped 1..6 to a capacity-10 buffer
// and drains it with two accepted batches of up to five: the first must hold
// 1..5 and the second only 6.
func (s *BufferSuiteTest) TestBufferMultipleBatch() {
	b := s.newTestBuffer(10)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 6; sec++ {
		b.Add(cpuAt(sec))
	}

	first := b.BeginTransaction(5)
	firstWant := []telegraf.Metric{cpuAt(1), cpuAt(2), cpuAt(3), cpuAt(4), cpuAt(5)}
	testutil.RequireMetricsEqual(s.T(), firstWant, first.Batch)
	first.AcceptAll()
	b.EndTransaction(first)

	second := b.BeginTransaction(5)
	secondWant := []telegraf.Metric{cpuAt(6)}
	testutil.RequireMetricsEqual(s.T(), secondWant, second.Batch)
	second.AcceptAll()
	b.EndTransaction(second)
}

// TestBufferRejectWithRoom opens a batch of two on a capacity-5 buffer holding
// 1..3, adds 4..5, then keeps the batch. Nothing may be dropped and the next
// batch must contain 1..5 in order.
func (s *BufferSuiteTest) TestBufferRejectWithRoom() {
	b := s.newTestBuffer(5)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	// addSpan adds the metrics stamped first..last, one Add call per metric.
	addSpan := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			b.Add(cpuAt(sec))
		}
	}

	addSpan(1, 3)
	kept := b.BeginTransaction(2)
	addSpan(4, 5)
	kept.KeepAll()
	b.EndTransaction(kept)

	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped)

	next := b.BeginTransaction(5)
	expected := []telegraf.Metric{cpuAt(1), cpuAt(2), cpuAt(3), cpuAt(4), cpuAt(5)}
	testutil.RequireMetricsEqual(s.T(), expected, next.Batch)
}

// TestBufferRejectNothingNewFull keeps a batch of two taken from a full
// capacity-5 buffer while nothing new is added. Nothing may be dropped and
// the next batch must contain 1..5 in order.
func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() {
	b := s.newTestBuffer(5)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}

	for sec := int64(1); sec <= 5; sec++ {
		b.Add(cpuAt(sec))
	}
	kept := b.BeginTransaction(2)
	kept.KeepAll()
	b.EndTransaction(kept)

	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped)

	next := b.BeginTransaction(5)
	expected := []telegraf.Metric{cpuAt(1), cpuAt(2), cpuAt(3), cpuAt(4), cpuAt(5)}
	testutil.RequireMetricsEqual(s.T(), expected, next.Batch)
}

// TestBufferRejectNoRoom opens a batch of two on a capacity-5 buffer holding
// 1..3, adds 4..8, then keeps the batch. Three metrics must be counted as
// dropped and the next batch must contain 4..8. Skipped for buffers without a
// maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectNoRoom() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	// addSpan adds the metrics stamped first..last, one Add call per metric.
	addSpan := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			b.Add(cpuAt(sec))
		}
	}

	addSpan(1, 3)
	kept := b.BeginTransaction(2)

	addSpan(4, 8)
	kept.KeepAll()
	b.EndTransaction(kept)

	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(3), dropped)

	next := b.BeginTransaction(5)
	expected := []telegraf.Metric{cpuAt(4), cpuAt(5), cpuAt(6), cpuAt(7), cpuAt(8)}
	testutil.RequireMetricsEqual(s.T(), expected, next.Batch)
}

// TestBufferRejectRoomExact opens a batch of two on a capacity-5 buffer
// holding 1..2, adds 3..5, then keeps the batch. Nothing may be dropped and
// the next batch must contain 1..5 in order.
func (s *BufferSuiteTest) TestBufferRejectRoomExact() {
	b := s.newTestBuffer(5)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	// addSpan adds the metrics stamped first..last, one Add call per metric.
	addSpan := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			b.Add(cpuAt(sec))
		}
	}

	addSpan(1, 2)
	kept := b.BeginTransaction(2)
	addSpan(3, 5)

	kept.KeepAll()
	b.EndTransaction(kept)

	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped)

	next := b.BeginTransaction(5)
	expected := []telegraf.Metric{cpuAt(1), cpuAt(2), cpuAt(3), cpuAt(4), cpuAt(5)}
	testutil.RequireMetricsEqual(s.T(), expected, next.Batch)
}

// TestBufferRejectRoomOverwriteOld opens a batch of one on a capacity-5
// buffer holding 1..3, adds 4..6, then keeps the batch. One metric must be
// counted as dropped and the next batch must contain 2..6. Skipped for
// buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	// addSpan adds the metrics stamped first..last, one Add call per metric.
	addSpan := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			b.Add(cpuAt(sec))
		}
	}

	addSpan(1, 3)
	kept := b.BeginTransaction(1)
	addSpan(4, 6)

	kept.KeepAll()
	b.EndTransaction(kept)

	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(1), dropped)

	next := b.BeginTransaction(5)
	expected := []telegraf.Metric{cpuAt(2), cpuAt(3), cpuAt(4), cpuAt(5), cpuAt(6)}
	testutil.RequireMetricsEqual(s.T(), expected, next.Batch)
}

// TestBufferRejectPartialRoom opens a batch of two on a capacity-5 buffer
// holding 1..3, adds 4..7, then keeps the batch. Two metrics must be counted
// as dropped and the next batch must contain 3..7. Skipped for buffers
// without a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectPartialRoom() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	// addSpan adds the metrics stamped first..last, one Add call per metric.
	addSpan := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			b.Add(cpuAt(sec))
		}
	}

	addSpan(1, 3)
	kept := b.BeginTransaction(2)
	addSpan(4, 7)
	kept.KeepAll()
	b.EndTransaction(kept)

	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(2), dropped)

	next := b.BeginTransaction(5)
	expected := []telegraf.Metric{cpuAt(3), cpuAt(4), cpuAt(5), cpuAt(6), cpuAt(7)}
	testutil.RequireMetricsEqual(s.T(), expected, next.Batch)
}

// TestBufferRejectNewMetricsWrapped keeps a batch of two open while fifteen
// metrics in total pass through a capacity-5 buffer, checking the dropped
// counter after each wave. After the batch is kept, the counter must reach 10
// and the next batch must contain 11..15. Skipped for buffers without a
// maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	// addSpan adds the metrics stamped first..last, one Add call per metric.
	addSpan := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			b.Add(cpuAt(sec))
		}
	}
	// droppedCount reads the buffer's current dropped-metrics counter.
	droppedCount := func() int64 {
		return b.Stats().MetricsDropped.Get()
	}

	addSpan(1, 3)
	kept := b.BeginTransaction(2)
	addSpan(4, 5)

	// Five metrics added in total to a capacity-5 buffer: nothing dropped yet.
	s.Equal(int64(0), droppedCount())

	addSpan(6, 10)

	// Ten metrics added in total while the batch is still open.
	s.Equal(int64(3), droppedCount())

	addSpan(11, 15)

	// Fifteen metrics added in total while the batch is still open.
	s.Equal(int64(8), droppedCount())
	kept.KeepAll()
	b.EndTransaction(kept)

	// Ending the transaction accounts for the two batched metrics as well.
	s.Equal(int64(10), droppedCount())

	next := b.BeginTransaction(5)
	expected := []telegraf.Metric{cpuAt(11), cpuAt(12), cpuAt(13), cpuAt(14), cpuAt(15)}
	testutil.RequireMetricsEqual(s.T(), expected, next.Batch)
}

// TestBufferRejectWrapped adds 1..8 to a capacity-5 buffer, opens a batch of
// three, adds 9..12 and keeps the batch. The next batch must contain 8..12.
// Skipped for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectWrapped() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	// addSpan adds the metrics stamped first..last, one Add call per metric.
	addSpan := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			b.Add(cpuAt(sec))
		}
	}

	// Fill the buffer, then push three more metrics into it.
	addSpan(1, 5)
	addSpan(6, 8)
	kept := b.BeginTransaction(3)

	addSpan(9, 12)

	kept.KeepAll()
	b.EndTransaction(kept)

	next := b.BeginTransaction(5)
	expected := []telegraf.Metric{cpuAt(8), cpuAt(9), cpuAt(10), cpuAt(11), cpuAt(12)}
	testutil.RequireMetricsEqual(s.T(), expected, next.Batch)
}

// TestBufferRejectAdjustFirst runs three rounds against a capacity-10 buffer.
// Each round adds three metrics, opens a batch of three, adds three more and
// keeps the batch. After one final metric (19) the next batch of ten must
// contain 10..19. Skipped for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(10)
	defer b.Close()

	// cpuAt builds the test metric with the given Unix-second timestamp.
	cpuAt := func(sec int64) telegraf.Metric {
		return metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0))
	}
	// addSpan adds the metrics stamped first..last, one Add call per metric.
	addSpan := func(first, last int64) {
		for sec := first; sec <= last; sec++ {
			b.Add(cpuAt(sec))
		}
	}

	// Rounds cover timestamps 1..6, 7..12 and 13..18.
	for round := int64(0); round < 3; round++ {
		base := round * 6

		addSpan(base+1, base+3)
		kept := b.BeginTransaction(3)

		addSpan(base+4, base+6)
		kept.KeepAll()
		b.EndTransaction(kept)
	}

	b.Add(cpuAt(19))

	expected := make([]telegraf.Metric, 0, 10)
	for sec := int64(10); sec <= 19; sec++ {
		expected = append(expected, cpuAt(sec))
	}

	final := b.BeginTransaction(10)
	testutil.RequireMetricsEqual(s.T(), expected, final.Batch)
}

// TestBufferAddDropsOverwrittenMetrics adds two groups of five metrics to a
// capacity-5 buffer and expects five dropped and none written. Skipped for
// buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddDropsOverwrittenMetrics() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)
	b.Add(sample, sample, sample, sample, sample)

	dropped := b.Stats().MetricsDropped.Get()
	written := b.Stats().MetricsWritten.Get()
	s.Equal(int64(5), dropped)
	s.Equal(int64(0), written)
}

// TestBufferAcceptRemovesBatch checks that accepting a batch of two out of
// three buffered metrics leaves one metric in the buffer.
func (s *BufferSuiteTest) TestBufferAcceptRemovesBatch() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample)

	txn := b.BeginTransaction(2)
	txn.AcceptAll()
	b.EndTransaction(txn)

	remaining := b.Len()
	s.Equal(1, remaining)
}

// TestBufferRejectLeavesBatch checks that keeping a batch of two out of three
// buffered metrics leaves all three metrics in the buffer.
func (s *BufferSuiteTest) TestBufferRejectLeavesBatch() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample)

	txn := b.BeginTransaction(2)
	txn.KeepAll()
	b.EndTransaction(txn)

	remaining := b.Len()
	s.Equal(3, remaining)
}

// TestBufferAcceptWritesOverwrittenBatch takes a batch of five from a full
// capacity-5 buffer, adds five more metrics while the batch is open, and
// accepts the batch. It expects none dropped and five written.
func (s *BufferSuiteTest) TestBufferAcceptWritesOverwrittenBatch() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	txn := b.BeginTransaction(5)
	b.Add(sample, sample, sample, sample, sample)
	txn.AcceptAll()
	b.EndTransaction(txn)

	dropped := b.Stats().MetricsDropped.Get()
	written := b.Stats().MetricsWritten.Get()
	s.Equal(int64(0), dropped)
	s.Equal(int64(5), written)
}

// TestBufferBatchRejectDropsOverwrittenBatch takes a batch of five from a
// full capacity-5 buffer, adds five more metrics while the batch is open, and
// keeps the batch. It expects five dropped and none written. Skipped for
// buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferBatchRejectDropsOverwrittenBatch() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	txn := b.BeginTransaction(5)
	b.Add(sample, sample, sample, sample, sample)
	txn.KeepAll()
	b.EndTransaction(txn)

	dropped := b.Stats().MetricsDropped.Get()
	written := b.Stats().MetricsWritten.Get()
	s.Equal(int64(5), dropped)
	s.Equal(int64(0), written)
}

// TestBufferMetricsOverwriteBatchAccept takes a batch of three from a full
// capacity-5 buffer, adds three more metrics while the batch is open, and
// accepts the batch. It expects none dropped and three written.
func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchAccept() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	txn := b.BeginTransaction(3)
	b.Add(sample, sample, sample)
	txn.AcceptAll()
	b.EndTransaction(txn)

	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped, "dropped")
	written := b.Stats().MetricsWritten.Get()
	s.Equal(int64(3), written, "written")
}

// TestBufferMetricsOverwriteBatchReject takes a batch of three from a full
// capacity-5 buffer, adds three more metrics while the batch is open, and
// keeps the batch. It expects three dropped and none written. Skipped for
// buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchReject() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	txn := b.BeginTransaction(3)
	b.Add(sample, sample, sample)
	txn.KeepAll()
	b.EndTransaction(txn)

	dropped := b.Stats().MetricsDropped.Get()
	written := b.Stats().MetricsWritten.Get()
	s.Equal(int64(3), dropped)
	s.Equal(int64(0), written)
}

// TestBufferMetricsBatchAcceptRemoved takes a batch of three from a full
// capacity-5 buffer, adds five more metrics while the batch is open, and
// accepts the batch. It expects two dropped and three written. Skipped for
// buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferMetricsBatchAcceptRemoved() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	txn := b.BeginTransaction(3)
	b.Add(sample, sample, sample, sample, sample)
	txn.AcceptAll()
	b.EndTransaction(txn)

	dropped := b.Stats().MetricsDropped.Get()
	written := b.Stats().MetricsWritten.Get()
	s.Equal(int64(2), dropped)
	s.Equal(int64(3), written)
}

// TestBufferWrapWithBatch opens a batch of three on a capacity-5 buffer
// holding three metrics, adds six more and expects exactly one dropped metric
// before the transaction is ended. Skipped for buffers without a maximum
// capacity.
func (s *BufferSuiteTest) TestBufferWrapWithBatch() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample)

	txn := b.BeginTransaction(3)
	b.Add(sample, sample, sample, sample, sample, sample)

	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(1), dropped)
	b.EndTransaction(txn)
}

// TestBufferBatchNotRemoved checks that opening a batch of two does not
// change the buffer length while the transaction is still open.
func (s *BufferSuiteTest) TestBufferBatchNotRemoved() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	txn := b.BeginTransaction(2)
	lengthDuringTx := b.Len()
	s.Equal(5, lengthDuringTx)
	b.EndTransaction(txn)
}

// TestBufferBatchRejectAcceptNoop keeps a batch and ends the transaction,
// then accepts and ends the same transaction again. The buffer must still
// hold all five metrics afterwards.
func (s *BufferSuiteTest) TestBufferBatchRejectAcceptNoop() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	txn := b.BeginTransaction(2)

	// First end the transaction with everything kept.
	txn.KeepAll()
	b.EndTransaction(txn)

	// Then reuse the already-ended transaction with everything accepted.
	txn.AcceptAll()
	b.EndTransaction(txn)

	remaining := b.Len()
	s.Equal(5, remaining)
}

// TestBufferAddCallsMetricRejectWhenNoBatch fills a capacity-5 buffer, adds
// two more metrics with no transaction open, and expects Reject to have been
// called twice. Skipped for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddCallsMetricRejectWhenNoBatch() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	var rejected int
	onReject := func() { rejected++ }
	tracked := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: onReject,
	}

	b.Add(tracked, tracked, tracked, tracked, tracked)
	b.Add(tracked, tracked)
	s.Equal(2, rejected)
}

// TestBufferAddCallsMetricRejectWhenNotInBatch opens a batch of two on a full
// capacity-5 buffer and adds four more metrics. It expects two Reject calls
// before the batch is kept and four after the transaction has ended. Skipped
// for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddCallsMetricRejectWhenNotInBatch() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	var rejected int
	onReject := func() { rejected++ }
	tracked := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: onReject,
	}

	b.Add(tracked, tracked, tracked, tracked, tracked)
	txn := b.BeginTransaction(2)

	b.Add(tracked, tracked, tracked, tracked)
	s.Equal(2, rejected)

	txn.KeepAll()
	b.EndTransaction(txn)
	s.Equal(4, rejected)
}

// TestBufferRejectCallsMetricRejectWithOverwritten opens a batch of five on a
// full capacity-5 buffer and adds two more metrics. It expects no Reject
// calls while the batch is open and two once the batch has been kept. Skipped
// for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferRejectCallsMetricRejectWithOverwritten() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	var rejected int
	onReject := func() { rejected++ }
	tracked := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: onReject,
	}

	b.Add(tracked, tracked, tracked, tracked, tracked)
	txn := b.BeginTransaction(5)

	b.Add(tracked, tracked)
	s.Equal(0, rejected)

	txn.KeepAll()
	b.EndTransaction(txn)
	s.Equal(2, rejected)
}

// TestBufferAddOverwriteAndReject opens a batch of five on a full capacity-5
// buffer and adds four more groups of five metrics. It expects 15 Reject
// calls before the batch is kept and 20 after the transaction has ended.
// Skipped for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddOverwriteAndReject() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	var rejected int
	onReject := func() { rejected++ }
	tracked := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: onReject,
	}

	b.Add(tracked, tracked, tracked, tracked, tracked)
	txn := b.BeginTransaction(5)

	// Four further groups of five while the batch is open.
	for group := 0; group < 4; group++ {
		b.Add(tracked, tracked, tracked, tracked, tracked)
	}
	s.Equal(15, rejected)

	txn.KeepAll()
	b.EndTransaction(txn)
	s.Equal(20, rejected)
}

// TestBufferAddOverwriteAndRejectOffset adds seven metrics to a capacity-5
// buffer (two Reject calls expected), opens a batch of five and adds four
// groups of four metrics, checking the running Reject count after each
// group. Accepting the batch must leave the Reject count unchanged and
// produce five Accept calls. Skipped for buffers without a maximum capacity.
func (s *BufferSuiteTest) TestBufferAddOverwriteAndRejectOffset() {
	if !s.hasMaxCapacity {
		s.T().Skip("tested buffer does not have a maximum capacity")
	}

	b := s.newTestBuffer(5)
	defer b.Close()

	var (
		rejected int
		accepted int
	)
	tracked := &mockMetric{
		Metric:  metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
		RejectF: func() { rejected++ },
		AcceptF: func() { accepted++ },
	}

	b.Add(tracked, tracked, tracked)
	b.Add(tracked, tracked, tracked, tracked)
	s.Equal(2, rejected)

	txn := b.BeginTransaction(5)

	// Expected running Reject count after each group of four added while the
	// batch is open.
	for _, wantRejected := range []int{2, 5, 9, 13} {
		b.Add(tracked, tracked, tracked, tracked)
		s.Equal(wantRejected, rejected)
	}

	txn.AcceptAll()
	b.EndTransaction(txn)
	s.Equal(13, rejected)
	s.Equal(5, accepted)
}

// TestBufferRejectEmptyBatch keeps a transaction that was opened on an empty
// buffer, then checks that a later batch contains no nil metrics.
func (s *BufferSuiteTest) TestBufferRejectEmptyBatch() {
	b := s.newTestBuffer(5)
	defer b.Close()

	// Open the first transaction before anything has been added.
	first := b.BeginTransaction(2)
	b.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)))
	first.KeepAll()
	b.EndTransaction(first)

	b.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)))

	second := b.BeginTransaction(2)
	for i := range second.Batch {
		s.NotNil(second.Batch[i])
	}
	b.EndTransaction(second)
}

// TestBufferFlushedPartial accepts a batch of two out of three buffered
// metrics and expects one metric to remain.
func (s *BufferSuiteTest) TestBufferFlushedPartial() {
	b := s.newTestBuffer(5)
	defer b.Close()

	for sec := int64(1); sec <= 3; sec++ {
		b.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0)))
	}

	txn := b.BeginTransaction(2)
	s.Len(txn.Batch, 2)

	txn.AcceptAll()
	b.EndTransaction(txn)

	remaining := b.Len()
	s.Equal(1, remaining)
}

// TestBufferFlushedFull accepts a batch of two out of two buffered metrics
// and expects the buffer to be empty afterwards.
func (s *BufferSuiteTest) TestBufferFlushedFull() {
	b := s.newTestBuffer(5)
	defer b.Close()

	for sec := int64(1); sec <= 2; sec++ {
		b.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(sec, 0)))
	}

	txn := b.BeginTransaction(2)
	s.Len(txn.Batch, 2)

	txn.AcceptAll()
	b.EndTransaction(txn)

	remaining := b.Len()
	s.Equal(0, remaining)
}

// TestPartialWriteBackToFront drains a buffer of five metrics over three
// transactions that settle metrics from the back of the batch first. The
// last metric is rejected, then the new last metric is accepted, then the
// remaining three are accepted. The final counters must show 5 added,
// 4 written, 1 rejected and 0 dropped.
func (s *BufferSuiteTest) TestPartialWriteBackToFront() {
	b := s.newTestBuffer(5)
	defer b.Close()

	sample := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0))
	b.Add(sample, sample, sample, sample, sample)

	// Take a batch of all five metrics but only reject the last one.
	rejectLast := b.BeginTransaction(5)
	s.Len(rejectLast.Batch, 5)
	rejectLast.Reject = []int{4}
	b.EndTransaction(rejectLast)
	s.Equal(4, b.Len())

	// The next batch no longer contains the rejected metric; accept only its
	// last entry.
	acceptLast := b.BeginTransaction(5)
	s.Len(acceptLast.Batch, 4)
	acceptLast.Accept = []int{3}
	b.EndTransaction(acceptLast)
	s.Equal(3, b.Len())

	// Take the final batch and accept the remaining metrics.
	acceptRest := b.BeginTransaction(5)
	s.Len(acceptRest.Batch, 3)
	acceptRest.Accept = []int{0, 1, 2}
	b.EndTransaction(acceptRest)
	s.Equal(0, b.Len())

	added := b.Stats().MetricsAdded.Get()
	s.Equal(int64(5), added, "metrics added")
	written := b.Stats().MetricsWritten.Get()
	s.Equal(int64(4), written, "metrics written")
	rejected := b.Stats().MetricsRejected.Get()
	s.Equal(int64(1), rejected, "metrics rejected")
	dropped := b.Stats().MetricsDropped.Get()
	s.Equal(int64(0), dropped, "metrics dropped")
}

// mockMetric embeds a telegraf.Metric and overrides Accept, Reject and Drop
// with optional callbacks, letting tests observe those calls.
type mockMetric struct {
	telegraf.Metric
	AcceptF func() // called by Accept when non-nil
	RejectF func() // called by Reject when non-nil
	DropF   func() // called by Drop when non-nil
}

// Accept invokes the AcceptF callback if one is set; otherwise it does nothing.
func (m *mockMetric) Accept() {
	hook := m.AcceptF
	if hook == nil {
		return
	}
	hook()
}

// Reject invokes the RejectF callback if one is set; otherwise it does nothing.
func (m *mockMetric) Reject() {
	hook := m.RejectF
	if hook == nil {
		return
	}
	hook()
}

// Drop invokes the DropF callback if one is set; otherwise it does nothing.
func (m *mockMetric) Drop() {
	hook := m.DropF
	if hook == nil {
		return
	}
	hook()
}
