github.com/confluentinc/confluent-kafka-go/kafka
confluent-kafka-go is a Go library for Apache Kafka
Inject kafka library version (v1)
Definition of
kafka.Consumer
Introduce new declarations:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
// __dd_ckgoVersion identifies the instrumented library as confluent-kafka-go v1.
// It is passed to the tracing package when loading the integration and when
// building a KafkaTracer.
const __dd_ckgoVersion = tracing.CKGoVersion1
Inject kafka library version (v2)
Definition of
kafka.Consumer
Introduce new declarations:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
// __dd_ckgoVersion identifies the instrumented library as confluent-kafka-go v2.
// It is passed to the tracing package when loading the integration and when
// building a KafkaTracer.
const __dd_ckgoVersion = tracing.CKGoVersion2
Inject wrapped types to the kafka package
One of
- Definition of
kafka.Consumer
- Definition of
kafka.Consumer
Introduce new declarations:
// Using the following synthetic imports:
import (
telemetry "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
tracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
// __dd_wMessage adapts a *Message so it can be handed to the tracing package
// as a tracing.Message.
type __dd_wMessage struct {
	*Message
}

// __dd_wrapMessage wraps msg as a tracing.Message. A nil msg produces a nil
// interface value instead of a wrapper around a nil pointer.
func __dd_wrapMessage(msg *Message) tracing.Message {
	if msg != nil {
		return &__dd_wMessage{Message: msg}
	}
	return nil
}

// Unwrap returns the underlying *Message.
func (m *__dd_wMessage) Unwrap() any {
	return m.Message
}

// GetValue returns the message payload.
func (m *__dd_wMessage) GetValue() []byte {
	return m.Value
}

// GetKey returns the message key.
func (m *__dd_wMessage) GetKey() []byte {
	return m.Key
}

// GetHeaders returns the message headers, each wrapped as a tracing.Header.
// The result is always a non-nil slice, even when there are no headers.
func (m *__dd_wMessage) GetHeaders() []tracing.Header {
	wrapped := make([]tracing.Header, len(m.Message.Headers))
	for i := range m.Message.Headers {
		wrapped[i] = __dd_wrapHeader(m.Message.Headers[i])
	}
	return wrapped
}

// SetHeaders replaces the message headers with copies of the key and value
// of each entry in headers.
func (m *__dd_wMessage) SetHeaders(headers []tracing.Header) {
	converted := make([]Header, len(headers))
	for i, src := range headers {
		converted[i] = Header{
			Key:   src.GetKey(),
			Value: src.GetValue(),
		}
	}
	m.Message.Headers = converted
}

// GetTopicPartition returns the message's TopicPartition wrapped as a
// tracing.TopicPartition.
func (m *__dd_wMessage) GetTopicPartition() tracing.TopicPartition {
	return __dd_wrapTopicPartition(m.TopicPartition)
}
// __dd_wHeader adapts a Header to the tracing.Header interface.
type __dd_wHeader struct {
	Header
}

// __dd_wrapHeader copies h into a wrapper and returns a pointer to it as a
// tracing.Header.
func __dd_wrapHeader(h Header) tracing.Header {
	wrapped := __dd_wHeader{Header: h}
	return &wrapped
}

// GetKey returns the header key.
func (h __dd_wHeader) GetKey() string {
	return h.Key
}

// GetValue returns the header value.
func (h __dd_wHeader) GetValue() []byte {
	return h.Value
}
// __dd_wTopicPartition adapts a TopicPartition to the tracing.TopicPartition
// interface.
type __dd_wTopicPartition struct {
	TopicPartition
}

// __dd_wrapTopicPartition wraps a single TopicPartition by value.
func __dd_wrapTopicPartition(tp TopicPartition) tracing.TopicPartition {
	return __dd_wTopicPartition{TopicPartition: tp}
}

// __dd_wrapTopicPartitions wraps every element of tps. The result is always
// a non-nil slice with the same length as tps.
func __dd_wrapTopicPartitions(tps []TopicPartition) []tracing.TopicPartition {
	wrapped := make([]tracing.TopicPartition, len(tps))
	for i := range tps {
		wrapped[i] = __dd_wrapTopicPartition(tps[i])
	}
	return wrapped
}

// GetTopic returns the topic name, or the empty string when the Topic
// pointer is nil.
func (tp __dd_wTopicPartition) GetTopic() string {
	if topic := tp.Topic; topic != nil {
		return *topic
	}
	return ""
}

// GetPartition returns the partition number.
func (tp __dd_wTopicPartition) GetPartition() int32 {
	return tp.TopicPartition.Partition
}

// GetOffset returns the offset converted to int64.
func (tp __dd_wTopicPartition) GetOffset() int64 {
	return int64(tp.TopicPartition.Offset)
}

// GetError returns the error stored on the TopicPartition, if any.
func (tp __dd_wTopicPartition) GetError() error {
	return tp.TopicPartition.Error
}
// __dd_wEvent adapts an Event to the tracing.Event interface.
type __dd_wEvent struct {
	Event
}

// __dd_wrapEvent wraps event as a tracing.Event.
func __dd_wrapEvent(event Event) tracing.Event {
	return __dd_wEvent{Event: event}
}

// KafkaMessage reports whether the wrapped event is a *Message and, if so,
// returns it wrapped as a tracing.Message.
func (e __dd_wEvent) KafkaMessage() (tracing.Message, bool) {
	msg, isMsg := e.Event.(*Message)
	if !isMsg {
		return nil, false
	}
	return __dd_wrapMessage(msg), true
}

// KafkaOffsetsCommitted reports whether the wrapped event is an
// OffsetsCommitted value and, if so, returns it wrapped as a
// tracing.OffsetsCommitted.
func (e __dd_wEvent) KafkaOffsetsCommitted() (tracing.OffsetsCommitted, bool) {
	committed, isCommit := e.Event.(OffsetsCommitted)
	if !isCommit {
		return nil, false
	}
	return __dd_wrapOffsetsCommitted(committed), true
}
// __dd_wOffsetsCommitted adapts an OffsetsCommitted event to the
// tracing.OffsetsCommitted interface.
type __dd_wOffsetsCommitted struct {
	OffsetsCommitted
}

// __dd_wrapOffsetsCommitted wraps oc by value.
func __dd_wrapOffsetsCommitted(oc OffsetsCommitted) tracing.OffsetsCommitted {
	return __dd_wOffsetsCommitted{OffsetsCommitted: oc}
}

// GetError returns the error carried by the commit event, if any.
func (oc __dd_wOffsetsCommitted) GetError() error {
	return oc.OffsetsCommitted.Error
}

// GetOffsets returns the committed offsets, each wrapped as a
// tracing.TopicPartition. The result is always a non-nil slice.
func (oc __dd_wOffsetsCommitted) GetOffsets() []tracing.TopicPartition {
	offsets := oc.OffsetsCommitted.Offsets
	wrapped := make([]tracing.TopicPartition, len(offsets))
	for i := range offsets {
		wrapped[i] = __dd_wrapTopicPartition(offsets[i])
	}
	return wrapped
}
// __dd_wConfigMap adapts a *ConfigMap to the tracing.ConfigMap interface.
type __dd_wConfigMap struct {
	cfg *ConfigMap
}

// __dd_wrapConfigMap wraps cm as a tracing.ConfigMap. It does not check cm
// for nil; __dd_initConsumer only calls it with a non-nil map.
func __dd_wrapConfigMap(cm *ConfigMap) tracing.ConfigMap {
	wrapper := new(__dd_wConfigMap)
	wrapper.cfg = cm
	return wrapper
}

// Get forwards the lookup to the underlying ConfigMap and returns its
// results unchanged.
func (c *__dd_wConfigMap) Get(key string, defVal any) (any, error) {
	val, err := c.cfg.Get(key, defVal)
	return val, err
}
func init() {
telemetry.LoadIntegration(tracing.ComponentName(__dd_ckgoVersion))
tracer.MarkIntegrationImported(tracing.IntegrationName(__dd_ckgoVersion))
}
// __dd_newKafkaTracer builds a KafkaTracer for the instrumented library
// version, forwarding opts unchanged.
func __dd_newKafkaTracer(opts ...tracing.Option) *tracing.KafkaTracer {
	// Only the first result of LibraryVersion is used; the second is
	// deliberately discarded.
	libVersion, _ := LibraryVersion()
	kt := tracing.NewKafkaTracer(__dd_ckgoVersion, libVersion, opts...)
	return kt
}
// __dd_initConsumer sets up the injected tracing fields on c the first time
// it is called. Later calls return immediately because __dd_tracer is
// already set.
func __dd_initConsumer(c *Consumer) {
	if c.__dd_tracer != nil {
		return
	}

	// Only pass the consumer's ConfigMap to the tracer when one was recorded.
	var opts []tracing.Option
	if cm := c.__dd_confmap; cm != nil {
		opts = append(opts, tracing.WithConfig(__dd_wrapConfigMap(cm)))
	}

	kt := __dd_newKafkaTracer(opts...)
	c.__dd_tracer = kt
	// TODO: accessing c.events here might break if the library renames this variable...
	c.__dd_events = tracing.WrapConsumeEventsChannel(kt, c.events, c, __dd_wrapEvent)
}
// __dd_initProducer sets up the injected tracing fields on p the first time
// it is called. Later calls return immediately because __dd_tracer is
// already set.
func __dd_initProducer(p *Producer) {
	if p.__dd_tracer != nil {
		return
	}

	kt := __dd_newKafkaTracer()
	p.__dd_tracer = kt

	// TODO: accessing p.events and p.produceChannel here might break if the library renames this variable...
	// By default the producer's own events channel is exposed as-is.
	p.__dd_events = p.events
	p.__dd_produceChannel = tracing.WrapProduceChannel(kt, p.produceChannel, __dd_wrapMessage)

	// The events channel is wrapped only when the tracer reports DSM as enabled.
	if !kt.DSMEnabled() {
		return
	}
	p.__dd_events = tracing.WrapProduceEventsChannel(kt, p.events, __dd_wrapEvent)
}
// Type aliases for the channel and tracer types used by the instrumentation.
// NOTE(review): presumably these name the types of the injected __dd_* struct
// fields on Consumer and Producer; confirm against the struct-field advice.
type (
	__dd_eventChan   = chan Event
	__dd_messageChan = chan *Message
	__dd_kafkaTracer = tracing.KafkaTracer
)
Add struct fields to kafka.Consumer
Trace kafka.NewConsumer
One of
- All of
- Import path
github.com/confluentinc/confluent-kafka-go/kafka
- Function body
- Function declaration
- Function name
NewConsumer
- All of
- Function body
- Function declaration
- Function name
NewConsumer
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $conf := .Function.Argument 0 -}}
{{- $c := .Function.Result 0 -}}
// Deferred so that it runs once NewConsumer has set its result value.
defer func() {
if {{ $c }} == nil {
// No consumer was returned, so there is nothing to instrument.
return
}
// Record the ConfigMap argument so __dd_initConsumer can pass it to the tracer.
{{ $c }}.__dd_confmap = {{ $conf }}
__dd_initConsumer({{ $c }})
}()
Trace kafka.Consumer#Close
One of
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
Close
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
Close
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
// Make sure the tracing fields exist before they are dereferenced below.
__dd_initConsumer({{ $c }})
defer func() {
// When there is no wrapped events channel, finish the span left open by the
// last Poll call, if any.
// NOTE(review): __dd_events is presumably nil only when the consumer has no events channel; confirm in tracing.WrapConsumeEventsChannel.
if {{ $c }}.__dd_events == nil && {{ $c }}.__dd_tracer.PrevSpan != nil {
{{ $c }}.__dd_tracer.PrevSpan.Finish()
{{ $c }}.__dd_tracer.PrevSpan = nil
}
}()
Trace kafka.Consumer#Events
One of
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
Events
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
Events
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $events := .Function.Result 0 -}}
// Make sure the wrapped events channel has been created.
__dd_initConsumer({{ $c }})
defer func() {
// Override the result so callers receive the wrapped events channel.
{{ $events }} = {{ $c }}.__dd_events
}()
Trace kafka.Consumer#Poll
One of
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
Poll
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
Poll
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $event := .Function.Result 0 -}}
// Make sure the tracing fields exist before they are dereferenced below.
__dd_initConsumer({{ $c }})
// Finish the span started by the previous Poll call before polling again.
if {{ $c }}.__dd_tracer.PrevSpan != nil {
{{ $c }}.__dd_tracer.PrevSpan.Finish()
{{ $c }}.__dd_tracer.PrevSpan = nil
}
// After Poll returns, inspect the event it produced.
defer func() {
if msg, ok := {{ $event }}.(*Message); ok {
// Consumed message: set a consume checkpoint and start a span, stored in
// PrevSpan so that a later call can finish it.
tMsg := __dd_wrapMessage(msg)
{{ $c }}.__dd_tracer.SetConsumeCheckpoint(tMsg)
{{ $c }}.__dd_tracer.PrevSpan = {{ $c }}.__dd_tracer.StartConsumeSpan(tMsg)
} else if offset, ok := {{ $event }}.(OffsetsCommitted); ok {
// Offsets-committed event: report the offsets and error to the tracer.
tOffsets := __dd_wrapTopicPartitions(offset.Offsets)
{{ $c }}.__dd_tracer.TrackCommitOffsets(tOffsets, offset.Error)
{{ $c }}.__dd_tracer.TrackHighWatermarkOffset(tOffsets, {{ $c }})
}
}()
Trace kafka.Consumer#Commit
One of
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
Commit
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
Commit
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $tps := .Function.Result 0 -}}
{{- $err := .Function.Result 1 -}}
// Make sure the tracing fields exist before they are dereferenced below.
__dd_initConsumer({{ $c }})
// After Commit returns, report its resulting offsets and error to the tracer.
defer func() {
tOffsets := __dd_wrapTopicPartitions({{ $tps }})
{{ $c }}.__dd_tracer.TrackCommitOffsets(tOffsets, {{ $err }})
{{ $c }}.__dd_tracer.TrackHighWatermarkOffset(tOffsets, {{ $c }})
}()
Trace kafka.Consumer#CommitMessage
One of
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
CommitMessage
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
CommitMessage
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $tps := .Function.Result 0 -}}
{{- $err := .Function.Result 1 -}}
// Make sure the tracing fields exist before they are dereferenced below.
__dd_initConsumer({{ $c }})
// After CommitMessage returns, report its resulting offsets and error to the tracer.
defer func() {
tOffsets := __dd_wrapTopicPartitions({{ $tps }})
{{ $c }}.__dd_tracer.TrackCommitOffsets(tOffsets, {{ $err }})
{{ $c }}.__dd_tracer.TrackHighWatermarkOffset(tOffsets, {{ $c }})
}()
Trace kafka.Consumer#CommitOffsets
One of
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
CommitOffsets
- Function body
- Function declaration
- Is method of
*kafka.Consumer
- Function name
CommitOffsets
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $c := .Function.Receiver -}}
{{- $tps := .Function.Result 0 -}}
{{- $err := .Function.Result 1 -}}
// Make sure the tracing fields exist before they are dereferenced below.
__dd_initConsumer({{ $c }})
// After CommitOffsets returns, report its resulting offsets and error to the tracer.
defer func() {
tOffsets := __dd_wrapTopicPartitions({{ $tps }})
{{ $c }}.__dd_tracer.TrackCommitOffsets(tOffsets, {{ $err }})
{{ $c }}.__dd_tracer.TrackHighWatermarkOffset(tOffsets, {{ $c }})
}()
Add struct fields to kafka.Producer
Trace kafka.Producer#Events
One of
- Function body
- Function declaration
- Is method of
*kafka.Producer
- Function name
Events
- Function body
- Function declaration
- Is method of
*kafka.Producer
- Function name
Events
Prepend statements produced by the following template:
{{- $p := .Function.Receiver -}}
{{- $events := .Function.Result 0 -}}
// Make sure the producer's tracing fields have been set up.
__dd_initProducer({{ $p }})
defer func() {
// Override the result with the events channel chosen by __dd_initProducer.
{{ $events }} = {{ $p }}.__dd_events
}()
Trace kafka.Producer#ProduceChannel
One of
- Function body
- Function declaration
- Is method of
*kafka.Producer
- Function name
ProduceChannel
- Function body
- Function declaration
- Is method of
*kafka.Producer
- Function name
ProduceChannel
Prepend statements produced by the following template:
{{- $p := .Function.Receiver -}}
{{- $produceChannel := .Function.Result 0 -}}
// Make sure the wrapped produce channel has been created.
__dd_initProducer({{ $p }})
defer func() {
// Override the result so callers send on the wrapped produce channel.
{{ $produceChannel }} = {{ $p }}.__dd_produceChannel
}()
Trace kafka.Producer#Close
One of
- Function body
- Function declaration
- Is method of
*kafka.Producer
- Function name
Close
- Function body
- Function declaration
- Is method of
*kafka.Producer
- Function name
Close
Prepend statements produced by the following template:
{{- $p := .Function.Receiver -}}
// Make sure the wrapped produce channel exists before closing it.
__dd_initProducer({{ $p }})
// Close the wrapped produce channel created by __dd_initProducer.
close({{ $p }}.__dd_produceChannel)
Trace kafka.Producer#Produce
One of
- Function body
- Function declaration
- Is method of
*kafka.Producer
- Function name
Produce
- Function body
- Function declaration
- Is method of
*kafka.Producer
- Function name
Produce
Prepend statements produced by the following template:
// Using the following synthetic imports:
import (
tracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
tracing "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
)
{{- $p := .Function.Receiver -}}
{{- $msg := .Function.Argument 0 -}}
{{- $deliveryChan := .Function.Argument 1 -}}
{{- $err := .Function.Result 0 -}}
// Make sure the producer's tracing fields have been set up.
__dd_initProducer({{ $p }})
tMsg := __dd_wrapMessage({{ $msg }})
// Use the templated receiver here, as everywhere else in this advice, so the
// injected code does not depend on the receiver being named "p".
span := {{ $p }}.__dd_tracer.StartProduceSpan(tMsg)
// Replace the caller's delivery channel with the wrapped one; errChan is
// returned alongside it.
var errChan chan error
{{ $deliveryChan }}, errChan = tracing.WrapDeliveryChannel({{ $p }}.__dd_tracer, {{ $deliveryChan }}, span, __dd_wrapEvent)
{{ $p }}.__dd_tracer.SetProduceCheckpoint(tMsg)
// After Produce returns, act only if it returned an error.
defer func() {
	if {{ $err }} != nil {
		if errChan != nil {
			// Hand the error to the delivery-channel wrapper through errChan.
			errChan <- {{ $err }}
		} else {
			// No error channel: finish the span here with the error attached.
			span.Finish(tracer.WithError({{ $err }}))
		}
	}
}()