github.com/segmentio/kafka-go
Kafka library in Go
Add struct fields to kafka.Reader
Definition of
kafka.Reader
Introduce new declarations:
// Using the following synthetic imports:
import (
ddtrace "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
strings "strings"
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
)
// __dd_wMessage wraps a *Message so that the accessor methods declared on it
// (GetValue, GetKey, GetHeaders, SetHeaders, GetTopic, GetPartition,
// GetOffset) can be called by the tracing package.
type __dd_wMessage struct {
	*Message
}
// __dd_wrapMessage adapts msg to the tracing.Message interface.
//
// A nil msg yields an untyped nil interface value rather than a wrapper around
// a nil pointer, so callers can compare the result against nil.
func __dd_wrapMessage(msg *Message) tracing.Message {
	if msg != nil {
		return &__dd_wMessage{Message: msg}
	}
	return nil
}
// GetValue returns the Value field of the wrapped message.
func (w *__dd_wMessage) GetValue() []byte {
	return w.Message.Value
}
// GetKey returns the Key field of the wrapped message.
func (w *__dd_wMessage) GetKey() []byte {
	return w.Message.Key
}
// GetHeaders returns the wrapped message's headers, each adapted to
// tracing.Header via __dd_wrapHeader.
//
// The result is a freshly allocated slice with one entry per header; it is
// non-nil even when the message has no headers.
func (w *__dd_wMessage) GetHeaders() []tracing.Header {
	src := w.Message.Headers
	wrapped := make([]tracing.Header, len(src))
	for i := range src {
		wrapped[i] = __dd_wrapHeader(src[i])
	}
	return wrapped
}
// SetHeaders replaces the wrapped message's Headers with a new slice built
// from headers, copying each entry's key and value into a Header.
//
// The previous Headers slice is discarded, not appended to.
func (w *__dd_wMessage) SetHeaders(headers []tracing.Header) {
	converted := make([]Header, len(headers))
	for i, src := range headers {
		converted[i] = Header{Key: src.GetKey(), Value: src.GetValue()}
	}
	w.Headers = converted
}
// GetTopic returns the Topic field of the wrapped message.
func (w *__dd_wMessage) GetTopic() string {
	return w.Message.Topic
}
// GetPartition returns the Partition field of the wrapped message.
func (w *__dd_wMessage) GetPartition() int {
	return w.Message.Partition
}
// GetOffset returns the Offset field of the wrapped message.
func (w *__dd_wMessage) GetOffset() int64 {
	return w.Message.Offset
}
// __dd_wHeader wraps a Header by value so that the GetKey and GetValue
// accessors declared on it can be called by the tracing package.
type __dd_wHeader struct {
	Header
}
// __dd_wrapHeader adapts h to the tracing.Header interface. The returned
// wrapper holds its own copy of h.
func __dd_wrapHeader(h Header) tracing.Header {
	wrapped := __dd_wHeader{Header: h}
	return &wrapped
}
// GetKey returns the Key field of the wrapped header.
func (w __dd_wHeader) GetKey() string {
	return w.Header.Key
}
// GetValue returns the Value field of the wrapped header.
func (w __dd_wHeader) GetValue() []byte {
	return w.Header.Value
}
// __dd_wWriter wraps a *Writer so that the GetTopic accessor declared on it
// can be called by the tracing package.
type __dd_wWriter struct {
	*Writer
}
// GetTopic returns the Topic field of the wrapped writer.
func (w *__dd_wWriter) GetTopic() string {
	return w.Writer.Topic
}
// __dd_wrapTracingWriter adapts w to the tracing.Writer interface. The wrapper
// keeps the pointer it is given; w is not copied.
func __dd_wrapTracingWriter(w *Writer) tracing.Writer {
	return &__dd_wWriter{Writer: w}
}
// __dd_initReader lazily attaches a tracer to r.
//
// If r already has a tracer the call is a no-op. Otherwise a tracing.KafkaConfig
// is populated from the reader's configuration: the broker list (joined with
// commas) becomes BootstrapServers, and a non-empty GroupID becomes
// ConsumerGroupID. The resulting tracer is stored in r.__dd_tracer.
//
// NOTE(review): no synchronization is visible here — confirm that callers do
// not initialize the same Reader from multiple goroutines at once.
func __dd_initReader(r *Reader) {
	if r.__dd_tracer != nil {
		return
	}

	readerCfg := r.Config()

	var kafkaCfg tracing.KafkaConfig
	if brokers := readerCfg.Brokers; brokers != nil {
		kafkaCfg.BootstrapServers = strings.Join(brokers, ",")
	}
	if groupID := readerCfg.GroupID; groupID != "" {
		kafkaCfg.ConsumerGroupID = groupID
	}

	r.__dd_tracer = tracing.NewTracer(kafkaCfg)
}
// __dd_span is an alias for ddtrace.Span.
// NOTE(review): presumably the type of injected struct fields such as
// __dd_prevSpan — confirm against the field definitions.
type __dd_span = ddtrace.Span
Trace kafka.Reader#FetchMessage
Function body
- Function declaration
- Is method of
*kafka.Reader
- Function name
FetchMessage
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
)
{{- $r := .Function.Receiver -}}
{{- $ctx := .Function.Argument 0 -}}
{{- $msg := .Function.Result 0 -}}
{{- $err := .Function.Result 1 -}}
// Make sure the reader has a tracer before it is used below. The receiver is
// referenced through the template variable so this does not depend on the
// receiver being literally named "r".
__dd_initReader({{ $r }})
// Finish the span left open by the previous successful fetch, if any.
if {{ $r }}.__dd_prevSpan != nil {
	{{ $r }}.__dd_prevSpan.Finish()
	{{ $r }}.__dd_prevSpan = nil
}
defer func() {
	// A failed fetch produces no span and no checkpoint.
	if {{ $err }} != nil {
		return
	}
	tMsg := __dd_wrapMessage(&{{ $msg }})
	// The consume span is stored on the reader rather than finished here; it is
	// finished by the next FetchMessage call or by Close.
	{{ $r }}.__dd_prevSpan = {{ $r }}.__dd_tracer.StartConsumeSpan({{ $ctx }}, tMsg)
	{{ $r }}.__dd_tracer.SetConsumeDSMCheckpoint(tMsg)
}()
Trace kafka.Reader#Close
Function body
- Function declaration
- Is method of
*kafka.Reader
- Function name
Close
Prepend statements produced by the following template:
{{- $r := .Function.Receiver -}}
// Finish any consume span still held by the reader and clear the field so it
// is not finished a second time.
if span := {{ $r }}.__dd_prevSpan; span != nil {
	span.Finish()
	{{ $r }}.__dd_prevSpan = nil
}
Add struct fields to kafka.Writer
Definition of
kafka.Writer
Introduce new declarations:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
)
// __dd_initWriter lazily attaches a tracer to w.
//
// If w already has a tracer the call is a no-op. Otherwise a tracer is created
// with BootstrapServers set to w.Addr.String() and stored in w.__dd_tracer.
//
// Addr is only dereferenced when it is non-nil: calling String on a nil
// interface value would panic inside the injected prologue, before the
// instrumented method gets a chance to handle the missing address itself. With
// a nil Addr, BootstrapServers is left empty.
func __dd_initWriter(w *Writer) {
	if w.__dd_tracer != nil {
		return
	}
	kafkaCfg := tracing.KafkaConfig{}
	if w.Addr != nil {
		kafkaCfg.BootstrapServers = w.Addr.String()
	}
	w.__dd_tracer = tracing.NewTracer(kafkaCfg)
}
Trace kafka.Writer#WriteMessages
Function body
- Function declaration
- Is method of
*kafka.Writer
- Function name
WriteMessages
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
ddtrace "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
tracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
)
{{- $w := .Function.Receiver -}}
{{- $ctx := .Function.Argument 0 -}}
{{- $msgs := .Function.Argument 1 -}}
{{- $err := .Function.Result 0 -}}
// Make sure the writer has a tracer before it is used below. The receiver is
// referenced through the template variable so this does not depend on the
// receiver being literally named "w".
__dd_initWriter({{ $w }})
// If the caller's context already carries a span, make every produce span a
// child of it.
var spanOpts []tracer.StartSpanOption
if prevSpan, ok := tracer.SpanFromContext({{ $ctx }}); ok {
	spanOpts = append(spanOpts, tracer.ChildOf(prevSpan.Context()))
}
// One span per message; spans[i] corresponds to {{ $msgs }}[i].
spans := make([]ddtrace.Span, len({{ $msgs }}))
// The writer wrapper does not depend on the message, so build it once.
tWriter := __dd_wrapTracingWriter({{ $w }})
for i := range {{ $msgs }} {
	tMsg := __dd_wrapMessage(&{{ $msgs }}[i])
	// NOTE(review): nil is passed as the context and the parent is supplied via
	// spanOpts instead — confirm StartProduceSpan tolerates a nil context.
	spans[i] = {{ $w }}.__dd_tracer.StartProduceSpan(nil, tWriter, tMsg, spanOpts...)
	{{ $w }}.__dd_tracer.SetProduceDSMCheckpoint(tMsg, tWriter)
}
// Finish every span when the instrumented function returns, reporting each
// message's Partition and Offset along with the function's error result.
defer func() {
	for i, span := range spans {
		{{ $w }}.__dd_tracer.FinishProduceSpan(span, {{ $msgs }}[i].Partition, {{ $msgs }}[i].Offset, {{ $err }})
	}
}()