1
0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2025-10-30 23:27:39 +02:00

Enhancement log stream reading and writing and handle new lines and max-size (#5683)

Co-authored-by: qwerty287 <80460567+qwerty287@users.noreply.github.com>
Co-authored-by: Anbraten <6918444+anbraten@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
LUKIEYF
2025-10-30 02:13:20 +08:00
committed by GitHub
parent 28a4197160
commit 7ce65365b1
2 changed files with 241 additions and 10 deletions

View File

@@ -16,6 +16,7 @@ package log
import (
"bufio"
"bytes"
"errors"
"io"
)
@@ -43,18 +44,62 @@ func writeChunks(dst io.Writer, data []byte, size int) error {
func CopyLineByLine(dst io.Writer, src io.Reader, maxSize int) error {
r := bufio.NewReader(src)
// buffer to cache
var buf []byte
// buffer to read
readBuf := make([]byte, maxSize)
for {
// TODO: read til newline or maxSize directly
line, err := r.ReadBytes('\n')
if len(line) > 0 {
if err := writeChunks(dst, line, maxSize); err != nil {
return err
n, err := r.Read(readBuf)
// handle the data first
if n > 0 {
// if it has data, cache into the buffer
buf = append(buf, readBuf[:n]...)
processBuffer:
for len(buf) > 0 {
// find the index to anchor the new line
idx := bytes.IndexByte(buf, '\n')
switch {
case idx >= 0:
// found the new line, write to the dst
lineEnd := idx + 1
if lineEnd > maxSize {
if wErr := writeChunks(dst, buf[:lineEnd], maxSize); wErr != nil {
return wErr
}
} else {
if _, wErr := dst.Write(buf[:lineEnd]); wErr != nil {
return wErr
}
}
// remove the line written from the buffer
buf = buf[lineEnd:]
case len(buf) >= maxSize:
if _, wErr := dst.Write(buf[:maxSize]); wErr != nil {
return wErr
}
buf = buf[maxSize:]
default:
// no newline found and buffer not full, read more data
break processBuffer
}
}
}
// and then if it is EOF, write the remaining data and break the loop
if errors.Is(err, io.EOF) {
if len(buf) == 0 {
break
} else if err != nil {
}
if _, wErr := dst.Write(buf); wErr != nil {
return wErr
}
break
}
if err != nil {
return err
}
}

View File

@@ -71,7 +71,7 @@ func TestCopyLineByLine(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
// Wait until no writes have occurred (should be immediate)
// wait until no writes have occurred (should be immediate)
assert.Eventually(t, func() bool {
return len(testWriter.GetWrites()) == 0
}, time.Second, 5*time.Millisecond, "expected 0 writes after first write")
@@ -81,7 +81,7 @@ func TestCopyLineByLine(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
// Wait until two writes have occurred
// wait until two writes have occurred
assert.Eventually(t, func() bool {
return len(testWriter.GetWrites()) == 2
}, time.Second, 5*time.Millisecond, "expected 2 writes after second write")
@@ -93,7 +93,7 @@ func TestCopyLineByLine(t *testing.T) {
// closing the writer should flush the remaining data
w.Close()
// Wait for the goroutine to finish
// wait for the goroutine to finish
select {
case <-done:
case <-time.After(time.Second):
@@ -131,7 +131,7 @@ func TestCopyLineByLineSizeLimit(t *testing.T) {
}
writes := testWriter.GetWrites()
assert.Lenf(t, testWriter.GetWrites(), 0, "expected 0 writes, got: %v", writes)
assert.Lenf(t, testWriter.GetWrites(), 1, "expected 1 writes, got: %v", writes)
// write more bytes
if _, err := w.Write([]byte("67\n89")); err != nil {
@@ -167,3 +167,189 @@ func TestStringReader(t *testing.T) {
writes := testWriter.GetWrites()
assert.Lenf(t, writes, 3, "expected 3 writes, got: %v", writes)
}
func TestCopyLineByLineNewlineCharacter(t *testing.T) {
r, w := io.Pipe()
testWriter := &testWriter{
Mutex: &sync.Mutex{},
writes: make([]string, 0),
}
done := make(chan struct{})
go func() {
err := log.CopyLineByLine(testWriter, r, 4)
assert.NoError(t, err)
close(done)
}()
// write one newline character before the maximum size of the buffer
if _, err := w.Write([]byte("123\n45678")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// wait until 2 writes have occurred
assert.Eventually(t, func() bool {
return len(testWriter.GetWrites()) == 2
}, time.Second, 5*time.Millisecond, "expected 2 writes after first write")
writes := testWriter.GetWrites()
writtenData := strings.Join(writes, "-")
assert.Equal(t, "123\n-4567", writtenData)
// write one newline character at the beginning before the maximum size of the buffer
if _, err := w.Write([]byte("\n123\n45678")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// wait until 5 writes have occurred (2 from before + 3 new ones)
assert.Eventually(t, func() bool {
return len(testWriter.GetWrites()) == 5
}, time.Second, 5*time.Millisecond, "expected 5 writes total after second write")
writes = testWriter.GetWrites()
writtenData = strings.Join(writes, "-")
assert.Equal(t, "123\n-4567-8\n-123\n-4567", writtenData)
// Close the writer first to signal EOF
w.Close()
// wait for the goroutine to finish
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("timeout waiting for goroutine to finish")
}
// Verify final flush (should have "8" remaining)
writes = testWriter.GetWrites()
writtenData = strings.Join(writes, "-")
assert.Equal(t, "123\n-4567-8\n-123\n-4567-8", writtenData)
}
// TestCopyLineByLineLongLine is for the long line testing to trigger the writeChunks function.
func TestCopyLineByLineLongLine(t *testing.T) {
r, w := io.Pipe()
testWriter := &testWriter{
Mutex: &sync.Mutex{},
writes: make([]string, 0),
}
done := make(chan struct{})
// max size = 10
maxSize := 10
go func() {
err := log.CopyLineByLine(testWriter, r, maxSize)
assert.NoError(t, err)
close(done)
}()
// wait for the goroutine to start
time.Sleep(time.Millisecond)
// will trigger the writeChunks function
if _, err := w.Write([]byte("this is a very long line\n")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// wait for the writer to write
time.Sleep(time.Millisecond)
// verify the number of writes is equal to 3
assert.Eventually(t, func() bool {
return len(testWriter.GetWrites()) == 3
}, time.Second, 5*time.Millisecond, "expected 3 writes after first write")
// verify all data was written correctly
writtenData := ""
assert.Eventually(t, func() bool {
writtenData = strings.Join(testWriter.GetWrites(), "-")
return writtenData == "this is a -very long -line\n"
}, time.Second, 5*time.Millisecond, "unexpected writtenData: %s", writtenData)
// closing the writer should flush the remaining data
w.Close()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("timeout waiting for goroutine to finish")
}
}
// TestCopyLineByLineWriteChunks is for the writeChunks function testing.
func TestCopyLineByLineWriteChunks(t *testing.T) {
r, w := io.Pipe()
testWriter := &testWriter{
Mutex: &sync.Mutex{},
writes: make([]string, 0),
}
done := make(chan struct{})
// max size = 8
maxSize := 8
go func() {
err := log.CopyLineByLine(testWriter, r, maxSize)
assert.NoError(t, err)
close(done)
}()
// first line: 20 chars + newline = 21 bytes (will be chunked: 8 + 8 + 5)
// second line: 5 chars + newline = 6 bytes (normal write, no chunking)
// third line: 16 chars + newline = 17 bytes (will be chunked: 8 + 9)
input := "12345678901234567890\n" +
"short\n" +
"abcdefghijklmnop\n"
if _, err := w.Write([]byte(input)); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// verify the number of writes is equal to 7
assert.Eventually(t, func() bool {
return len(testWriter.GetWrites()) == 7
}, time.Second, 5*time.Millisecond, "expected 7 writes after first write")
// verify all data was written correctly
writtenData := ""
assert.Eventually(t, func() bool {
writtenData = strings.Join(testWriter.GetWrites(), "")
return writtenData == input
}, time.Second, 5*time.Millisecond, "unexpected writtenData: %s", writtenData)
// verify the number of writes
expectedWrites := 7
assert.Eventually(t, func() bool {
return len(testWriter.GetWrites()) == expectedWrites
}, time.Second, 5*time.Millisecond, "expected %d writes, got %d: %v", expectedWrites, len(testWriter.GetWrites()), testWriter.GetWrites())
writes := testWriter.GetWrites()
// verify first line chunks
assert.Equal(t, "12345678", writes[0], "first chunk of first line")
assert.Equal(t, "90123456", writes[1], "second chunk of first line")
assert.Equal(t, "7890\n", writes[2], "third chunk of first line")
// verify second line (not chunked)
assert.Equal(t, "short\n", writes[3], "second line should not be chunked")
// verify third line chunks
assert.Equal(t, "abcdefgh", writes[4], "first chunk of third line")
assert.Equal(t, "ijklmnop", writes[5], "second chunk of third line")
assert.Equal(t, "\n", writes[6], "third chunk of third line (just newline)")
// closing the writer should flush the remaining data
w.Close()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("timeout waiting for goroutine to finish")
}
}