github.com/confluentinc/confluent-kafka-go/kafka

github.com/confluentinc/confluent-kafka-go/kafka

confluent-kafka-go is a Go library for Apache Kafka

Inject kafka library version (v1)

Join Point
Definition of kafka.Consumer
Advice
Introduce new declarations:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
const __dd_ckgoVersion = tracing.CKGoVersion1

Inject kafka library version (v2)

Join Point
Definition of kafka.Consumer
Advice
Introduce new declarations:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
const __dd_ckgoVersion = tracing.CKGoVersion2

Inject wrapped types to the kafka package

Join Point
One of
Advice
Introduce new declarations:
// Using the following synthetic imports:
import (
	telemetry "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
	tracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
type __dd_wMessage struct {
	*Message
}

func __dd_wrapMessage(msg *Message) tracing.Message {
	if msg == nil {
		return nil
	}
	return &__dd_wMessage{msg}
}

func (w *__dd_wMessage) Unwrap() any {
	return w.Message
}

func (w *__dd_wMessage) GetValue() []byte {
	return w.Message.Value
}

func (w *__dd_wMessage) GetKey() []byte {
	return w.Message.Key
}

func (w *__dd_wMessage) GetHeaders() []tracing.Header {
	hs := make([]tracing.Header, 0, len(w.Headers))
	for _, h := range w.Headers {
		hs = append(hs, __dd_wrapHeader(h))
	}
	return hs
}

func (w *__dd_wMessage) SetHeaders(headers []tracing.Header) {
	hs := make([]Header, 0, len(headers))
	for _, h := range headers {
		hs = append(hs, Header{
			Key:   h.GetKey(),
			Value: h.GetValue(),
		})
	}
	w.Message.Headers = hs
}

func (w *__dd_wMessage) GetTopicPartition() tracing.TopicPartition {
	return __dd_wrapTopicPartition(w.Message.TopicPartition)
}

type __dd_wHeader struct {
	Header
}

func __dd_wrapHeader(h Header) tracing.Header {
	return &__dd_wHeader{h}
}

func (w __dd_wHeader) GetKey() string {
	return w.Header.Key
}

func (w __dd_wHeader) GetValue() []byte {
	return w.Header.Value
}

type __dd_wTopicPartition struct {
	TopicPartition
}

func __dd_wrapTopicPartition(tp TopicPartition) tracing.TopicPartition {
	return __dd_wTopicPartition{tp}
}

func __dd_wrapTopicPartitions(tps []TopicPartition) []tracing.TopicPartition {
	wtps := make([]tracing.TopicPartition, 0, len(tps))
	for _, tp := range tps {
		wtps = append(wtps, __dd_wTopicPartition{tp})
	}
	return wtps
}

func (w __dd_wTopicPartition) GetTopic() string {
	if w.Topic == nil {
		return ""
	}
	return *w.Topic
}

func (w __dd_wTopicPartition) GetPartition() int32 {
	return w.Partition
}

func (w __dd_wTopicPartition) GetOffset() int64 {
	return int64(w.Offset)
}

func (w __dd_wTopicPartition) GetError() error {
	return w.Error
}

type __dd_wEvent struct {
	Event
}

func __dd_wrapEvent(event Event) tracing.Event {
	return __dd_wEvent{event}
}

func (w __dd_wEvent) KafkaMessage() (tracing.Message, bool) {
	if m, ok := w.Event.(*Message); ok {
		return __dd_wrapMessage(m), true
	}
	return nil, false
}

func (w __dd_wEvent) KafkaOffsetsCommitted() (tracing.OffsetsCommitted, bool) {
	if oc, ok := w.Event.(OffsetsCommitted); ok {
		return __dd_wrapOffsetsCommitted(oc), true
	}
	return nil, false
}

type __dd_wOffsetsCommitted struct {
	OffsetsCommitted
}

func __dd_wrapOffsetsCommitted(oc OffsetsCommitted) tracing.OffsetsCommitted {
	return __dd_wOffsetsCommitted{oc}
}

func (w __dd_wOffsetsCommitted) GetError() error {
	return w.Error
}

func (w __dd_wOffsetsCommitted) GetOffsets() []tracing.TopicPartition {
	ttps := make([]tracing.TopicPartition, 0, len(w.Offsets))
	for _, tp := range w.Offsets {
		ttps = append(ttps, __dd_wrapTopicPartition(tp))
	}
	return ttps
}

type __dd_wConfigMap struct {
	cfg *ConfigMap
}

func __dd_wrapConfigMap(cm *ConfigMap) tracing.ConfigMap {
	return &__dd_wConfigMap{cm}
}

func (w *__dd_wConfigMap) Get(key string, defVal any) (any, error) {
	return w.cfg.Get(key, defVal)
}

func init() {
	telemetry.LoadIntegration(tracing.ComponentName(__dd_ckgoVersion))
	tracer.MarkIntegrationImported(tracing.IntegrationName(__dd_ckgoVersion))
}

func __dd_newKafkaTracer(opts ...tracing.Option) *tracing.KafkaTracer {
	v, _ := LibraryVersion()
	return tracing.NewKafkaTracer(__dd_ckgoVersion, v, opts...)
}

func __dd_initConsumer(c *Consumer) {
	if c.__dd_tracer != nil {
		return
	}
	var opts []tracing.Option
	if c.__dd_confmap != nil {
		opts = append(opts, tracing.WithConfig(__dd_wrapConfigMap(c.__dd_confmap)))
	}
	c.__dd_tracer = __dd_newKafkaTracer(opts...)
	// TODO: accessing c.events here might break if the library renames this variable...
	c.__dd_events = tracing.WrapConsumeEventsChannel(c.__dd_tracer, c.events, c, __dd_wrapEvent)
}

func __dd_initProducer(p *Producer) {
	if p.__dd_tracer != nil {
		return
	}
	p.__dd_tracer = __dd_newKafkaTracer()
	// TODO: accessing p.events and p.produceChannel here might break if the library renames this variable...
	p.__dd_events = p.events
	p.__dd_produceChannel = tracing.WrapProduceChannel(p.__dd_tracer, p.produceChannel, __dd_wrapMessage)
	if p.__dd_tracer.DSMEnabled() {
		p.__dd_events = tracing.WrapProduceEventsChannel(p.__dd_tracer, p.events, __dd_wrapEvent)
	}
}

type __dd_eventChan = chan Event
type __dd_messageChan = chan *Message
type __dd_kafkaTracer = tracing.KafkaTracer

Add struct fields to kafka.Consumer

Join Point
One of
Advice
Add new field __dd_tracer of type *__dd_kafkaTracer.
Advice
Add new field __dd_events of type __dd_eventChan.
Advice
Add new field __dd_confmap of type *ConfigMap.

Trace kafka.NewConsumer

Join Point
One of
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $conf := .Function.Argument 0 -}}
{{- $c := .Function.Result 0 -}}
defer func() {
	if {{ $c }} == nil {
		return
	}
	{{ $c }}.__dd_confmap = {{ $conf }}
	__dd_initConsumer({{ $c }})
}()

Trace kafka.Consumer#Close

Join Point
One of
  • Function body
  • Function body
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
__dd_initConsumer({{ $c }})
defer func() {
	if {{ $c }}.__dd_events == nil && {{ $c }}.__dd_tracer.PrevSpan != nil {
		{{ $c }}.__dd_tracer.PrevSpan.Finish()
		{{ $c }}.__dd_tracer.PrevSpan = nil
	}
}()

Trace kafka.Consumer#Events

Join Point
One of
  • Function body
  • Function body
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $events := .Function.Result 0 -}}
__dd_initConsumer({{ $c }})
defer func() {
	{{ $events }} = {{ $c }}.__dd_events
}()

Trace kafka.Consumer#Poll

Join Point
One of
  • Function body
  • Function body
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $event := .Function.Result 0 -}}
__dd_initConsumer({{ $c }})
if {{ $c }}.__dd_tracer.PrevSpan != nil {
	{{ $c }}.__dd_tracer.PrevSpan.Finish()
	{{ $c }}.__dd_tracer.PrevSpan = nil
}
defer func() {
		if msg, ok := {{ $event }}.(*Message); ok {
			tMsg := __dd_wrapMessage(msg)
			{{ $c }}.__dd_tracer.SetConsumeCheckpoint(tMsg)
			{{ $c }}.__dd_tracer.PrevSpan = {{ $c }}.__dd_tracer.StartConsumeSpan(tMsg)
		} else if offset, ok := {{ $event }}.(OffsetsCommitted); ok {
			tOffsets := __dd_wrapTopicPartitions(offset.Offsets)
			{{ $c }}.__dd_tracer.TrackCommitOffsets(tOffsets, offset.Error)
			{{ $c }}.__dd_tracer.TrackHighWatermarkOffset(tOffsets, {{ $c }})
		}
}()

Trace kafka.Consumer#Commit

Join Point
One of
  • Function body
  • Function body
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $tps := .Function.Result 0 -}}
{{- $err := .Function.Result 1 -}}
__dd_initConsumer({{ $c }})
defer func() {
	tOffsets := __dd_wrapTopicPartitions({{ $tps }})
	{{ $c }}.__dd_tracer.TrackCommitOffsets(tOffsets, {{ $err }})
	{{ $c }}.__dd_tracer.TrackHighWatermarkOffset(tOffsets, {{ $c }})
}()

Trace kafka.Consumer#CommitMessage

Join Point
One of
  • Function body
    • Function declaration
  • Function body
    • Function declaration
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $tps := .Function.Result 0 -}}
{{- $err := .Function.Result 1 -}}
__dd_initConsumer({{ $c }})
defer func() {
	tOffsets := __dd_wrapTopicPartitions({{ $tps }})
	{{ $c }}.__dd_tracer.TrackCommitOffsets(tOffsets, {{ $err }})
	{{ $c }}.__dd_tracer.TrackHighWatermarkOffset(tOffsets, {{ $c }})
}()

Trace kafka.Consumer#CommitOffsets

Join Point
One of
  • Function body
    • Function declaration
  • Function body
    • Function declaration
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $tps := .Function.Result 0 -}}
{{- $err := .Function.Result 1 -}}
__dd_initConsumer({{ $c }})
defer func() {
	tOffsets := __dd_wrapTopicPartitions({{ $tps }})
	{{ $c }}.__dd_tracer.TrackCommitOffsets(tOffsets, {{ $err }})
	{{ $c }}.__dd_tracer.TrackHighWatermarkOffset(tOffsets, {{ $c }})
}()

Add struct fields to kafka.Producer

Join Point
One of
Advice
Add new field __dd_tracer of type *__dd_kafkaTracer.
Advice
Add new field __dd_events of type __dd_eventChan.
Advice
Add new field __dd_produceChannel of type __dd_messageChan.

Trace kafka.Producer#Events

Join Point
One of
  • Function body
  • Function body
Advice
Prepend statements produced by the following template:
{{- $p := .Function.Receiver -}}
{{- $events := .Function.Result 0 -}}
__dd_initProducer({{ $p }})
defer func() {
	{{ $events }} = {{ $p }}.__dd_events
}()

Trace kafka.Producer#ProduceChannel

Join Point
One of
  • Function body
    • Function declaration
  • Function body
    • Function declaration
Advice
Prepend statements produced by the following template:
{{- $p := .Function.Receiver -}}
{{- $produceChannel := .Function.Result 0 -}}
__dd_initProducer({{ $p }})
defer func() {
	{{ $produceChannel }} = {{ $p }}.__dd_produceChannel
}()

Trace kafka.Producer#Close

Join Point
One of
  • Function body
  • Function body
Advice
Prepend statements produced by the following template:
{{- $p := .Function.Receiver -}}
__dd_initProducer({{ $p }})
close({{ $p }}.__dd_produceChannel)

Trace kafka.Producer#Produce

Join Point
One of
  • Function body
  • Function body
Advice
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
	tracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
	tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $p := .Function.Receiver -}}
{{- $msg := .Function.Argument 0 -}}
{{- $deliveryChan := .Function.Argument 1 -}}
{{- $err := .Function.Result 0 -}}
__dd_initProducer({{ $p }})
tMsg := __dd_wrapMessage({{ $msg }})
span := p.__dd_tracer.StartProduceSpan(tMsg)

var errChan chan error
{{ $deliveryChan }}, errChan = tracing.WrapDeliveryChannel({{ $p }}.__dd_tracer, {{ $deliveryChan }}, span, __dd_wrapEvent)

{{ $p }}.__dd_tracer.SetProduceCheckpoint(tMsg)
defer func() {
	if {{ $err }} != nil {
		if errChan != nil {
			errChan <- {{ $err }}
		} else {
			span.Finish(tracer.WithError({{ $err }}))
		}
	}
}()