
made telegram_client.go

commit 53f4efc185
parent c03e2072bd
Author: Nikitin Aleksandr
Date: 2023-05-19 16:05:54 +03:00

3465 changed files with 731510 additions and 24593 deletions


@@ -19,7 +19,7 @@ const (
 )
 
 const (
-	// defaultCommitRetries holds the number commit attempts to make
+	// defaultCommitRetries holds the number of commit attempts to make
 	// before giving up.
 	defaultCommitRetries = 3
 )
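For context, defaultCommitRetries bounds how many times an offset commit is attempted before the error is logged. A minimal, self-contained sketch of that bounded-retry pattern (illustrative only, not kafka-go's actual commitOffsetsWithRetry):

package main

import (
	"errors"
	"fmt"
)

const defaultCommitRetries = 3

// retry calls fn up to attempts times and returns the last error
// if every attempt fails.
func retry(attempts int, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	calls := 0
	err := retry(defaultCommitRetries, func() error {
		calls++
		return errors.New("commit failed") // always fails in this sketch
	})
	fmt.Println(calls, err) // prints: 3 commit failed
}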
@@ -238,7 +238,7 @@ func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {
 	commit := func() {
 		if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
-			r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) })
+			r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
 		} else {
 			offsets.reset()
 		}
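This hunk and the similar one below stop passing the error text to Printf as the format string. A minimal sketch with the standard library logger shows why that matters: any % verb inside the error message would otherwise be interpreted as a formatting directive.

package main

import (
	"errors"
	"log"
)

func main() {
	// An error whose text happens to contain a formatting verb.
	err := errors.New("invalid topic name: 100%d-events")

	// Unsafe: the error text is used as the format string, so "%d" is
	// treated as a verb and the output is mangled ("...100%!d(MISSING)-events").
	log.Printf(err.Error())

	// Safe: the error is passed as an argument to a fixed format string.
	log.Printf("%v", err)
}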
@@ -311,7 +311,7 @@ func (r *Reader) run(cg *ConsumerGroup) {
 			}
 			r.stats.errors.observe(1)
 			r.withErrorLogger(func(l Logger) {
-				l.Printf(err.Error())
+				l.Printf("%v", err)
 			})
 			// Continue with next attempt...
 		}
@@ -785,7 +785,7 @@ func (r *Reader) Close() error {
 // offset when called. Note that this could result in an offset being committed
 // before the message is fully processed.
 //
-// If more fine grained control of when offsets are committed is required, it
+// If more fine-grained control of when offsets are committed is required, it
 // is recommended to use FetchMessage with CommitMessages instead.
 func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
 	m, err := r.FetchMessage(ctx)
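For reference, a minimal consumer sketch of the fine-grained commit pattern this comment recommends, pairing FetchMessage with CommitMessages so the offset is committed only after processing (broker address, group ID, topic, and the process helper are placeholder names):

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		GroupID: "example-group",            // placeholder consumer group
		Topic:   "example-topic",            // placeholder topic
	})
	defer r.Close()

	ctx := context.Background()
	for {
		// FetchMessage does not commit the offset automatically.
		m, err := r.FetchMessage(ctx)
		if err != nil {
			log.Printf("%v", err)
			return
		}

		process(m) // hypothetical handler

		// Commit only after the message has been fully processed.
		if err := r.CommitMessages(ctx, m); err != nil {
			log.Printf("%v", err)
		}
	}
}

func process(m kafka.Message) {
	log.Printf("partition %d offset %d: %s", m.Partition, m.Offset, m.Value)
}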
@@ -1220,7 +1220,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
 }
 
 // A reader reads messages from kafka and produces them on its channels, it's
-// used as an way to asynchronously fetch messages while the main program reads
+// used as a way to asynchronously fetch messages while the main program reads
 // them using the high level reader API.
 type reader struct {
 	dialer *Dialer
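The comment describes the common Go pattern of a background goroutine fetching messages and publishing them on a channel that the high-level API drains. A simplified, generic sketch of that pattern (not the actual kafka-go internals):

package main

import (
	"context"
	"fmt"
)

// message stands in for kafka.Message in this illustration.
type message struct{ value string }

// fetchLoop starts a background goroutine that keeps fetching and publishes
// results on a channel until the context is cancelled or fetch fails.
func fetchLoop(ctx context.Context, fetch func() (message, error)) <-chan message {
	out := make(chan message)
	go func() {
		defer close(out)
		for {
			m, err := fetch()
			if err != nil {
				return
			}
			select {
			case out <- m:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	i := 0
	msgs := fetchLoop(ctx, func() (message, error) {
		i++
		if i > 3 {
			return message{}, fmt.Errorf("no more messages")
		}
		return message{value: fmt.Sprintf("msg-%d", i)}, nil
	})

	// The main program reads the asynchronously fetched messages.
	for m := range msgs {
		fmt.Println(m.value)
	}
}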
@@ -1346,7 +1346,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
 
 		case errors.Is(err, UnknownTopicOrPartition):
			r.withErrorLogger(func(log Logger) {
-				log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)
+				log.Printf("failed to read from current broker %v for partition %d of %s at offset %d: %v", r.brokers, r.partition, r.topic, toHumanOffset(offset), err)
 			})
 
 			conn.Close()
@@ -1358,7 +1358,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
 
 		case errors.Is(err, NotLeaderForPartition):
 			r.withErrorLogger(func(log Logger) {
-				log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset))
+				log.Printf("failed to read from current broker for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
 			})
 
 			conn.Close()
@@ -1372,7 +1372,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
 			// Timeout on the kafka side, this can be safely retried.
 			errcount = 0
 			r.withLogger(func(log Logger) {
-				log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
+				log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
 			})
 			r.stats.timeouts.observe(1)
 			continue
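For callers of the public API, the broker-side conditions handled in these hunks surface as kafka.Error codes. A hedged sketch of telling them apart with errors.Is, using the low-level Conn API (broker address, topic, and partition are placeholders; whether a given error reaches the caller depends on the reader's internal retry behavior):

package main

import (
	"context"
	"errors"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder broker, topic, and partition.
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "example-topic", 0)
	if err != nil {
		log.Fatalf("%v", err)
	}
	defer conn.Close()

	_, err = conn.ReadMessage(10e6) // read one message, 10 MB size cap
	switch {
	case err == nil:
		// Message read successfully.
	case errors.Is(err, kafka.UnknownTopicOrPartition):
		log.Printf("topic or partition not found on this broker: %v", err)
	case errors.Is(err, kafka.NotLeaderForPartition):
		log.Printf("broker is not the leader for this partition: %v", err)
	case errors.Is(err, kafka.RequestTimedOut):
		log.Printf("request timed out, safe to retry: %v", err)
	default:
		log.Printf("%v", err)
	}
}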