Comments (2)
cc @cockroachdb/cdc
from cockroach.
You can repro this issue by applying this patch and running the cdc/bank
roachtest:
diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go
index 69194e7018b..d64e30065cf 100644
--- a/pkg/ccl/changefeedccl/sink_kafka.go
+++ b/pkg/ccl/changefeedccl/sink_kafka.go
@@ -377,6 +377,10 @@ func (s *kafkaSink) EmitRow(
return s.emitMessage(ctx, msg)
}
+type resolvedMetadata struct {
+ partition int32
+}
+
// EmitResolvedTimestamp implements the Sink interface.
func (s *kafkaSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
@@ -419,6 +423,7 @@ func (s *kafkaSink) EmitResolvedTimestamp(
Partition: partition,
Key: nil,
Value: sarama.ByteEncoder(payload),
+ Metadata: resolvedMetadata{partition: partition},
}
if err := s.emitMessage(ctx, msg); err != nil {
return err
@@ -622,6 +627,11 @@ func (s *kafkaSink) finishProducerMessage(ackMsg *sarama.ProducerMessage, ackErr
}
m.alloc.Release(s.ctx)
}
+ if m, ok := ackMsg.Metadata.(resolvedMetadata); ok {
+ if m.partition != ackMsg.Partition {
+ panic(fmt.Sprintf("partition mismatch, requested %d, sent to %d", m.partition, ackMsg.Partition))
+ }
+ }
if s.mu.flushErr == nil && ackError != nil {
s.mu.flushErr = ackError
}
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index b419c6bb21d..710bd9e5d2a 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -1136,7 +1136,10 @@ func registerCDC(r registry.Registry) {
feed := ct.newChangefeed(feedArgs{
sinkType: kafkaSink,
targets: allTpccTargets,
- opts: map[string]string{"initial_scan": "'no'"},
+ opts: map[string]string{
+ "initial_scan": "'no'",
+ "resolved": "",
+ },
})
ct.runFeedLatencyVerifier(feed, latencyTargets{
initialScanLatency: 3 * time.Minute,
@@ -2746,7 +2749,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
return errors.Wrap(err, "admin client")
}
return admin.CreateTopic(topic, &sarama.TopicDetail{
- NumPartitions: 1,
+ NumPartitions: 5,
ReplicationFactor: 1,
}, false)
})
from cockroach.
Related Issues (20)
- pkg/ccl/testccl/sqlccl/sqlccl_test: TestReadCommittedVolatileUDF failed
- opt: extra projection after GeneratePartialIndexScans
- pkg/sql/logictest/tests/cockroach-go-testserver-upgrade-to-master/cockroach-go-testserver-upgrade-to-master_test: TestLogic_mixed_version_schedule_details failed
- bazel: update to Bazel 7
- Sentry: conn_executor.go:909: panic: × (1) attached stack trace -- stack trace: | github.com/cockroachdb/cockroach/pkg/sql.(*Server).ServeConn.func1 | github.com/cockroachdb/cockroach/pkg/sql/...
- Sentry: conn_executor.go:961: panic: × (1) attached stack trace -- stack trace: | github.com/cockroachdb/cockroach/pkg/sql.(*Server).ServeConn.func1 | github.com/cockroachdb/cockroach/pkg/sql/... HOT 1
- ccl/backupccl: TestDataDriven_system_privileges_table failed
- cdc: improve observability into sink problems HOT 1
- pkg/sql/logictest: Test data containing multi-line values causes test to fail HOT 1
- Sentry: error.go:28: unexpected error: forecasted histogram had first bucket with non-zero NumRange or DistinctRange: (1) × Wraps: (2) issue #93892 Wraps: (3) attached stack trace -- stack trace: ... HOT 1
- roachprod/test-selector: test selection is running certain tests after a week. HOT 1
- roachtest: tpcc/large-schema-benchmark/multiregion=false/tables=1000 failed HOT 1
- roachtest: tpcc/large-schema-benchmark/multiregion=false/tables=10000 failed HOT 3
- roachtest: tpcc/large-schema-benchmark/multiregion=false/tables=25000 failed HOT 1
- roachtest: tpcc/large-schema-benchmark/multiregion=false/tables=40000 failed HOT 1
- roachtest: tpcc/large-schema-benchmark/multiregion=false/tables=5000 failed HOT 1
- roachtest: tpcc/large-schema-benchmark/multiregion=true/tables=1000 failed HOT 1
- roachtest: alterpk-bank failed HOT 3
- roachtest: tpcc/large-schema-benchmark/multiregion=true/tables=10000 failed HOT 1
- roachtest: tpcc/large-schema-benchmark/multiregion=true/tables=5000 failed HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from cockroach.