Stream messages from a RabbitMQ queue to Intercom.
Run __main__.py
with environment variables:
CJW_RABBITMQ_HOST
: URL of RabbitMQ
CJW_INTERCOM_API_TOKEN
: API token for Intercom; or the magic string mock
CJW_INTERCOM_QUEUE_NAME
: RabbitMQ queue to read
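As an illustration of how these three variables might be read at startup, here is a minimal sketch. The names `Config`, `load_config` and `is_mock` are hypothetical and not taken from this program, and failing fast when a variable is missing is an assumption.

```python
import os
from typing import Mapping, NamedTuple

REQUIRED_VARIABLES = (
    "CJW_RABBITMQ_HOST",
    "CJW_INTERCOM_API_TOKEN",
    "CJW_INTERCOM_QUEUE_NAME",
)


class Config(NamedTuple):
    """Hypothetical container for the three settings listed above."""

    rabbitmq_host: str
    intercom_api_token: str
    queue_name: str

    @property
    def is_mock(self) -> bool:
        # The magic string "mock" means: log requests instead of sending them.
        return self.intercom_api_token == "mock"


def load_config(env: Mapping[str, str] = os.environ) -> Config:
    """Read settings from env; raise if any is missing (an assumption)."""
    missing = [name for name in REQUIRED_VARIABLES if name not in env]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return Config(
        rabbitmq_host=env["CJW_RABBITMQ_HOST"],
        intercom_api_token=env["CJW_INTERCOM_API_TOKEN"],
        queue_name=env["CJW_INTERCOM_QUEUE_NAME"],
    )
```

Passing the environment as a mapping keeps the sketch easy to exercise in tests without touching the real process environment.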
Messages on the queue must be msgpack-encoded maps with the following keys:
method
: HTTP method (usually POST)
path
: HTTP path (e.g., /contacts)
data
: Body of message (we'll encode it as JSON)
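To make the expected shape concrete, the sketch below checks an already-decoded message and JSON-encodes its body. It assumes the msgpack payload has been decoded into a Python dict with string keys; `IntercomRequest` and `parse_message` are hypothetical names, and raising `ValueError` for malformed input is an assumption.

```python
import json
from typing import Any, NamedTuple


class IntercomRequest(NamedTuple):
    """Hypothetical holder for one validated queue message."""

    method: str
    path: str
    body: bytes  # the "data" value, encoded as JSON


def parse_message(decoded: Any) -> IntercomRequest:
    """Validate a decoded message map; raise ValueError if it is malformed."""
    if not isinstance(decoded, dict):
        raise ValueError("message must be a map")
    missing = [key for key in ("method", "path", "data") if key not in decoded]
    if missing:
        raise ValueError("message is missing keys: " + ", ".join(missing))
    method, path = decoded["method"], decoded["path"]
    if not isinstance(method, str) or not isinstance(path, str):
        raise ValueError("method and path must be strings")
    try:
        body = json.dumps(decoded["data"]).encode("utf-8")
    except TypeError as err:
        # json.dumps raises TypeError for values it cannot serialize.
        raise ValueError("data is not JSON-serializable") from err
    return IntercomRequest(method, path, body)
```

For example, `parse_message({"method": "POST", "path": "/contacts", "data": {"email": "a@example.com"}})` yields a request whose body is the JSON encoding of the `data` map.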
This program will send messages, one at a time, and then ack the RabbitMQ message.
Use the environment variable CJW_INTERCOM_API_TOKEN=mock
to make this program
merely log messages instead of sending them to Intercom. This is useful in
development and test environments.
On the Intercom side, this program adheres to at-least-once semantics. We'll resend a request until we're certain Intercom received it.
If a message on the RabbitMQ queue is malformed, this program will log an error and ack the message.
Otherwise, we only ack a message after it's been delivered to Intercom.
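The ack ordering described above can be sketched as follows. This is not the program's actual code: `handle_delivery` is a hypothetical name, the `decode`, `deliver` and `ack` callables stand in for the real msgpack decoding, Intercom delivery and RabbitMQ ack, and signalling a malformed message with `ValueError` is an assumption.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def handle_delivery(
    raw: bytes,
    decode: Callable[[bytes], Any],
    deliver: Callable[[Any], None],
    ack: Callable[[], None],
) -> bool:
    """Process one queue message; return True if it was handed to deliver()."""
    try:
        message = decode(raw)
    except ValueError as err:
        # Malformed: log and ack, so the bad message is not redelivered forever.
        logger.error("Dropping malformed message: %s", err)
        ack()
        return False
    # deliver() returns only once the message counts as delivered. If it
    # raises, ack() is never reached and the message stays un-acked.
    deliver(message)
    ack()
    return True
```

Because the ack comes last, a crash between delivery and ack leads to the same message being sent again after a restart, which is consistent with at-least-once semantics.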
In the event of a RabbitMQ error, this program will crash. This should cause the cluster to restart this service and resume sending the same, un-acked message that was being sent previously. (TODO reconnect)
For each request, there is a "last response was 50X error" flag. Any retry (except after 50X error) resets the flag to False.
In the event of HTTP 429, this program will pause according to rate-limiting rules and then retry.
In the event of HTTP 404, this program will log a warning but consider the message successfully delivered. (An HTTP 404 can happen if we, say, try to delete a contact we already deleted -- which is a natural thing to happen with at-least-once semantics.)
In the event of HTTP 50X, this program will retry once with the "last response was 50X error" flag set (as per Intercom docs).
In the event of any other HTTP 40X, or a second HTTP 50X in a row, this program will log an error but consider the message successfully delivered. This indicates human intervention is necessary; messages will be dropped so the queue doesn't back up forever.
In the event of a TCP error, this program will log an error and retry.
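The retry rules above can be read as a small state machine. The sketch below is one possible rendering, not the program's actual code: `deliver`, `send`, `rate_limit_delay` and the returned outcome strings are hypothetical, a TCP error is modelled as `OSError`, and statuses the text does not mention (such as 30X) are treated like other errors, which is an assumption. How the flag is communicated to Intercom is not shown.

```python
import logging
import time
from typing import Callable

logger = logging.getLogger(__name__)


def deliver(
    send: Callable[[], int],
    rate_limit_delay: Callable[[], float] = lambda: 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call send() until the message may be acked; return what happened.

    send() performs one HTTP request and returns its status code, or raises
    OSError on a TCP-level failure.
    """
    last_was_50x = False  # the "last response was 50X error" flag
    while True:
        try:
            status = send()
        except OSError as err:
            logger.error("TCP error, retrying: %s", err)
            last_was_50x = False  # a retry not caused by 50X resets the flag
            continue
        if 200 <= status < 300:
            return "delivered"
        if status == 429:
            sleep(rate_limit_delay())  # pause per rate-limiting rules
            last_was_50x = False
            continue
        if status == 404:
            logger.warning("HTTP 404; treating message as delivered")
            return "not-found"
        if 500 <= status < 600 and not last_was_50x:
            last_was_50x = True  # retry once with the flag set
            continue
        # Other 40X, a second 50X in a row, or anything unexpected:
        # drop the message so the queue does not back up.
        logger.error("HTTP %d; dropping message", status)
        return "dropped"
```

Every return value means the caller may ack the RabbitMQ message; the strings only distinguish the cases for logging or tests. Injecting `sleep` and `rate_limit_delay` keeps the sketch testable without real waiting.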
- docker build . to make sure code is correctly formatted
- Develop a new feature (and maybe add tests?)
- docker build . to make sure it's still okay
- git push to trigger a build
Use the Docker image, gcr.io/workbenchdata-ci/cjw-intercom-sink:SHA1
MIT. See LICENSE.