/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package composer

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/compose-spec/compose-go/v2/format"
	"github.com/compose-spec/compose-go/v2/types"
	"golang.org/x/sync/errgroup"

	"github.com/containerd/log"

	"github.com/containerd/nerdctl/v2/pkg/composer/serviceparser"
	"github.com/containerd/nerdctl/v2/pkg/idgen"
)

// RunOptions carries the settings consumed by Composer.Run and
// Composer.runServices. Most fields override the configuration of the single
// service named by ServiceName; the remaining ones control image handling,
// log output and container cleanup.
type RunOptions struct {
	ServiceName string   // name of the service to run; Run fails when it cannot be found
	Args        []string // when non-empty, replaces the target service's command

	NoBuild       bool // forwarded, negated, to ensureServiceImage
	NoColor       bool // copied into LogsOptions when logs are followed
	NoLogPrefix   bool // copied into LogsOptions when logs are followed
	ForceBuild    bool // forwarded to ensureServiceImage
	QuietPull     bool // forwarded to ensureServiceImage
	RemoveOrphans bool // remove orphaned containers instead of only warning about them

	Name         string   // when non-empty, container name used for the target service
	Detach       bool     // log the target container ID and return once the containers are started
	NoDeps       bool     // look up only the target service instead of walking c.project.ForEachService
	Tty          bool     // copied to the target service's Tty
	SigProxy     bool     // not read by the code visible here; NOTE(review): presumably signal proxying — confirm against callers
	Interactive  bool     // copied to the target service's StdinOpen
	Rm           bool     // remove the started containers after they have been stopped
	User         string   // when non-empty, overrides the target service's User
	Volume       []string // extra volume specs, parsed with format.ParseVolume and appended to the target service
	Entrypoint   []string // when non-empty, replaces the target service's entrypoint
	Env          []string // environment entries applied on top of the target service's environment
	Label        []string // labels added to the target service; entries that parse to a nil value are skipped
	WorkDir      string   // when non-empty, assigned to c.project.WorkingDir
	ServicePorts bool     // keep the ports declared by the services; when false they are cleared
	Publish      []string // port specs added to the target service; only read when ServicePorts is false
}

// Run prepares and starts the service named by ro.ServiceName as a one-off
// run.
//
// It brings up every network and volume of the project, validates the
// project's secrets and configs, and collects the services to start: only the
// target when ro.NoDeps is set, otherwise whatever c.project.ForEachService
// visits for the target. Every collected service receives a generated
// container name of the form <project><sep><service><sep>run<sep><id>. The
// overrides in ro (tty, stdin, name, user, volumes, entrypoint, environment,
// labels, published ports, command) are applied to the target service only.
//
// Unless ro.ServicePorts is set, the ports declared by all collected services
// are cleared and only ro.Publish is applied to the target; when
// ro.ServicePorts is set, ro.Publish is not read.
//
// The services are then parsed, orphaned containers are removed or reported
// depending on ro.RemoveOrphans, and the parsed services are handed to
// runServices. The first error encountered is returned.
func (c *Composer) Run(ctx context.Context, ro RunOptions) error {
	for shortName := range c.project.Networks {
		if err := c.upNetwork(ctx, shortName); err != nil {
			return err
		}
	}

	for shortName := range c.project.Volumes {
		if err := c.upVolume(ctx, shortName); err != nil {
			return err
		}
	}

	for shortName, secret := range c.project.Secrets {
		obj := types.FileObjectConfig(secret)
		// NOTE(review): the kind passed here is "service" although these are
		// secrets; confirm against validateFileObjectConfig whether "secret"
		// was intended.
		if err := validateFileObjectConfig(obj, shortName, "service", c.project); err != nil {
			return err
		}
	}

	for shortName, config := range c.project.Configs {
		obj := types.FileObjectConfig(config)
		if err := validateFileObjectConfig(obj, shortName, "config", c.project); err != nil {
			return err
		}
	}

	var svcs []types.ServiceConfig

	if ro.NoDeps {
		svc, err := c.project.GetService(ro.ServiceName)
		if err != nil {
			return err
		}
		svcs = append(svcs, svc)
	} else {
		if err := c.project.ForEachService([]string{ro.ServiceName}, func(name string, svc *types.ServiceConfig) error {
			svcs = append(svcs, *svc)
			return nil
		}); err != nil {
			return err
		}
	}

	// Point at the element inside svcs so that the overrides below are seen
	// when svcs is parsed further down.
	var targetSvc *types.ServiceConfig
	for i := range svcs {
		if svcs[i].Name == ro.ServiceName {
			targetSvc = &svcs[i]
			break
		}
	}
	if targetSvc == nil {
		return fmt.Errorf("error cannot find service name: %s", ro.ServiceName)
	}

	for i := range svcs {
		// FYI: https://github.com/docker/compose/blob/v2.18.1/pkg/compose/run.go#L65
		svcs[i].ContainerName = fmt.Sprintf("%[1]s%[4]s%[2]s%[4]srun%[4]s%[3]s", c.project.Name, svcs[i].Name, idgen.TruncateID(idgen.GenerateID()), serviceparser.Separator)
	}

	targetSvc.Tty = ro.Tty
	targetSvc.StdinOpen = ro.Interactive

	// An explicit name replaces the generated one for the target service.
	if ro.Name != "" {
		targetSvc.ContainerName = ro.Name
	}
	if ro.User != "" {
		targetSvc.User = ro.User
	}
	for _, v := range ro.Volume {
		vc, err := format.ParseVolume(v)
		if err != nil {
			return err
		}
		targetSvc.Volumes = append(targetSvc.Volumes, vc)
	}
	if len(ro.Entrypoint) > 0 {
		targetSvc.Entrypoint = make([]string, len(ro.Entrypoint))
		copy(targetSvc.Entrypoint, ro.Entrypoint)
	}
	if len(ro.Env) > 0 {
		envs := types.NewMappingWithEquals(ro.Env)
		// OverrideBy writes into its receiver, so a service that declares no
		// environment needs a map to write into.
		if targetSvc.Environment == nil {
			targetSvc.Environment = types.MappingWithEquals{}
		}
		targetSvc.Environment.OverrideBy(envs)
	}
	if len(ro.Label) > 0 {
		label := types.NewMappingWithEquals(ro.Label)
		for k, v := range label {
			if v != nil {
				// Add allocates a new map when the receiver is nil, so its
				// result must be stored or the label is lost.
				targetSvc.Labels = targetSvc.Labels.Add(k, *v)
			}
		}
	}
	if ro.WorkDir != "" {
		// NOTE(review): this changes the project's working directory rather
		// than the target service's WorkingDir; confirm that is intended.
		c.project.WorkingDir = ro.WorkDir
	}

	// `compose run` command does not create any of the ports specified in the service configuration.
	if !ro.ServicePorts {
		for k := range svcs {
			svcs[k].Ports = []types.ServicePortConfig{}
		}
		for _, p := range ro.Publish {
			pc, err := types.ParsePortConfig(p)
			if err != nil {
				return fmt.Errorf("error parse --publish: %w", err)
			}
			targetSvc.Ports = append(targetSvc.Ports, pc...)
		}
	}

	// `compose run` command overrides the command defined in the service configuration.
	if len(ro.Args) != 0 {
		targetSvc.Command = make([]string, len(ro.Args))
		copy(targetSvc.Command, ro.Args)
	}

	parsedServices := make([]*serviceparser.Service, 0, len(svcs))
	for _, svc := range svcs {
		ps, err := serviceparser.Parse(c.project, svc)
		if err != nil {
			return err
		}
		parsedServices = append(parsedServices, ps)
	}

	// remove orphan containers before the service has been started
	// FYI: https://github.com/docker/compose/blob/v2.3.4/pkg/compose/create.go#L91-L112
	orphans, err := c.getOrphanContainers(ctx, parsedServices)
	// A lookup failure only aborts the run when removal was requested.
	if err != nil && ro.RemoveOrphans {
		return fmt.Errorf("error getting orphaned containers: %w", err)
	}
	if len(orphans) > 0 {
		if ro.RemoveOrphans {
			if err := c.removeContainers(ctx, orphans, RemoveOptions{Stop: true, Volumes: true}); err != nil {
				return fmt.Errorf("error removing orphaned containers: %w", err)
			}
		} else {
			log.G(ctx).Warnf("found %d orphaned containers: %v, you can run this command with the --remove-orphans flag to clean it up", len(orphans), containerShortIDs(orphans))
		}
	}

	return c.runServices(ctx, parsedServices, ro)
}

// runServices ensures the image of every parsed service, starts the first
// container of each service concurrently, and then either detaches or follows
// the logs before stopping (and optionally removing) the started containers.
//
// It returns an error when parsedServices is empty, when an image cannot be
// ensured, when a service has no container, when a container fails to start,
// or when following the logs fails. All services are checked for having at
// least one container before any container is started, so a failing check
// never leaves start goroutines running in the background.
//
// With ro.Detach set, the ID of the container belonging to ro.ServiceName is
// logged and the function returns without stopping anything. Logs are only
// followed when neither ro.Interactive nor ro.Tty is set. Containers are
// removed after being stopped only when ro.Rm is set.
func (c *Composer) runServices(ctx context.Context, parsedServices []*serviceparser.Service, ro RunOptions) error {
	if len(parsedServices) == 0 {
		return errors.New("no service was provided")
	}

	// TODO: parallelize loop for ensuring images (make sure not to mess up tty)
	for _, ps := range parsedServices {
		if err := c.ensureServiceImage(ctx, ps, !ro.NoBuild, ro.ForceBuild, BuildOptions{}, ro.QuietPull, ""); err != nil {
			return err
		}
	}

	// Validate up front: returning from inside the launch loop would abandon
	// goroutines that were already started for earlier services.
	for _, ps := range parsedServices {
		if len(ps.Containers) == 0 {
			return fmt.Errorf("error, a service should have at least one container but %s does not have any container", ps.Unparsed.Name)
		}
		if len(ps.Containers) != 1 {
			log.G(ctx).Warnf("compose run does not support scale but %s is currently %v, automatically it will configure 1", ps.Unparsed.Name, len(ps.Containers))
		}
	}

	var (
		containers   = make(map[string]serviceparser.Container, len(parsedServices)) // key: container ID
		services     = make([]string, 0, len(parsedServices))
		containersMu sync.Mutex
		runEG        errgroup.Group
		cid          string // For printing cid when -d exists
	)

	for _, ps := range parsedServices {
		ps := ps // per-iteration copy for the closure below
		services = append(services, ps.Unparsed.Name)

		// Only the first container of each service is started.
		container := ps.Containers[0]

		runEG.Go(func() error {
			id, err := c.upServiceContainer(ctx, ps, container, RecreateForce)
			if err != nil {
				return err
			}
			containersMu.Lock()
			containers[id] = container
			containersMu.Unlock()
			// cid is read only after runEG.Wait returns; service names are
			// expected to be unique, so at most one goroutine writes it.
			if ps.Unparsed.Name == ro.ServiceName {
				cid = id
			}
			return nil
		})
	}
	if err := runEG.Wait(); err != nil {
		return err
	}

	if ro.Detach {
		log.G(ctx).Printf("%s\n", cid)
		return nil
	}

	// TODO: fix it when `nerdctl logs` supports `nerdctl run` without detach
	// https://github.com/containerd/nerdctl/blob/v0.22.2/pkg/taskutil/taskutil.go#L55
	if !ro.Interactive && !ro.Tty {
		log.G(ctx).Info("Attaching to logs")
		lo := LogsOptions{
			Follow:      true,
			NoColor:     ro.NoColor,
			NoLogPrefix: ro.NoLogPrefix,
		}
		// it finally causes to show logs of some containers which are stopped but not deleted.
		if err := c.Logs(ctx, lo, services); err != nil {
			return err
		}
	}

	log.G(ctx).Infof("Stopping containers (forcibly)") // TODO: support gracefully stopping
	c.stopContainersFromParsedServices(ctx, containers)

	if ro.Rm {
		c.removeContainersFromParsedServices(ctx, containers)
	}
	return nil
}
