ancashoria / graphql-kafka-subscriptions
Apollo GraphQL subscriptions over the Kafka protocol
License: MIT License
I was able to get my subscriptions working with my kafka instance locally, but when I try to dockerize them they stop firing. The playground works just like always: the sub starts up, is listening for a publish, then when I hit the endpoint that triggers the sub nothing happens. It feels very similar to accidentally subscribing and publishing to different pubsub instances. Does anyone know if there is some underlying issue with running these subs in a container?
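One common cause worth ruling out (a hedged guess, since the container setup isn't shown): inside a container, "localhost" refers to the container itself, so a broker address that worked on the host no longer resolves. A minimal sketch, assuming hypothetical `KAFKA_*` environment variable names, of making the broker address configurable so docker-compose can inject the service name:

```javascript
// Sketch with hypothetical env var names (KAFKA_TOPIC/KAFKA_HOST/KAFKA_PORT):
// derive the KafkaPubSub options from the environment, so in docker-compose
// you can set KAFKA_HOST to the broker's service name instead of "localhost".
function kafkaOptionsFromEnv(env) {
  return {
    topic: env.KAFKA_TOPIC || 'messages',
    host: env.KAFKA_HOST || 'localhost', // "localhost" is the container itself in Docker
    port: env.KAFKA_PORT || '9092',
  };
}

// On the host this falls back to localhost; in a container, pass the
// compose service name, e.g. KAFKA_HOST=kafka.
const options = kafkaOptionsFromEnv(process.env);
```

Both the publisher and the subscription server must end up with the same broker address; if one container still defaults to localhost, publishes and subscriptions silently go to different brokers.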
I'm just trying to test my KafkaPubSub and have a really simple setup:
Resolver
Subscription: {
  messageCreated: {
    subscribe: () => pubsub.asyncIterator(MESSAGE_CREATED)
  }
}
Publish
let id = 0;
setInterval(() => {
  const r = pubsub.publish({
    channel: MESSAGE_CREATED,
    messageCreated: { id, content: new Date().toString() },
  });
  console.log(r, id);
  id++;
}, 5000);
The publishes are happening and I see no errors from them, but even though my subscription is started in the playground, it just listens and nothing ever comes through. I'm not sure if it's something with Kafka itself. My configs are:
export const KafkaPubSubConfig = {
  topic: <TOPIC>,
  host: <BROKER>,
  port: <PORT>,
  groupId: 'graphql-sub-test',
}

export const PublisherKafkaPubSubConfig = {
  ...KafkaPubSubConfig,
  globalConfig: {
    'security.protocol': 'sasl_ssl',
    'ssl.endpoint.identification.algorithm': 'https',
    'ssl.ca.location': <PATH_TO_CERT>,
    'enable.ssl.certificate.verification': false,
    'ssl.keystore.location': <PATH_TO_KEYSTORE>,
    'ssl.keystore.password': <PASSWORD>,
    'sasl.mechanism': 'GSSAPI',
    'sasl.kerberos.keytab': <PATH_TO_KEYTAB>,
    'sasl.kerberos.principal': <PRINCIPAL>,
    'sasl.kerberos.service.name': <NAME>,
    'group.id': 'librd-test',
  }
}
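Regarding the "different pubsub instances" suspicion mentioned above: one way to rule it out is to create the KafkaPubSub exactly once and hand the same object to both the resolver and the publisher. A generic sketch of such a module-level singleton (the factory wiring is illustrative, not this library's API):

```javascript
// Illustrative singleton holder: every caller of getPubSub receives the same
// instance, so publish() and asyncIterator() share one connection.
let instance = null;

function getPubSub(createFn) {
  if (instance === null) {
    instance = createFn(); // e.g. () => new KafkaPubSub(KafkaPubSubConfig)
  }
  return instance;
}
```

Note that in Node, requiring the same module path twice already returns a cached instance, but duplicated dependency trees or separate processes/containers each get their own copy; two containers can never share one in-memory pubsub, they only meet at the broker.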
Is this project still active?
One of the main benefits of Kafka is message durability and the ability for the client to specify an offset. If a client goes offline, will it receive the "skipped" messages over the subscription when it reconnects? This could differ per client, even though they subscribe to the same topic/partition. Real-time push is fine, but we really need more of a pull/push system.
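In plain Kafka terms this maps onto consumer groups: each group.id has its own committed offset, so a consumer reconnecting under the same group resumes from where it left off rather than only seeing new messages. A hedged sketch (the helper and naming scheme are hypothetical, not part of this library) of deriving a per-client config:

```javascript
// Hypothetical helper: derive a per-client consumer group so Kafka tracks a
// separate committed offset for each subscriber. Reconnecting with the same
// clientId would let the broker deliver the messages "skipped" while offline.
function configForClient(clientId) {
  return {
    topic: 'events',
    host: 'localhost',
    port: '9092',
    groupId: `subscriber-${clientId}`, // one group (and offset) per client
  };
}
```

The trade-off is that every client-specific group is tracked by the broker, so some group cleanup policy would be needed for short-lived subscribers.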
Good day,
First of all, good job on the library; it's really easy to use. Would you consider building in SASL support? I would like to connect to confluent.cloud to use Kafka as a pubsub, since we are already using it for other consumption purposes. I would be happy to help and submit a PR if required.
Thank you
Thanks for making this package. I think it's really great.
I notice that the only logger methods used are debug, warn, info, and error, but when I pass a logger that supports those functions I get the following:
error TS2345: Argument of type 'MyLogger' is not assignable to parameter of type 'Logger'.
Type 'MyLogger' is missing the following properties from type 'Logger': addStream, addSerializers, child, reopenFileStreams, and 21 more.
It's problematic to support all the interfaces that bunyan offers.
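One workaround, sketched here under the assumption that the library only ever calls debug/warn/info/error at runtime, is to wrap the custom logger in a Proxy that stubs out the rest of the bunyan Logger surface (and cast it on the TypeScript side):

```javascript
// Sketch: present any {debug, warn, info, error} object as a bunyan-shaped
// logger by answering every other property access (addStream, child, ...)
// with a no-op function. This only helps if the library never depends on the
// return values of those extra methods.
function asBunyanLogger(logger) {
  return new Proxy(logger, {
    get(target, prop) {
      if (prop in target) return target[prop]; // forward real methods
      return () => {}; // stub addStream, addSerializers, child, etc.
    },
  });
}
```

On the TypeScript side you would still need a cast like `asBunyanLogger(myLogger) as unknown as Logger`, since the Proxy trick only satisfies the interface at runtime.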
No receipt for 'com.apple.pkg.DeveloperToolsCLILeo' found at '/'.
No receipt for 'com.apple.pkg.DeveloperToolsCLI' found at '/'.
gyp: No Xcode or CLT version detected!
gyp ERR! configure error
gyp ERR! stack Error: gyp
failed with exit code: 1
gyp ERR! stack at ChildProcess.onCpExit (/usr/local/lib/node_modules/npm/node_modules/node-gyp/lib/configure.js:351:16)
gyp ERR! stack at ChildProcess.emit (events.js:315:20)
gyp ERR! stack at Process.ChildProcess._handle.onexit (internal/child_process.js:275:12)
gyp ERR! System Darwin 19.6.0
gyp ERR! command "/usr/local/bin/node" "/usr/local/lib/node_modules/npm/node_modules/node-gyp/bin/node-gyp.js" "rebuild"
gyp ERR! cwd /Users/103cuong/Workspace/software_engineer/graphql-kafka-subscriptions/node_modules/fsevents
[8/9] • node-rdkafka
[9/9] • nodemon
[-/9] • waiting...
[-/9] • waiting...
error /Users/103cuong/Workspace/software_engineer/graphql-kafka-subscriptions/node_modules/node-rdkafka: Command failed.
Exit code: 1
Command: node-gyp rebuild
Arguments:
Directory: /Users/103cuong/Workspace/software_engineer/graphql-kafka-subscriptions/node_modules/node-rdkafka
Output:
gyp info it worked if it ends with ok
gyp info using [email protected]
gyp info using [email protected] | darwin | x64
gyp info find Python using Python version 2.7.16 found at "/System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python"
gyp info spawn /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python
gyp info spawn args [
gyp info spawn args '/usr/local/lib/node_modules/npm/node_modules/node-gyp/gyp/gyp_main.py',
gyp info spawn args 'binding.gyp',
gyp info spawn args '-f',
gyp info spawn args 'make',
gyp info spawn args '-I',
gyp info spawn args '/Users/103cuong/Workspace/software_engineer/graphql-kafka-subscriptions/node_modules/node-rdkafka/build/config.gypi',
gyp info spawn args '-I',
gyp info spawn args '/usr/local/lib/node_modules/npm/node_modules/node-gyp/addon.gypi',
gyp info spawn args '-I',
gyp info spawn args '/Users/103cuong/Library/Caches/node-gyp/12.18.2/include/node/common.gypi',
gyp info spawn args '-Dlibrary=shared_library',
gyp info spawn args '-Dvisibility=default',
gyp info spawn args '-Dnode_root_dir=/Users/103cuong/Library/Caches/node-gyp/12.18.2',
gyp info spawn args '-Dnode_gyp_dir=/usr/local/lib/node_modules/npm/node_modules/node-gyp',
gyp info spawn args '-Dnode_lib_file=/Users/103cuong/Library/Caches/node-gyp/12.18.2/<(target_arch)/node.lib',
gyp info spawn args '-Dmodule_root_dir=/Users/103cuong/Workspace/software_engineer/graphql-kafka-subscriptions/node_modules/node-rdkafka',
gyp info spawn args '-Dnode_engine=v8',
gyp info spawn args '--depth=.',
gyp info spawn args '--no-parallel',
gyp info spawn args '--generator-output',
gyp info spawn args 'build',
gyp info spawn args '-Goutput_dir=.'
gyp info spawn args ]
No receipt for 'com.apple.pkg.CLTools_Executables' found at '/'.
No receipt for 'com.apple.pkg.DeveloperToolsCLILeo' found at '/'.
No receipt for 'com.apple.pkg.DeveloperToolsCLI' found at '/'.
gyp: No Xcode or CLT version detected!
gyp ERR! configure error
gyp ERR! stack Error: gyp
failed with exit code: 1
gyp ERR! stack at ChildProcess.onCpExit (/usr/local/lib/node_modules/npm/node_modules/node-gyp/lib/configure.js:351:16)
gyp ERR! stack at ChildProcess.emit (events.js:315:20)
gyp ERR! stack at Process.ChildProcess._handle.onexit (internal/child_process.js:275:12)
gyp ERR! System Darwin 19.6.0
gyp ERR! command "/usr/local/bin/node" "/usr/local/lib/node_modules/npm/node_modules/node-gyp/bin/node-gyp.js" "rebuild"
gyp ERR! cwd /Users/103cuong/Workspace/software_engineer/graphql-kafka-subscriptions/node_modules/node-rdkafka
- macOS Catalina 10.15.7
- node: v12.18.2
- yarn: 1.22.4
How can I resolve it? Thanks in advance.
First off, thanks for making this package!
I am still pretty green when it comes to subscriptions in GraphQL. I have been going back and forth between the different implementations (Redis, Kafka, PubSub) and I am at a loss for how to get this set up properly.
I am assuming that I have it properly set up with the connection options, since I get a steady stream of "Got Message" console log statements.
Then I have the following in the resolver for the subscription, but I don't see any of the console.log statements from within the filterFn.
const { withFilter } = require('graphql-subscriptions');
const { KafkaPubSub } = require('graphql-kafka-subscriptions');

const pubsub = new KafkaPubSub({
  topic: 'eog_ioptimize_prod_timeseries',
  host: 'ktymapr30.eogresources.com',
  port: '9092',
});

const subscribe = withFilter(
  () => pubsub.asyncIterator('eog_ioptimize_prod_timeseries'),
  (payload, variables) => {
    console.log('PAYLOAD', payload);
    console.log('VARIABLES', variables);
    return true;
  }
);

module.exports = {
  timeseries: {
    subscribe,
  },
};
I'm certain that it is a setup issue on my part, but I just can't figure out where.
We are running GraphQL APIs on Apollo server, deployed on the OpenShift container platform, and we are using the graphql-kafka-subscriptions package.
When we activate a subscription, we receive each trigger's notification twice.
This does not happen when running the server locally.
Could this issue be due to some unexpected behavior caused by running Apollo as a containerized application?
Thanks for helping.
Hi,
we encountered an issue with your package: the private method createConsumer in kafka-pubsub.js does not handle the node-rdkafka Stream API correctly. As a result, not all messages from the Kafka broker are caught; in our case, after 3 events were received, the consumer stopped consuming.
Current code:
stream.consumer.on('data', (message) => {
  let parsedMessage = JSON.parse(message.value.toString())
  ...
})
You access the Standard API via the Stream API object without manually configuring flowing/non-flowing mode (see the examples at https://www.npmjs.com/package/node-rdkafka). One solution is to use only the Stream API (example below).
Fix:
stream.on('data', (message) => {
  let parsedMessage = JSON.parse(message.value.toString())
  ...
})
This fix got it working for us, so we'd be glad to see it included in the next release. Thanks!
Hi,
I've found that if I use the graphql-kafka-subscriptions module (0.2.3) with webpack, then I get the following error when the module is required:
path.js:28
throw new TypeError('Path must be a string. Received ' + inspect(path));
^
TypeError: Path must be a string. Received undefined
at assertPath (path.js:28:11)
at dirname (path.js:1349:5)
at Function.getRoot (webpack:///./node_modules/bindings/bindings.js?:151:13)
at bindings (webpack:///./node_modules/bindings/bindings.js?:60:32)
at eval (webpack:///./node_modules/node-rdkafka/librdkafka.js?:10:87)
at Object../node_modules/node-rdkafka/librdkafka.js (/home/vagrant/Git/money-management/subscription-server/dist/app.js:468:1)
at __webpack_require__ (/home/vagrant/Git/money-management/subscription-server/dist/app.js:20:30)
at eval (webpack:///./node_modules/node-rdkafka/lib/client.js?:14:13)
at Object../node_modules/node-rdkafka/lib/client.js (/home/vagrant/Git/money-management/subscription-server/dist/app.js:355:1)
at __webpack_require__ (/home/vagrant/Git/money-management/subscription-server/dist/app.js:20:30)
This is simple to reproduce by requiring the module in the webpack startup target, e.g.
const { KafkaPubSub } = require('graphql-kafka-subscriptions');
It would be great if the module was compatible with webpack.
Kind regards,
Matt
Hi, I'd love to see your implementation in the list here. :)
https://github.com/apollographql/graphql-subscriptions#pubsub-implementations
I'm working on CentOS, I've installed the latest graphql-kafka-subscriptions, and I am passing the following configuration to the KafkaPubSub:
{
  topic: <kafka_topic>,
  host: <host>,
  port: <port>,
  globalConfig: {
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.kerberos.service.name': 'kafka',
    'sasl.kerberos.keytab': <keytab_file>,
    'sasl.kerberos.principal': <principal>
  }
}
From here I have two problems (though maybe they're related?). If I do a curl against the GraphQL instance I get back:
Error: No provider for SASL mechanism GSSAPI: recompile librdkafka with libsasl2 or openssl support. Current build options: PLAIN
This is of course a problem in librdkafka, but as much as I've searched, I cannot figure out how to give it the libraries it needs. I've even tried exporting the env vars for macOS with no success.
If, however, I don't send anything (no curl request), I get:
[Error: Local: Host resolution failure]
[Error: Local: All broker connections are down]
<Path>/node_modules/node-rdkafka/lib/error.js:402
return new LibrdKafkaError(e);
^
Error: Local: Broker transport failure
at Function.createLibrdkafkaError [as create] (<Path>/node_modules/node-rdkafka/lib/error.js:402:10)
at <Path>/node_modules/node-rdkafka/lib/client.js:341:28
The host, however, is reachable from my machine, so I'm wondering if there is some underlying problem that the error messages don't fully explain. As a note, the lines
[Error: Local: Host resolution failure]
[Error: Local: All broker connections are down]
were console logs I added manually to node-rdkafka.
Any help/direction you could give me on this would be much appreciated!
Hi there. Great work on this.
Have you considered adding the ability to pass options through to the underlying consumer that is created? For example, we are looking to encrypt Kafka topics and would like to configure some of the ssl.* properties specified in https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md, but that doesn't look possible yet.
Question: Would there be a good way to pass a consumer groupId on each subscription? This would allow those whose connection was broken to reconnect and get missed events.
Similarly, is there a good way to specify a topic on publish and to consume from, such that we could publish and consume from multiple topics?
Do you happen to have a simple example of a subscription resolver that consumes a Kafka topic? I haven't really seen any examples of this online and was wondering if you could help.
It is possible that an existing Kafka producer for a topic does not use channels; topics can carry messages that are not targeted at any channel. It should be possible to set up the KafkaPubSub with a configuration, or the asyncIterator with some sort of null value or empty array, so that all messages found on a topic are considered valid payloads to process.
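The proposed routing could be sketched like this (a hypothetical helper, not the library's current code): treat a message with no channel field, or a subscriber asking for a null channel, as a match.

```javascript
// Hypothetical routing helper: decide whether a raw Kafka message should be
// delivered to a subscriber. A message without a `channel` field, or a
// subscriber passing null as the channel, matches everything; otherwise the
// channels must be equal.
function matchesChannel(rawValue, channel) {
  const payload = JSON.parse(rawValue.toString());
  if (payload.channel === undefined || channel === null) return payload;
  return payload.channel === channel ? payload : undefined;
}
```

This keeps the existing channel-tagged messages working while letting untagged producer traffic flow through unchanged.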
How can I use this package to subscribe to Kafka with ACL security authentication?
This package uses an old graphql-subscriptions release (0.5.x), but the latest version is 1.1.x.
Hi, I followed the usage guide and it works for a few messages at first, but after maybe 4-5 payloads it stops delivering to clients. I started a separate Kafka console consumer and am still able to receive messages despite this, which leads me to believe the issue is with the async iterator. I need to understand how to fix this, otherwise this library unfortunately won't be usable for us. I'm not sure what exactly is causing it. Any guidance is greatly appreciated! (PS: I am new to Kafka, so this could be an unrelated problem.)
Would you be willing to accept a PR adding support for multiple brokers?
I'm thinking a comma-delimited list, as node-rdkafka normally uses.
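For reference, librdkafka accepts a comma-delimited list in its 'metadata.broker.list' setting, so the change could be as small as joining an array of host:port strings. A sketch of the proposed behavior (the array-accepting option is the suggestion here, not the current API):

```javascript
// Sketch of the proposed behavior: accept either a single "host:port" string
// or an array of them, and produce the comma-delimited value that librdkafka
// expects for 'metadata.broker.list'.
function toBrokerList(brokers) {
  return Array.isArray(brokers) ? brokers.join(',') : brokers;
}
```

Accepting both shapes would keep the existing single-broker host/port configuration backward compatible.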