github.com/segmentio/kafka-go

github.com/segmentio/kafka-go

Kafka library in Go

Add struct fields to kafka.Reader

Join Point
Definition of kafka.Reader
Advice
Introduce new declarations:
// Using the following synthetic imports:
import (
	ddtrace "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
	strings "strings"
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
)
// __dd_wMessage embeds a *Message so that it can be handed to the tracing
// package as a tracing.Message (see __dd_wrapMessage). The embedded pointer
// means header changes made through the wrapper are applied to the original
// message.
type __dd_wMessage struct {
	*Message
}

// __dd_wrapMessage exposes msg as a tracing.Message. A nil msg yields a nil
// interface value rather than a wrapper around a nil pointer.
func __dd_wrapMessage(msg *Message) tracing.Message {
	if msg != nil {
		return &__dd_wMessage{Message: msg}
	}
	return nil
}

// GetValue returns the Value field of the wrapped message.
func (w *__dd_wMessage) GetValue() []byte {
	return w.Message.Value
}

// GetKey returns the Key field of the wrapped message.
func (w *__dd_wMessage) GetKey() []byte {
	return w.Message.Key
}

// GetHeaders returns the wrapped message's headers, each converted with
// __dd_wrapHeader. The returned slice is freshly allocated and never nil.
func (w *__dd_wMessage) GetHeaders() []tracing.Header {
	wrapped := make([]tracing.Header, len(w.Message.Headers))
	for i := range w.Message.Headers {
		wrapped[i] = __dd_wrapHeader(w.Message.Headers[i])
	}
	return wrapped
}

// SetHeaders replaces the headers of the wrapped message with a new slice
// built from the key and value of every entry in headers.
func (w *__dd_wMessage) SetHeaders(headers []tracing.Header) {
	converted := make([]Header, len(headers))
	for i, src := range headers {
		converted[i] = Header{Key: src.GetKey(), Value: src.GetValue()}
	}
	w.Message.Headers = converted
}

// GetTopic returns the Topic field of the wrapped message.
func (w *__dd_wMessage) GetTopic() string {
	return w.Message.Topic
}

// GetPartition returns the Partition field of the wrapped message.
func (w *__dd_wMessage) GetPartition() int {
	return w.Message.Partition
}

// GetOffset returns the Offset field of the wrapped message.
func (w *__dd_wMessage) GetOffset() int64 {
	return w.Message.Offset
}

// __dd_wHeader embeds a Header by value so that it can be handed to the
// tracing package as a tracing.Header (see __dd_wrapHeader).
type __dd_wHeader struct {
	Header
}

// __dd_wrapHeader exposes a copy of h as a tracing.Header.
func __dd_wrapHeader(h Header) tracing.Header {
	return &__dd_wHeader{Header: h}
}

// GetKey returns the Key field of the wrapped header.
func (w __dd_wHeader) GetKey() string {
	return w.Header.Key
}

// GetValue returns the Value field of the wrapped header.
func (w __dd_wHeader) GetValue() []byte {
	return w.Header.Value
}

// __dd_wWriter embeds a *Writer so that it can be handed to the tracing
// package as a tracing.Writer (see __dd_wrapTracingWriter).
type __dd_wWriter struct {
	*Writer
}

// GetTopic returns the Topic field of the wrapped writer.
func (w *__dd_wWriter) GetTopic() string {
	return w.Writer.Topic
}

// __dd_wrapTracingWriter exposes w as a tracing.Writer. Unlike
// __dd_wrapMessage it performs no nil check on its argument.
func __dd_wrapTracingWriter(w *Writer) tracing.Writer {
	return &__dd_wWriter{Writer: w}
}

// __dd_initReader lazily attaches a tracer to r. It does nothing when
// r.__dd_tracer is already set; otherwise it builds a tracing.KafkaConfig
// from the reader's configuration (comma-joined brokers and consumer group
// ID, each only when non-empty) and stores a new tracer on r.
//
// NOTE(review): the check-then-set on r.__dd_tracer is not synchronized;
// confirm callers never initialize the same Reader concurrently.
func __dd_initReader(r *Reader) {
	if r.__dd_tracer != nil {
		return
	}
	// Fetch the configuration once instead of calling Config() for every
	// field that is inspected.
	cfg := r.Config()
	var kafkaCfg tracing.KafkaConfig
	if len(cfg.Brokers) > 0 {
		kafkaCfg.BootstrapServers = strings.Join(cfg.Brokers, ",")
	}
	if cfg.GroupID != "" {
		kafkaCfg.ConsumerGroupID = cfg.GroupID
	}
	r.__dd_tracer = tracing.NewTracer(kafkaCfg)
}

// __dd_span is an alias for ddtrace.Span; it is the declared type of the
// __dd_prevSpan field added to Reader.
type __dd_span = ddtrace.Span
Advice
Add new field __dd_tracer of type *tracing.Tracer.
Advice
Add new field __dd_prevSpan of type __dd_span.

Trace kafka.Reader#FetchMessage

Join Point
Function body
  • Function declaration
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
)
{{- $r := .Function.Receiver -}}
{{- $ctx := .Function.Argument 0 -}}
{{- $msg := .Function.Result 0 -}}
{{- $err := .Function.Result 1 -}}
// Lazily create the reader's tracer on first use. The receiver is referenced
// through the template variable so the advice does not depend on its name.
__dd_initReader({{ $r }})
// The span started for the previously fetched message is still open; finish
// it before fetching the next one.
if {{ $r }}.__dd_prevSpan != nil {
	{{ $r }}.__dd_prevSpan.Finish()
	{{ $r }}.__dd_prevSpan = nil
}
defer func() {
	// Only successfully fetched messages get a consume span.
	if {{ $err }} != nil {
		return
	}
	tMsg := __dd_wrapMessage(&{{ $msg }})
	// Keep the span on the reader so that a later FetchMessage or Close call
	// can finish it.
	{{ $r }}.__dd_prevSpan = {{ $r }}.__dd_tracer.StartConsumeSpan({{ $ctx }}, tMsg)
	{{ $r }}.__dd_tracer.SetConsumeDSMCheckpoint(tMsg)
}()

Trace kafka.Reader#Close

Join Point
Function body
  • Function declaration
Advice
Prepend statements produced by the following template:
{{- $r := .Function.Receiver -}}
// Finish the consume span left open by the last successful FetchMessage, if any.
if span := {{ $r }}.__dd_prevSpan; span != nil {
	span.Finish()
	{{ $r }}.__dd_prevSpan = nil
}

Add struct fields to kafka.Writer

Join Point
Definition of kafka.Writer
Advice
Introduce new declarations:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
)
// __dd_initWriter lazily attaches a tracer to w. It does nothing when
// w.__dd_tracer is already set; otherwise it creates a tracer whose
// BootstrapServers is the string form of w.Addr.
//
// NOTE(review): the check-then-set on w.__dd_tracer is not synchronized;
// confirm whether WriteMessages may be called concurrently on a fresh Writer.
func __dd_initWriter(w *Writer) {
	if w.__dd_tracer != nil {
		return
	}
	var kafkaCfg tracing.KafkaConfig
	// Calling String on an unset Addr would panic inside the injected code,
	// before WriteMessages gets a chance to reject the writer itself. Leave
	// BootstrapServers empty in that case.
	if w.Addr != nil {
		kafkaCfg.BootstrapServers = w.Addr.String()
	}
	w.__dd_tracer = tracing.NewTracer(kafkaCfg)
}
Advice
Add new field __dd_tracer of type *tracing.Tracer.

Trace kafka.Writer#WriteMessages

Join Point
Function body
  • Function declaration
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	ddtrace "gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
	tracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
)
{{- $w := .Function.Receiver -}}
{{- $ctx := .Function.Argument 0 -}}
{{- $msgs := .Function.Argument 1 -}}
{{- $err := .Function.Result 0 -}}
// One produce span per message, indexed like the messages themselves.
spans := make([]ddtrace.Span, len({{ $msgs }}))
// Lazily create the writer's tracer on first use. The receiver is referenced
// through the template variable so the advice does not depend on its name.
__dd_initWriter({{ $w }})

// When the context already carries a span, make every produce span its child.
var spanOpts []tracer.StartSpanOption
prevSpan, ok := tracer.SpanFromContext({{ $ctx }})
if ok {
	spanOpts = append(spanOpts, tracer.ChildOf(prevSpan.Context()))
}

for i := range {{ $msgs }} {
	// Wrap a pointer to the slice element so header changes made by the
	// tracer are applied to the message that is actually written.
	tMsg := __dd_wrapMessage(&{{ $msgs }}[i])
	tWriter := __dd_wrapTracingWriter({{ $w }})
	spans[i] = {{ $w }}.__dd_tracer.StartProduceSpan(nil, tWriter, tMsg, spanOpts...)
	{{ $w }}.__dd_tracer.SetProduceDSMCheckpoint(tMsg, tWriter)
}

// Finish every span once the write returns, reporting each message's
// partition and offset together with the function's error result.
defer func() {
	for i, span := range spans {
		{{ $w }}.__dd_tracer.FinishProduceSpan(span, {{ $msgs }}[i].Partition, {{ $msgs }}[i].Offset, {{ $err }})
	}
}()