From 7ce65365b11a2de5e1ae05701932ce5353b4819a Mon Sep 17 00:00:00 2001 From: LUKIEYF <103362370+LUKIEYF@users.noreply.github.com> Date: Thu, 30 Oct 2025 02:13:20 +0800 Subject: [PATCH] Enhancement log stream reading and writing and handle new lines and max-size (#5683) Co-authored-by: qwerty287 <80460567+qwerty287@users.noreply.github.com> Co-authored-by: Anbraten <6918444+anbraten@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- pipeline/log/utils.go | 57 +++++++++-- pipeline/log/utils_test.go | 194 ++++++++++++++++++++++++++++++++++++- 2 files changed, 241 insertions(+), 10 deletions(-) diff --git a/pipeline/log/utils.go b/pipeline/log/utils.go index 600a6271e..84a29e946 100644 --- a/pipeline/log/utils.go +++ b/pipeline/log/utils.go @@ -16,6 +16,7 @@ package log import ( "bufio" + "bytes" "errors" "io" ) @@ -43,18 +44,62 @@ func writeChunks(dst io.Writer, data []byte, size int) error { func CopyLineByLine(dst io.Writer, src io.Reader, maxSize int) error { r := bufio.NewReader(src) + // buffer to cache + var buf []byte + // buffer to read + readBuf := make([]byte, maxSize) for { - // TODO: read til newline or maxSize directly - line, err := r.ReadBytes('\n') - if len(line) > 0 { - if err := writeChunks(dst, line, maxSize); err != nil { - return err + n, err := r.Read(readBuf) + + // handle the data first + if n > 0 { + // if it has data, cache into the buffer + buf = append(buf, readBuf[:n]...) 
+ + processBuffer: + for len(buf) > 0 { + // find the index to anchor the new line + idx := bytes.IndexByte(buf, '\n') + switch { + case idx >= 0: + // found the new line, write to the dst + lineEnd := idx + 1 + if lineEnd > maxSize { + if wErr := writeChunks(dst, buf[:lineEnd], maxSize); wErr != nil { + return wErr + } + } else { + if _, wErr := dst.Write(buf[:lineEnd]); wErr != nil { + return wErr + } + } + // remove the line written from the buffer + buf = buf[lineEnd:] + case len(buf) >= maxSize: + if _, wErr := dst.Write(buf[:maxSize]); wErr != nil { + return wErr + } + buf = buf[maxSize:] + default: + // no newline found and buffer not full, read more data + break processBuffer + } } } + + // and then if it is EOF, write the remaining data and break the loop if errors.Is(err, io.EOF) { + if len(buf) == 0 { + break + } + if _, wErr := dst.Write(buf); wErr != nil { + return wErr + } break - } else if err != nil { + } + + if err != nil { return err } } diff --git a/pipeline/log/utils_test.go b/pipeline/log/utils_test.go index c5d6effb3..8b83b67ea 100644 --- a/pipeline/log/utils_test.go +++ b/pipeline/log/utils_test.go @@ -71,7 +71,7 @@ func TestCopyLineByLine(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - // Wait until no writes have occurred (should be immediate) + // wait until no writes have occurred (should be immediate) assert.Eventually(t, func() bool { return len(testWriter.GetWrites()) == 0 }, time.Second, 5*time.Millisecond, "expected 0 writes after first write") @@ -81,7 +81,7 @@ func TestCopyLineByLine(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - // Wait until two writes have occurred + // wait until two writes have occurred assert.Eventually(t, func() bool { return len(testWriter.GetWrites()) == 2 }, time.Second, 5*time.Millisecond, "expected 2 writes after second write") @@ -93,7 +93,7 @@ func TestCopyLineByLine(t *testing.T) { // closing the writer should flush the remaining data w.Close() - // Wait for the goroutine to 
finish + // wait for the goroutine to finish select { case <-done: case <-time.After(time.Second): @@ -131,7 +131,7 @@ func TestCopyLineByLineSizeLimit(t *testing.T) { } writes := testWriter.GetWrites() - assert.Lenf(t, testWriter.GetWrites(), 0, "expected 0 writes, got: %v", writes) + assert.Lenf(t, testWriter.GetWrites(), 1, "expected 1 writes, got: %v", writes) // write more bytes if _, err := w.Write([]byte("67\n89")); err != nil { @@ -167,3 +167,189 @@ func TestStringReader(t *testing.T) { writes := testWriter.GetWrites() assert.Lenf(t, writes, 3, "expected 3 writes, got: %v", writes) } + +func TestCopyLineByLineNewlineCharacter(t *testing.T) { + r, w := io.Pipe() + + testWriter := &testWriter{ + Mutex: &sync.Mutex{}, + writes: make([]string, 0), + } + + done := make(chan struct{}) + + go func() { + err := log.CopyLineByLine(testWriter, r, 4) + assert.NoError(t, err) + close(done) + }() + + // write one newline character before the maximum size of the buffer + if _, err := w.Write([]byte("123\n45678")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // wait until 2 writes have occurred + assert.Eventually(t, func() bool { + return len(testWriter.GetWrites()) == 2 + }, time.Second, 5*time.Millisecond, "expected 2 writes after first write") + + writes := testWriter.GetWrites() + writtenData := strings.Join(writes, "-") + assert.Equal(t, "123\n-4567", writtenData) + + // write one newline character at the beginning before the maximum size of the buffer + if _, err := w.Write([]byte("\n123\n45678")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // wait until 5 writes have occurred (2 from before + 3 new ones) + assert.Eventually(t, func() bool { + return len(testWriter.GetWrites()) == 5 + }, time.Second, 5*time.Millisecond, "expected 5 writes total after second write") + + writes = testWriter.GetWrites() + writtenData = strings.Join(writes, "-") + assert.Equal(t, "123\n-4567-8\n-123\n-4567", writtenData) + + // Close the writer first 
to signal EOF + w.Close() + + // wait for the goroutine to finish + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for goroutine to finish") + } + + // Verify final flush (should have "8" remaining) + writes = testWriter.GetWrites() + writtenData = strings.Join(writes, "-") + assert.Equal(t, "123\n-4567-8\n-123\n-4567-8", writtenData) +} + +// TestCopyLineByLineLongLine checks that a line longer than maxSize is split across several writes of at most maxSize bytes. +func TestCopyLineByLineLongLine(t *testing.T) { + r, w := io.Pipe() + + testWriter := &testWriter{ + Mutex: &sync.Mutex{}, + writes: make([]string, 0), + } + + done := make(chan struct{}) + + // max size = 10 + maxSize := 10 + + go func() { + err := log.CopyLineByLine(testWriter, r, maxSize) + assert.NoError(t, err) + close(done) + }() + + // wait for the goroutine to start + time.Sleep(time.Millisecond) + + // write a line longer than maxSize so that it has to be split across several writes + if _, err := w.Write([]byte("this is a very long line\n")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // wait for the writer to write + time.Sleep(time.Millisecond) + + // verify the number of writes is equal to 3 + assert.Eventually(t, func() bool { + return len(testWriter.GetWrites()) == 3 + }, time.Second, 5*time.Millisecond, "expected 3 writes after first write") + + // verify all data was written correctly + writtenData := "" + assert.Eventually(t, func() bool { + writtenData = strings.Join(testWriter.GetWrites(), "-") + return writtenData == "this is a -very long -line\n" + }, time.Second, 5*time.Millisecond, "unexpected writtenData: %s", writtenData) + + // closing the writer should flush the remaining data + w.Close() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for goroutine to finish") + } +} + +// TestCopyLineByLineWriteChunks tests how lines longer than maxSize are chunked (the writeChunks path).
+func TestCopyLineByLineWriteChunks(t *testing.T) { + r, w := io.Pipe() + + testWriter := &testWriter{ + Mutex: &sync.Mutex{}, + writes: make([]string, 0), + } + + done := make(chan struct{}) + + // max size = 8 + maxSize := 8 + + go func() { + err := log.CopyLineByLine(testWriter, r, maxSize) + assert.NoError(t, err) + close(done) + }() + + // first line: 20 chars + newline = 21 bytes (will be chunked: 8 + 8 + 5) + // second line: 5 chars + newline = 6 bytes (normal write, no chunking) + // third line: 16 chars + newline = 17 bytes (will be chunked: 8 + 8 + 1) + input := "12345678901234567890\n" + + "short\n" + + "abcdefghijklmnop\n" + + if _, err := w.Write([]byte(input)); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // verify the number of writes is equal to 7 + assert.Eventually(t, func() bool { + return len(testWriter.GetWrites()) == 7 + }, time.Second, 5*time.Millisecond, "expected 7 writes after first write") + + // verify all data was written correctly + writtenData := "" + assert.Eventually(t, func() bool { + writtenData = strings.Join(testWriter.GetWrites(), "") + return writtenData == input + }, time.Second, 5*time.Millisecond, "unexpected writtenData: %s", writtenData) + + // verify the number of writes + expectedWrites := 7 + assert.Eventually(t, func() bool { + return len(testWriter.GetWrites()) == expectedWrites + }, time.Second, 5*time.Millisecond, "expected %d writes, got %d: %v", expectedWrites, len(testWriter.GetWrites()), testWriter.GetWrites()) + + writes := testWriter.GetWrites() + // verify first line chunks + assert.Equal(t, "12345678", writes[0], "first chunk of first line") + assert.Equal(t, "90123456", writes[1], "second chunk of first line") + assert.Equal(t, "7890\n", writes[2], "third chunk of first line") + + // verify second line (not chunked) + assert.Equal(t, "short\n", writes[3], "second line should not be chunked") + + // verify third line chunks + assert.Equal(t, "abcdefgh", writes[4], "first chunk of third line") +
assert.Equal(t, "ijklmnop", writes[5], "second chunk of third line") + assert.Equal(t, "\n", writes[6], "third chunk of third line (just newline)") + + // closing the writer should flush the remaining data + w.Close() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for goroutine to finish") + } +}