// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	"go.woodpecker-ci.org/woodpecker/v2/server/model"
)

var noContext = context.Background()

func TestFifo(t *testing.T) {
	want := &model.Task{ID: "1"}

	q := New(context.Background())
	assert.NoError(t, q.Push(noContext, want))
	info := q.Info(noContext)
	assert.Len(t, info.Pending, 1, "expect task in pending queue")

	got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, want, got)

	info = q.Info(noContext)
	assert.Len(t, info.Pending, 0, "expect task removed from pending queue")
	assert.Len(t, info.Running, 1, "expect task in running queue")

	assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess))
	info = q.Info(noContext)
	assert.Len(t, info.Pending, 0, "expect task removed from pending queue")
	assert.Len(t, info.Running, 0, "expect task removed from running queue")
}

func TestFifoExpire(t *testing.T) {
	want := &model.Task{ID: "1"}

	q, _ := New(context.Background()).(*fifo)
	q.extension = 0
	assert.NoError(t, q.Push(noContext, want))
	info := q.Info(noContext)
	assert.Len(t, info.Pending, 1, "expect task in pending queue")

	got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, want, got)

	q.process()
	assert.Len(t, info.Pending, 1, "expect task re-added to pending queue")
}

func TestFifoWait(t *testing.T) {
	want := &model.Task{ID: "1"}

	q, _ := New(context.Background()).(*fifo)
	assert.NoError(t, q.Push(noContext, want))

	got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, want, got)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		assert.NoError(t, q.Wait(noContext, got.ID))
		wg.Done()
	}()

	<-time.After(time.Millisecond)
	assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess))
	wg.Wait()
}

func TestFifoEvict(t *testing.T) {
	t1 := &model.Task{ID: "1"}

	q := New(context.Background())
	assert.NoError(t, q.Push(noContext, t1))
	info := q.Info(noContext)
	assert.Len(t, info.Pending, 1, "expect task in pending queue")
	err := q.Evict(noContext, t1.ID)
	assert.NoError(t, err)
	info = q.Info(noContext)
	assert.Len(t, info.Pending, 0)
	err = q.Evict(noContext, t1.ID)
	assert.ErrorIs(t, err, ErrNotFound)
}

func TestFifoDependencies(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	task2 := &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	q, _ := New(context.Background()).(*fifo)
	assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task1}))

	got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task1, got)

	assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess))

	got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task2, got)
}

func TestFifoErrors(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	task2 := &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	task3 := &model.Task{
		ID:           "3",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
		RunOn:        []string{"success", "failure"},
	}

	q, _ := New(context.Background()).(*fifo)
	assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

	got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task1, got)

	assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error")))

	got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task2, got)
	assert.False(t, got.ShouldRun())

	got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task3, got)
	assert.True(t, got.ShouldRun())
}

func TestFifoErrors2(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	task2 := &model.Task{
		ID: "2",
	}

	task3 := &model.Task{
		ID:           "3",
		Dependencies: []string{"1", "2"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	q, _ := New(context.Background()).(*fifo)
	assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

	for i := 0; i < 2; i++ {
		got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
		assert.NoError(t, err)
		assert.False(t, got != task1 && got != task2, "expect task1 or task2 returned from queue as task3 depends on them")

		if got != task1 {
			assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess))
		}
		if got != task2 {
			assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error")))
		}
	}

	got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task3, got)
	assert.False(t, got.ShouldRun())
}

func TestFifoErrorsMultiThread(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	task2 := &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	task3 := &model.Task{
		ID:           "3",
		Dependencies: []string{"1", "2"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	q, _ := New(context.Background()).(*fifo)
	assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

	obtainedWorkCh := make(chan *model.Task)

	for i := 0; i < 10; i++ {
		go func(i int) {
			for {
				fmt.Printf("Worker %d started\n", i)
				got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
				assert.NoError(t, err)
				obtainedWorkCh <- got
			}
		}(i)
	}

	task1Processed := false
	task2Processed := false

	for {
		select {
		case got := <-obtainedWorkCh:
			fmt.Println(got.ID)

			switch {
			case !task1Processed:
				assert.Equal(t, task1, got)
				task1Processed = true
				assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error")))
				go func() {
					for {
						fmt.Printf("Worker spawned\n")
						got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
						obtainedWorkCh <- got
					}
				}()
			case !task2Processed:
				assert.Equal(t, task2, got)
				task2Processed = true
				assert.NoError(t, q.Done(noContext, got.ID, model.StatusSuccess))
				go func() {
					for {
						fmt.Printf("Worker spawned\n")
						got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })
						obtainedWorkCh <- got
					}
				}()
			default:
				assert.Equal(t, task3, got)
				assert.False(t, got.ShouldRun(), "expect task3 should not run, task1 succeeded but task2 failed")
				return
			}

		case <-time.After(5 * time.Second):
			info := q.Info(noContext)
			fmt.Println(info.String())
			t.Errorf("test timed out")
			return
		}
	}
}

func TestFifoTransitiveErrors(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	task2 := &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	task3 := &model.Task{
		ID:           "3",
		Dependencies: []string{"2"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	q, _ := New(context.Background()).(*fifo)
	assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

	got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task1, got)
	assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error")))

	got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task2, got)
	assert.False(t, got.ShouldRun(), "expect task2 should not run, since task1 failed")
	assert.NoError(t, q.Done(noContext, got.ID, model.StatusSkipped))

	got, err = q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.Equal(t, task3, got)
	assert.False(t, got.ShouldRun(), "expect task3 should not run, task1 failed, thus task2 was skipped, task3 should be skipped too")
}

func TestFifoCancel(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	task2 := &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	task3 := &model.Task{
		ID:           "3",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
		RunOn:        []string{"success", "failure"},
	}

	q, _ := New(context.Background()).(*fifo)
	assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

	_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, q.Error(noContext, task1.ID, fmt.Errorf("canceled")))
	assert.NoError(t, q.Error(noContext, task2.ID, fmt.Errorf("canceled")))
	assert.NoError(t, q.Error(noContext, task3.ID, fmt.Errorf("canceled")))

	info := q.Info(noContext)
	assert.Len(t, info.Pending, 0, "all pipelines should be canceled")
}

func TestFifoPause(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	q, _ := New(context.Background()).(*fifo)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
		wg.Done()
	}()

	q.Pause()
	t0 := time.Now()
	assert.NoError(t, q.Push(noContext, task1))
	time.Sleep(20 * time.Millisecond)
	q.Resume()

	wg.Wait()
	t1 := time.Now()

	assert.Greater(t, t1.Sub(t0), 20*time.Millisecond, "should have waited til resume")

	q.Pause()
	assert.NoError(t, q.Push(noContext, task1))
	q.Resume()
	_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
}

func TestFifoPauseResume(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	q, _ := New(context.Background()).(*fifo)
	q.Pause()
	assert.NoError(t, q.Push(noContext, task1))
	q.Resume()

	_, _ = q.Poll(noContext, 1, func(*model.Task) bool { return true })
}

func TestWaitingVsPending(t *testing.T) {
	task1 := &model.Task{
		ID: "1",
	}

	task2 := &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
	}

	task3 := &model.Task{
		ID:           "3",
		Dependencies: []string{"1"},
		DepStatus:    make(map[string]model.StatusValue),
		RunOn:        []string{"success", "failure"},
	}

	q, _ := New(context.Background()).(*fifo)
	assert.NoError(t, q.PushAtOnce(noContext, []*model.Task{task2, task3, task1}))

	got, _ := q.Poll(noContext, 1, func(*model.Task) bool { return true })

	info := q.Info(noContext)
	assert.Equal(t, 2, info.Stats.WaitingOnDeps)

	assert.NoError(t, q.Error(noContext, got.ID, fmt.Errorf("exit code 1, there was an error")))
	got, err := q.Poll(noContext, 1, func(*model.Task) bool { return true })
	assert.NoError(t, err)
	assert.EqualValues(t, task2, got)

	info = q.Info(noContext)
	assert.Equal(t, 0, info.Stats.WaitingOnDeps)
	assert.Equal(t, 1, info.Stats.Pending)
}

func TestShouldRun(t *testing.T) {
	task := &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus: map[string]model.StatusValue{
			"1": model.StatusSuccess,
		},
		RunOn: []string{"failure"},
	}
	assert.False(t, task.ShouldRun(), "expect task to not run, it runs on failure only")

	task = &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus: map[string]model.StatusValue{
			"1": model.StatusSuccess,
		},
		RunOn: []string{"failure", "success"},
	}
	assert.True(t, task.ShouldRun())

	task = &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus: map[string]model.StatusValue{
			"1": model.StatusFailure,
		},
	}
	assert.False(t, task.ShouldRun())

	task = &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus: map[string]model.StatusValue{
			"1": model.StatusSuccess,
		},
		RunOn: []string{"success"},
	}
	assert.True(t, task.ShouldRun())

	task = &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus: map[string]model.StatusValue{
			"1": model.StatusFailure,
		},
		RunOn: []string{"failure"},
	}
	assert.True(t, task.ShouldRun())

	task = &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus: map[string]model.StatusValue{
			"1": model.StatusSkipped,
		},
	}
	assert.False(t, task.ShouldRun(), "task should not run if dependency is skipped")

	task = &model.Task{
		ID:           "2",
		Dependencies: []string{"1"},
		DepStatus: map[string]model.StatusValue{
			"1": model.StatusSkipped,
		},
		RunOn: []string{"failure"},
	}
	assert.True(t, task.ShouldRun(), "on failure, tasks should run on skipped deps, something failed higher up the chain")
}