feat(agent): output buffer persistence #15221
base: master
Conversation
Thanks @DStrand1 for this draft. It looks quite nice overall and I left some suggestions in the code.
What concerns me a bit is the underlying WAL implementation. Looking through their code I discovered some fundamental flaws.
- The WAL file is not opened with SYNC, so any issue on disk or with Telegraf will lose more metrics than expected (in the worst case, with a lot of RAM, all metrics are lost)...
- The filenames are saved with the pattern `<user defined name>.<index>.<offset>` and are string-sorted, so if the `index` or `offset` cross the order-boundary (i.e. a digit is added, `9 -> 10`) the order is messed up, as `lala.10.xyz` follows `lala.1.xyz` instead of `lala.9.xyz`! This will in turn mess up the metric order.
- The WAL implementation does not care about removing files, so if you have WAL file(s) and restart Telegraf multiple times, the metrics in the WAL will be written multiple times without further handling. Doing this handling outside of the WAL implementation is hard as you cannot know which file was processed.
- In its current form, the WAL implementation is not capable of truncating the files front-to-back, so metrics are prone to be sent multiple times if the file was not completely flushed...
There are more (smaller) issues, but looking at the issues above I think you should go for another WAL library that is more mature. I looked at https://github.com/tidwall/wal in the past and think it can do what we need. Not a must, just a suggestion. ;-)
Thanks for the PR, super excited to see this. To capture what we talked about in pairing: specifically, to have you address some of the initial comments and switch to the proposed WAL library. Let's plan to talk through where you are at, or any issues with the new library, in Monday's pairing session. Thanks!
Thanks @DStrand1 for the nice update! My only concern (besides test coverage) is the error handling. IMO we should pass errors up to be able to log them, e.g. if we are running out of disk space. For unrecoverable errors (e.g. corrupt WAL files, non-serializable metrics, etc.) we probably should panic, but everything else should propagate up to the plugin level (at least).
models/buffer_disk.go
Outdated
```go
func (b *DiskBuffer) addSingle(metric telegraf.Metric) bool {
	err := b.walFile.Write(b.writeIndex(), b.metricToBytes(metric))
	metric.Accept()
	if err == nil {
		b.metricAdded()
		return true
	}
	return false
}
```
Is it really worth the code duplication or should we use the batch-interface instead?
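A minimal sketch of what routing single adds through the batch path could look like; the types below are toy stand-ins for this PR's code, and `addBatch` is assumed to report the number of metrics written:

```go
package main

import "fmt"

// Metric stands in for telegraf.Metric in this sketch.
type Metric struct{ Name string }

// DiskBuffer is a toy stand-in; the real type lives in models/buffer_disk.go.
type DiskBuffer struct{ entries []Metric }

// addBatch stores all metrics in one call and reports how many were written,
// so write, error handling and accounting live in one place.
func (b *DiskBuffer) addBatch(metrics ...Metric) int {
	b.entries = append(b.entries, metrics...)
	return len(metrics)
}

// addSingle becomes a thin wrapper around the batch path,
// removing the duplicated write logic.
func (b *DiskBuffer) addSingle(m Metric) bool {
	return b.addBatch(m) == 1
}

func main() {
	var b DiskBuffer
	fmt.Println(b.addSingle(Metric{Name: "cpu"}), len(b.entries)) // true 1
}
```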
```go
	return index
}

func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {
```
I believe we should return an error here to log it or do other mitigation...
```go
	return written
}

func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
```
Return the error.
Download PR build artifacts for linux_amd64.tar.gz, darwin_arm64.tar.gz, and windows_amd64.zip. 📦 Click here to get additional PR build artifacts.
Thanks for the update @DStrand1! Here are my major concerns with the disk implementation
- The error handling is not in a good shape yet, we panic in quite a few places, allowing for no mitigation or graceful operation. While this is OK for cases where we can be sure no error occurs (like when serializing the metric), I do have some headaches with the WAL writing not producing an error or panicking as there might be a few cases where errors can occur like permission issues, disk full etc.
- We must provide a way to close the WAL file in a clean fashion in case the underlying library relies on this. Please extend the interface accordingly!
- We cannot `Accept()` metrics in the current spot, as at that point they are not yet guaranteed to be written to disk. Please carefully check for those issues, as we really must guarantee that if we say we accepted a metric it will not be lost!
- We are currently prone to accepting the metric twice: once in the `addBatch` path and once when the output calls `Accept()` for the same metric. IIRC this will cause a panic as the refcount becomes negative... I think each buffer implementation requires its own way of `metricWritten`...
- There is an undefined situation for what happens if the output drops or rejects the metric. Do we remove them from disk? If so, the input will never notice that the metric is dropped, as we already accepted it... We must clarify this case (similar for reject)!
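A toy sketch of the accept-exactly-once-after-write discipline described above; all types and names here are illustrative stand-ins, not the PR's actual code:

```go
package main

import "fmt"

// refMetric is a toy stand-in for telegraf's reference-counted metrics.
type refMetric struct {
	name string
	refs int
}

// Accept panics if called more often than there are references,
// mirroring the negative-refcount panic mentioned above.
func (m *refMetric) Accept() {
	m.refs--
	if m.refs < 0 {
		panic("refcount went negative: metric accepted twice")
	}
}

// writeToWAL pretends to durably persist the metric; fail simulates a full disk.
func writeToWAL(m *refMetric, fail bool) error {
	if fail {
		return fmt.Errorf("no space left on device")
	}
	return nil
}

// addMetric accepts only AFTER the write succeeded, and only once per
// metric, so a failed write never claims the metric was taken.
func addMetric(m *refMetric, diskFull bool) error {
	if err := writeToWAL(m, diskFull); err != nil {
		return err
	}
	m.Accept()
	return nil
}

func main() {
	m := &refMetric{name: "cpu", refs: 1}
	fmt.Println(addMetric(m, true))  // error: metric stays unaccepted
	fmt.Println(addMetric(m, false)) // <nil>: accepted exactly once
	fmt.Println(m.refs)              // 0
}
```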
```go
	case "overflow":
		// todo implementme
		// todo log currently unimplemented
		return NewMemoryBuffer(capacity, bm)
```
Remove this for now and add it back in a later PR!
```go
	// todo log invalid buffer strategy configuration provided, falling back to memory
	return NewMemoryBuffer(capacity, bm)
```
No. Please fail here with:

```suggestion
	return nil, fmt.Errorf("invalid buffer strategy %q", strategy)
```
Don't try to be clever as this leads to unexpected behavior in a very critical area!
```go
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/tidwall/wal"
)
```
```suggestion
	"github.com/tidwall/wal"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
)
```
```go
	walFile     *wal.Log
	walFilePath string
```
You might want to shorten the names, e.g.

```suggestion
	file *wal.Log
	path string
```
```go
	dropped := 0
	for _, m := range metrics {
		if !b.addSingle(m) {
			dropped++
		}
	}
	b.BufferSize.Set(int64(b.length()))
	return dropped
	// todo implement batched writes
```
Shouldn't you use the `addBatch` function here?
```go
	if err != nil {
		panic(err)
	}
	m.Accept() // accept here, since the metric object is no longer retained from here
```
I don't think we can accept the metric here. We need to really be sure it's written first, even though this might require another for loop...
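A sketch of the write-first-then-accept ordering with the extra loop mentioned above; the types are toy stand-ins, not the PR's code:

```go
package main

import "fmt"

// metric is a toy stand-in for telegraf.Metric.
type metric struct{ accepted bool }

func (m *metric) Accept() { m.accepted = true }

// writeBatch pretends to persist all entries; fail simulates a write error.
func writeBatch(batch []*metric, fail bool) error {
	if fail {
		return fmt.Errorf("write failed")
	}
	return nil
}

// addBatch writes first, then accepts in a second loop, so metrics are
// only acknowledged once they are known to be on disk.
func addBatch(batch []*metric, fail bool) (int, error) {
	if err := writeBatch(batch, fail); err != nil {
		return 0, err // nothing accepted, the caller can retry
	}
	for _, m := range batch {
		m.Accept()
	}
	return len(batch), nil
}

func main() {
	batch := []*metric{{}, {}}
	n, err := addBatch(batch, false)
	fmt.Println(n, err, batch[0].accepted) // 2 <nil> true
}
```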
```go
	if b.length() == 0 {
		// no metrics in the wal file, so return an empty array
		return make([]telegraf.Metric, 0)
```
```suggestion
		return []telegraf.Metric{}
```

I think you might even be able to return `nil` here...
```go
	b.batchFirst = b.readIndex()
	metrics := make([]telegraf.Metric, b.batchSize)

	for i := 0; i < int(b.batchSize); i++ {
```
```suggestion
	for i := b.batchFirst; i < b.batchFirst+b.batchSize; i++ {
```

is easier to use in the code I guess...
```go
	err := b.walFile.WriteBatch(batch)
	if err != nil {
		return 0 // todo error handle, test if a partial write occur
	}
	return written
```
We cannot do this! Either we panic here, if we decide this is the right thing to do, or we do proper error handling.
Imagine a use-case where it is crucial to collect all metrics and not lose any, and your disk runs full. The user might have a Kafka instance for buffering. Now you accepted the metrics in the for-loop above, but then writing fails here due to the disk being full... There will be no error, Telegraf will show zero metrics written (just as if no metric arrived at the output), but the metrics are removed from Kafka because we accepted them. Now imagine the fun of debugging this situation! :-)
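One possible shape for propagating the error instead of swallowing it (a sketch with stand-in types; the real code would hold a `*wal.Log`):

```go
package main

import (
	"errors"
	"fmt"
)

// walLog is a stand-in for the WAL handle; failNext simulates a full disk.
type walLog struct{ failNext bool }

func (w *walLog) WriteBatch(entries []string) error {
	if w.failNext {
		return errors.New("no space left on device")
	}
	return nil
}

type DiskBuffer struct{ wal *walLog }

// Add propagates the WAL error instead of silently returning 0, so the
// caller can log it and leave the metrics unaccepted for a later retry.
func (b *DiskBuffer) Add(entries ...string) (int, error) {
	if err := b.wal.WriteBatch(entries); err != nil {
		return 0, fmt.Errorf("writing batch to WAL: %w", err)
	}
	return len(entries), nil
}

func main() {
	b := &DiskBuffer{wal: &walLog{failNext: true}}
	n, err := b.Add("cpu", "mem")
	fmt.Println(n, err != nil) // 0 true
}
```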
```diff
@@ -0,0 +1,22 @@
+package models
```
Please move this to the tests instead of adding another package.
Summary
Implements the `write-through` buffer persistence strategy detailed in the spec added in #14928.

Currently this PR is mostly a draft of separating the output buffer into multiple implementations, as well as experimentation with a WAL file library. The largest outstanding issue is around metric serialization to `[]byte`. The PR currently uses `encoding/gob`, but this has issues with un-exported fields, so I need to look more into how to work around this.

Checklist
Related issues
Related to #802, #14805