Hi :)
I was playing around with this neat module (good job btw) and noticed that it lacks support for event listeners.
My queues are defined on one server and processed on another (producer/consumer pattern), and the producer needs to listen to some events. According to the Bull documentation, this is done by listening to global:* events.
The principle would be the same for non-global events as well:
queue.on('global:active', doSomething);
queue.on('active', doSomethingElse);
I guess this kind of thing should be done when creating the queue, so I was thinking about something like this:
BullModule.forRootAsync({
  imports: [ModuleA, ConfigModule],
  inject: [ServiceA, ConfigService],
  name: '__TEST_QUEUE__',
  useFactory: async (serviceA: ServiceA, configService: ConfigService): Promise<BullModuleOptions> => ({
    name: '__TEST_QUEUE__',
    options: {
      redis: configService.get('redis'),
    },
    listeners: {
      'global:active': serviceA.onActive.bind(serviceA),
    },
  }),
}),
Or using a Map:
enum BullQueueEvent {
  GlobalWaiting = 'global:waiting',
  GlobalActive = 'global:active',
  GlobalCompleted = 'global:completed',
  Waiting = 'waiting',
  Active = 'active',
  Completed = 'completed',
  // ....
}
type BullQueueEventListenerCallback = (...args: any[]) => void; // Could do a better typing here
BullModule.forRootAsync({
  imports: [ModuleA, ConfigModule],
  inject: [ServiceA, ConfigService],
  name: '__TEST_QUEUE__',
  useFactory: async (serviceA: ServiceA, configService: ConfigService): Promise<BullModuleOptions> => ({
    name: '__TEST_QUEUE__',
    options: {
      redis: configService.get('redis'),
    },
    // The Map constructor takes a single iterable of [key, value] pairs, hence the outer array.
    listeners: new Map<BullQueueEvent, BullQueueEventListenerCallback>([
      [BullQueueEvent.GlobalWaiting, serviceA.onGlobalWaitingEvent.bind(serviceA)],
      [BullQueueEvent.GlobalActive, serviceA.onGlobalActiveEvent.bind(serviceA)],
      [BullQueueEvent.GlobalCompleted, serviceA.onGlobalCompletedEvent.bind(serviceA)],
      // ....
    ]),
  }),
}),
And the provider with an object literal:
// bull.provider.ts
function buildQueue(option: BullModuleOptions): Bull.Queue {
  const queue: Bull.Queue = new Bull(option.name ? option.name : 'default', option.options);
  if (option.listeners) {
    // Register each listener under its event name.
    for (const event of Object.keys(option.listeners)) {
      queue.on(event, option.listeners[event]);
    }
  }
  // ...
}
With a Map:
// bull.provider.ts
function buildQueue(option: BullModuleOptions): Bull.Queue {
  const queue: Bull.Queue = new Bull(option.name ? option.name : 'default', option.options);
  if (option.listeners) {
    // Map.forEach passes the value (callback) first and the key (event name) second.
    option.listeners.forEach((cb: BullQueueEventListenerCallback, event: BullQueueEvent) => {
      queue.on(event, cb);
    });
  }
  // ...
}
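Both variants could share one helper. Here is a minimal, self-contained sketch of that idea, not the module's actual code: `attachListeners`, `QueueLike`, `Listeners` and `FakeQueue` are hypothetical names, and `FakeQueue` only stands in for a real Bull queue so the example runs without Redis.

```typescript
// Hypothetical sketch: none of these names exist in the module or in Bull.
type ListenerCallback = (...args: any[]) => void;
type Listeners = Map<string, ListenerCallback> | Record<string, ListenerCallback>;

// The only part of the queue API this sketch relies on.
interface QueueLike {
  on(event: string, cb: ListenerCallback): unknown;
}

// Registers every listener, whether given as a Map or as an object literal.
function attachListeners(queue: QueueLike, listeners?: Listeners): void {
  if (!listeners) {
    return;
  }
  const entries: [string, ListenerCallback][] =
    listeners instanceof Map ? Array.from(listeners.entries()) : Object.entries(listeners);
  for (const [event, cb] of entries) {
    queue.on(event, cb);
  }
}

// In-memory stand-in for a Bull queue (assumption: only on/emit are needed here).
class FakeQueue implements QueueLike {
  private handlers = new Map<string, ListenerCallback[]>();

  on(event: string, cb: ListenerCallback): this {
    const list = this.handlers.get(event) || [];
    list.push(cb);
    this.handlers.set(event, list);
    return this;
  }

  emit(event: string, ...args: any[]): void {
    for (const cb of this.handlers.get(event) || []) {
      cb(...args);
    }
  }
}

const seen: string[] = [];
const fakeQueue = new FakeQueue();

// Object-literal form.
attachListeners(fakeQueue, {
  'global:active': (jobId: string) => { seen.push(`object:${jobId}`); },
});

// Map form.
attachListeners(
  fakeQueue,
  new Map<string, ListenerCallback>([
    ['global:active', (jobId: string) => { seen.push(`map:${jobId}`); }],
  ]),
);

fakeQueue.emit('global:active', '42');
```

With a helper like this, buildQueue would not need to care which shape the listeners option has.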
I tested it locally and it works fine. The only downside I found is that I have to bind the service instance to each callback so that the callback can keep using the dependencies of my service(s). There might be a better way to do this.
I might be totally wrong by the way. If you guys have another way of handling events, I'd be glad to discuss it.
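One possible way around the explicit bind is to declare the handlers as arrow-function class properties, which capture `this` when the instance is created. A minimal sketch, assuming a hypothetical shape for `ServiceA` and a hypothetical `Logger` dependency (neither is shown in this issue):

```typescript
// Hypothetical stand-ins for an injected dependency and for ServiceA.
class Logger {
  readonly lines: string[] = [];

  log(line: string): void {
    this.lines.push(line);
  }
}

class ServiceA {
  constructor(private readonly logger: Logger) {}

  // Regular method: `this` is lost when the method is passed as a bare callback.
  onActiveMethod(jobId: string): void {
    this.logger.log(`method:${jobId}`);
  }

  // Arrow-function property: `this` is captured per instance, so no bind is needed.
  onActive = (jobId: string): void => {
    this.logger.log(`arrow:${jobId}`);
  };
}

const logger = new Logger();
const serviceA = new ServiceA(logger);

// Passing the regular method around detaches it from its instance.
const unbound = serviceA.onActiveMethod;
let unboundFailed = false;
try {
  unbound('1'); // `this` is undefined here, so reading this.logger throws
} catch {
  unboundFailed = true;
}

// The arrow property keeps working when detached.
const arrow = serviceA.onActive;
arrow('2');
```

The trade-off is that arrow properties are created once per instance rather than living on the prototype, which is usually fine for singleton services.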
Dylan