diff --git a/perf/reader.go b/perf/reader.go
index 1d40e015c..cd249e187 100644
--- a/perf/reader.go
+++ b/perf/reader.go
@@ -20,6 +20,8 @@ var (
 	errEOR    = errors.New("end of ring")
 )
 
+var perfEventHeaderSize = binary.Size(perfEventHeader{})
+
 // perfEventHeader must match 'struct perf_event_header` in <linux/perf_event.h>.
 type perfEventHeader struct {
 	Type uint32
@@ -47,34 +49,39 @@ type Record struct {
 	LostSamples uint64
 }
 
-// NB: Has to be preceded by a call to ring.loadHead.
-func readRecordFromRing(ring *perfEventRing) (Record, error) {
-	defer ring.writeTail()
-	return readRecord(ring, ring.cpu)
-}
-
-func readRecord(rd io.Reader, cpu int) (Record, error) {
-	var header perfEventHeader
-	err := binary.Read(rd, internal.NativeEndian, &header)
-	if err == io.EOF {
-		return Record{}, errEOR
+// Read a record from a reader into rec.
+//
+// buf must be at least perfEventHeaderSize bytes long.
+func readRecord(rd io.Reader, rec *Record, buf []byte) error {
+	// Assert that the buffer is large enough.
+	buf = buf[:perfEventHeaderSize]
+	_, err := io.ReadFull(rd, buf)
+	if errors.Is(err, io.EOF) {
+		return errEOR
+	} else if err != nil {
+		return fmt.Errorf("read perf event header: %v", err)
 	}
 
-	if err != nil {
-		return Record{}, fmt.Errorf("can't read event header: %v", err)
+	header := perfEventHeader{
+		internal.NativeEndian.Uint32(buf[0:4]),
+		internal.NativeEndian.Uint16(buf[4:6]),
+		internal.NativeEndian.Uint16(buf[6:8]),
 	}
 
 	switch header.Type {
 	case unix.PERF_RECORD_LOST:
-		lost, err := readLostRecords(rd)
-		return Record{CPU: cpu, LostSamples: lost}, err
+		rec.RawSample = rec.RawSample[:0]
+		rec.LostSamples, err = readLostRecords(rd)
+		return err
 
 	case unix.PERF_RECORD_SAMPLE:
-		sample, err := readRawSample(rd)
-		return Record{CPU: cpu, RawSample: sample}, err
+		rec.LostSamples = 0
+		// We can reuse buf here because perfEventHeaderSize > perfEventSampleSize.
+		rec.RawSample, err = readRawSample(rd, buf, rec.RawSample)
+		return err
 
 	default:
-		return Record{}, &unknownEventError{header.Type}
+		return &unknownEventError{header.Type}
 	}
 }
 
@@ -93,16 +100,32 @@ func readLostRecords(rd io.Reader) (uint64, error) {
 	return lostHeader.Lost, nil
 }
 
-func readRawSample(rd io.Reader) ([]byte, error) {
-	// This must match 'struct perf_event_sample in kernel sources.
-	var size uint32
-	if err := binary.Read(rd, internal.NativeEndian, &size); err != nil {
-		return nil, fmt.Errorf("can't read sample size: %v", err)
+var perfEventSampleSize = binary.Size(uint32(0))
+
+// This must match 'struct perf_event_sample' in kernel sources.
+type perfEventSample struct {
+	Size uint32
+}
+
+func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) {
+	buf = buf[:perfEventSampleSize]
+	if _, err := io.ReadFull(rd, buf); err != nil {
+		return nil, fmt.Errorf("read sample size: %v", err)
+	}
+
+	sample := perfEventSample{
+		internal.NativeEndian.Uint32(buf),
+	}
+
+	var data []byte
+	if size := int(sample.Size); cap(sampleBuf) < size {
+		data = make([]byte, size)
+	} else {
+		data = sampleBuf[:size]
 	}
 
-	data := make([]byte, int(size))
 	if _, err := io.ReadFull(rd, data); err != nil {
-		return nil, fmt.Errorf("can't read sample: %v", err)
+		return nil, fmt.Errorf("read sample: %v", err)
 	}
 	return data, nil
 }
@@ -123,6 +146,7 @@ type Reader struct {
 	rings       []*perfEventRing
 	epollEvents []unix.EpollEvent
 	epollRings  []*perfEventRing
+	eventHeader []byte
 
 	// pauseFds are a copy of the fds in 'rings', protected by 'pauseMu'.
 	// These allow Pause/Resume to be executed independently of any ongoing
@@ -215,6 +239,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
 		poller:      poller,
 		epollEvents: make([]unix.EpollEvent, len(rings)),
 		epollRings:  make([]*perfEventRing, 0, len(rings)),
+		eventHeader: make([]byte, perfEventHeaderSize),
 		pauseFds:    pauseFds,
 	}
 	if err = pr.Resume(); err != nil {
@@ -266,18 +291,24 @@ func (pr *Reader) Close() error {
 //
 // Calling Close interrupts the function.
 func (pr *Reader) Read() (Record, error) {
+	var r Record
+	return r, pr.ReadInto(&r)
+}
+
+// ReadInto is like Read except that it allows reusing Record and associated buffers.
+func (pr *Reader) ReadInto(rec *Record) error {
 	pr.mu.Lock()
 	defer pr.mu.Unlock()
 
 	if pr.rings == nil {
-		return Record{}, fmt.Errorf("perf ringbuffer: %w", ErrClosed)
+		return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
 	}
 
 	for {
 		if len(pr.epollRings) == 0 {
 			nEvents, err := pr.poller.Wait(pr.epollEvents)
 			if err != nil {
-				return Record{}, err
+				return err
 			}
 
 			for _, event := range pr.epollEvents[:nEvents] {
@@ -294,7 +325,7 @@ func (pr *Reader) Read() (Record, error) {
 		// Start at the last available event. The order in which we
 		// process them doesn't matter, and starting at the back allows
 		// resizing epollRings to keep track of processed rings.
-		record, err := readRecordFromRing(pr.epollRings[len(pr.epollRings)-1])
+		err := pr.readRecordFromRing(rec, pr.epollRings[len(pr.epollRings)-1])
 		if err == errEOR {
 			// We've emptied the current ring buffer, process
 			// the next one.
@@ -302,7 +333,7 @@ func (pr *Reader) Read() (Record, error) {
 			continue
 		}
 
-		return record, err
+		return err
 	}
 }
 
@@ -353,6 +384,14 @@ func (pr *Reader) Resume() error {
 	return nil
 }
 
+// NB: Has to be preceded by a call to ring.loadHead.
+func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
+	defer ring.writeTail()
+
+	rec.CPU = ring.cpu
+	return readRecord(ring, rec, pr.eventHeader)
+}
+
 type unknownEventError struct {
 	eventType uint32
 }
diff --git a/perf/reader_test.go b/perf/reader_test.go
index 4bf5eca66..f7844910a 100644
--- a/perf/reader_test.go
+++ b/perf/reader_test.go
@@ -294,7 +294,8 @@ func TestReadRecord(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	_, err = readRecord(&buf, 0)
+	var rec Record
+	err = readRecord(&buf, &rec, make([]byte, perfEventHeaderSize))
 	if !IsUnknownEvent(err) {
 		t.Error("readRecord should return unknown event error, got", err)
 	}
@@ -415,6 +416,39 @@ func BenchmarkReader(b *testing.B) {
 	}
 }
 
+func BenchmarkReadInto(b *testing.B) {
+	prog, events := mustOutputSamplesProg(b, 80)
+	defer prog.Close()
+	defer events.Close()
+
+	rd, err := NewReader(events, 4096)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer rd.Close()
+
+	buf := make([]byte, 14)
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	var rec Record
+	for i := 0; i < b.N; i++ {
+		// NB: Submitting samples into the perf event ring dominates
+		// the benchmark time unfortunately.
+		ret, _, err := prog.Test(buf)
+		if err != nil {
+			b.Fatal(err)
+		} else if errno := syscall.Errno(-int32(ret)); errno != 0 {
+			b.Fatal("Expected 0 as return value, got", errno)
+		}
+
+		if err := rd.ReadInto(&rec); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
 // This exists just to make the example below nicer.
 func bpfPerfEventOutputProgram() (*ebpf.Program, *ebpf.Map) {
 	prog, events, err := outputSamplesProg(5)
@@ -468,3 +502,33 @@ func ExampleReader() {
 	// Data is padded with 0 for alignment
 	fmt.Println("Sample:", record.RawSample)
 }
+
+// ReadInto allows reducing memory allocations.
+func ExampleReader_ReadInto() { + prog, events := bpfPerfEventOutputProgram() + defer prog.Close() + defer events.Close() + + rd, err := NewReader(events, 4096) + if err != nil { + panic(err) + } + defer rd.Close() + + for i := 0; i < 2; i++ { + // Write out two samples + ret, _, err := prog.Test(make([]byte, 14)) + if err != nil || ret != 0 { + panic("Can't write sample") + } + } + + var rec Record + for i := 0; i < 2; i++ { + if err := rd.ReadInto(&rec); err != nil { + panic(err) + } + + fmt.Println("Sample:", rec.RawSample[:5]) + } +} diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 64e5fec6f..3b1f26ef7 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -20,6 +20,8 @@ var ( errBusy = errors.New("sample not committed yet") ) +var ringbufHeaderSize = binary.Size(ringbufHeader{}) + // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c type ringbufHeader struct { Len uint32 @@ -42,23 +44,29 @@ type Record struct { RawSample []byte } -func readRecord(rd *ringbufEventRing) (r Record, err error) { +// Read a record from an event ring. +// +// buf must be at least ringbufHeaderSize bytes long. +func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error { rd.loadConsumer() - var header ringbufHeader - err = binary.Read(rd, internal.NativeEndian, &header) - if err == io.EOF { - return Record{}, err + + buf = buf[:ringbufHeaderSize] + if _, err := io.ReadFull(rd, buf); err == io.EOF { + return err + } else if err != nil { + return fmt.Errorf("read event header: %w", err) } - if err != nil { - return Record{}, fmt.Errorf("can't read event header: %w", err) + header := ringbufHeader{ + internal.NativeEndian.Uint32(buf[0:4]), + internal.NativeEndian.Uint32(buf[4:8]), } if header.isBusy() { // the next sample in the ring is not committed yet so we // exit without storing the reader/consumer position // and start again from the same position. - return Record{}, fmt.Errorf("%w", errBusy) + return fmt.Errorf("%w", errBusy) } /* read up to 8 byte alignment */ @@ -73,18 +81,22 @@ func readRecord(rd *ringbufEventRing) (r Record, err error) { rd.skipRead(dataLenAligned) rd.storeConsumer() - return Record{}, fmt.Errorf("%w", errDiscard) + return fmt.Errorf("%w", errDiscard) } - data := make([]byte, dataLenAligned) + if cap(rec.RawSample) < int(dataLenAligned) { + rec.RawSample = make([]byte, dataLenAligned) + } else { + rec.RawSample = rec.RawSample[:dataLenAligned] + } - if _, err := io.ReadFull(rd, data); err != nil { - return Record{}, fmt.Errorf("can't read sample: %w", err) + if _, err := io.ReadFull(rd, rec.RawSample); err != nil { + return fmt.Errorf("read sample: %w", err) } rd.storeConsumer() - - return Record{RawSample: data[:header.dataLen()]}, nil + rec.RawSample = rec.RawSample[:header.dataLen()] + return nil } // Reader allows reading bpf_ringbuf_output @@ -96,6 +108,7 @@ type Reader struct { mu sync.Mutex ring *ringbufEventRing epollEvents []unix.EpollEvent + header []byte } // NewReader creates a new BPF ringbuf reader. @@ -129,6 +142,7 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { poller: poller, ring: ring, epollEvents: make([]unix.EpollEvent, 1), + header: make([]byte, ringbufHeaderSize), }, nil } @@ -159,24 +173,30 @@ func (r *Reader) Close() error { // // Calling Close interrupts the function. func (r *Reader) Read() (Record, error) { + var rec Record + return rec, r.ReadInto(&rec) +} + +// ReadInto is like Read except that it allows reusing Record and associated buffers. 
+func (r *Reader) ReadInto(rec *Record) error { r.mu.Lock() defer r.mu.Unlock() if r.ring == nil { - return Record{}, fmt.Errorf("ringbuffer: %w", ErrClosed) + return fmt.Errorf("ringbuffer: %w", ErrClosed) } for { _, err := r.poller.Wait(r.epollEvents) if err != nil { - return Record{}, err + return err } - record, err := readRecord(r.ring) + err = readRecord(r.ring, rec, r.header) if errors.Is(err, errBusy) || errors.Is(err, errDiscard) { continue } - return record, err + return err } } diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 71278fbb6..71e1ea04b 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -224,9 +224,6 @@ func BenchmarkReader(b *testing.B) { }, } - b.ResetTimer() - b.ReportAllocs() - for _, bm := range readerBenchmarks { b.Run(bm.name, func(b *testing.B) { prog, events := mustOutputSamplesProg(b, bm.flags, 80) @@ -237,8 +234,13 @@ func BenchmarkReader(b *testing.B) { } defer rd.Close() + buf := make([]byte, 14) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { - ret, _, err := prog.Benchmark(make([]byte, 14), 1, nil) + ret, _, err := prog.Test(buf) if err != nil { b.Fatal(err) } else if errno := syscall.Errno(-int32(ret)); errno != 0 { @@ -252,3 +254,34 @@ func BenchmarkReader(b *testing.B) { }) } } + +func BenchmarkReadInto(b *testing.B) { + testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(b, 0, 80) + + rd, err := NewReader(events) + if err != nil { + b.Fatal(err) + } + defer rd.Close() + + buf := make([]byte, 14) + + b.ResetTimer() + b.ReportAllocs() + + var rec Record + for i := 0; i < b.N; i++ { + ret, _, err := prog.Test(buf) + if err != nil { + b.Fatal(err) + } else if errno := syscall.Errno(-int32(ret)); errno != 0 { + b.Fatal("Expected 0 as return value, got", errno) + } + + if err := rd.ReadInto(&rec); err != nil { + b.Fatal("Can't read samples:", err) + } + } +}
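Usage note (not part of the patch): the perf package gains ExampleReader_ReadInto above, but there is no equivalent example for the ringbuf package. The sketch below shows how a caller might consume the new ringbuf ReadInto while reusing a single Record; the package name, the events map parameter and the handleSample callback are placeholders invented for illustration.

package sketch

import (
	"errors"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/ringbuf"
)

// drainSamples reads samples in a loop, reusing one Record (and therefore its
// RawSample buffer) across iterations to avoid a per-read allocation.
func drainSamples(events *ebpf.Map, handleSample func([]byte)) error {
	rd, err := ringbuf.NewReader(events)
	if err != nil {
		return err
	}
	defer rd.Close()

	var rec ringbuf.Record
	for {
		if err := rd.ReadInto(&rec); err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				return nil
			}
			return err
		}
		// rec.RawSample is overwritten by the next ReadInto, so the caller
		// must copy it if the data has to outlive this iteration.
		handleSample(rec.RawSample)
	}
}

The same pattern applies to perf.Reader.ReadInto, where Record.CPU and Record.LostSamples are also overwritten on every call.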