http-source-connector's Introduction

Fluvio HTTP Inbound Connector

Reads HTTP responses, based on the configured HTTP request options, and produces them to Fluvio topics.

This connector can be configured to operate in three modes.

  • Polling: Unless otherwise specified, the endpoint is polled periodically, at the interval set by the interval config option. Each response is produced as an individual Fluvio record.
  • Streaming: When the stream config option is set, the HTTP response is processed as a data stream. A record is produced to Fluvio every time the delimiter is encountered; the delimiter defaults to \n.
  • WebSocket: When the endpoint config option starts with ws://, a WebSocket connection is established and each incoming message is produced as a record.
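The mode selection described above can be sketched in Rust, the language the connector is written in. This is a minimal illustration, not the connector's actual code: `Mode` and `select_mode` are hypothetical names, a WebSocket endpoint is assumed to take precedence over `stream: true`, and `wss://` is assumed to be routed to the WebSocket source as well (the secure-WebSocket issue further down suggests it is).

```rust
/// The three operating modes described above (hypothetical type).
#[derive(Debug, PartialEq)]
enum Mode {
    Polling,
    Streaming,
    WebSocket,
}

/// Picks a mode from the `endpoint` and `stream` options.
/// Assumption: a ws:// or wss:// endpoint wins over `stream: true`.
fn select_mode(endpoint: &str, stream: bool) -> Mode {
    let endpoint = endpoint.trim().to_ascii_lowercase();
    if endpoint.starts_with("ws://") || endpoint.starts_with("wss://") {
        Mode::WebSocket
    } else if stream {
        Mode::Streaming
    } else {
        // Default: poll the endpoint every `interval`.
        Mode::Polling
    }
}

fn main() {
    println!("{:?}", select_mode("https://catfact.ninja/fact", false));
    println!("{:?}", select_mode("https://stream.wikimedia.org/v2/stream/recentchange", true));
    println!("{:?}", select_mode("ws://websocket.example/websocket", false));
}
```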

Supports HTTP/1.0, HTTP/1.1, HTTP/2.0 protocols.

See docs here. Tutorial for HTTP to SQL Pipeline.

Configuration

| Option | Default | Type | Description |
|---|---|---|---|
| interval | 10s | String | Interval between HTTP requests, in the form "1s", "10ms", "1m", "1ns", etc. |
| method | GET | String | GET, POST, PUT, HEAD |
| endpoint | - | String | HTTP URL endpoint. Use ws:// for WebSocket URLs. |
| headers | - | Array<String> | Request header(s) as "Key:Value" pairs |
| body | - | String | Request body, e.g. for POST |
| user-agent | "fluvio/http-source 0.1.0" | String | Request user-agent |
| output_type | text | String | text = UTF-8 string output, json = UTF-8 JSON-serialized string |
| output_parts | body | String | body = body only, full = status, headers, and body |
| stream | false | bool | Enables HTTP streaming mode |
| delimiter | '\n' | String | Delimiter separating records when producing from an HTTP streaming endpoint |
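As an illustration of the interval format, here is a minimal Rust parser for strings like "10s" or "10ms". It is a sketch under stated assumptions: `parse_interval` is a hypothetical name, only a whole number followed by one of ns, us, ms, s, m, or h is accepted, and the connector's real parser may accept more forms.

```rust
use std::time::Duration;

/// Parses interval strings such as "1s", "10ms", "1m" or "1ns" into a Duration.
/// Hand-rolled sketch: an integer followed by one of ns, us, ms, s, m, h.
fn parse_interval(input: &str) -> Option<Duration> {
    let input = input.trim();
    // Split at the first non-digit character: "10ms" -> ("10", "ms").
    let unit_start = input.find(|c: char| !c.is_ascii_digit())?;
    let (number, unit) = input.split_at(unit_start);
    let n: u64 = number.parse().ok()?;
    match unit {
        "ns" => Some(Duration::from_nanos(n)),
        "us" => Some(Duration::from_micros(n)),
        "ms" => Some(Duration::from_millis(n)),
        "s" => Some(Duration::from_secs(n)),
        // checked_mul guards against overflow for huge minute/hour values.
        "m" => n.checked_mul(60).map(Duration::from_secs),
        "h" => n.checked_mul(3600).map(Duration::from_secs),
        _ => None,
    }
}

fn main() {
    for s in ["10s", "10ms", "1m", "1ns", "ten seconds"] {
        println!("{s:>12} -> {:?}", parse_interval(s));
    }
}
```

A value with no unit (for example "10") is rejected here rather than given a default unit; that is a design choice of the sketch, not documented connector behavior.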

Record Type Output

| Matrix | Output |
|---|---|
| output_type = text (default), output_parts = body (default) | Only the body of the HTTP response |
| output_type = text (default), output_parts = full | The full HTTP response |
| output_type = json, output_parts = body (default) | Only the "body" in a JSON struct |
| output_type = json, output_parts = full | HTTP "status", "body", and "header" as JSON |
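To make the matrix concrete, the sketch below renders one response in each of the four combinations. The `Response` struct, the encoding of "status" as a bare number, the layout of the "header" object, and the text layout of a full response are all hypothetical; the connector's exact output shape is not specified here. A tiny hand-written JSON string escaper keeps the example dependency-free, whereas real code would use a JSON serializer.

```rust
/// output_type option.
#[derive(Clone, Copy)]
enum OutputType { Text, Json }

/// output_parts option.
#[derive(Clone, Copy)]
enum OutputParts { Body, Full }

/// Hypothetical, already-received HTTP response.
struct Response {
    version: String,
    status: u16,
    reason: String,
    headers: Vec<(String, String)>,
    body: String,
}

/// Minimal JSON string escaper (surrounding quotes included).
fn json_string(s: &str) -> String {
    let mut out = String::from("\"");
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Builds the record payload for one response.
fn format_record(resp: &Response, output_type: OutputType, output_parts: OutputParts) -> String {
    match (output_type, output_parts) {
        // Default: only the body, unchanged.
        (OutputType::Text, OutputParts::Body) => resp.body.clone(),
        // Status line, one line per header, blank line, body.
        (OutputType::Text, OutputParts::Full) => {
            let mut s = format!("{} {} {}\n", resp.version, resp.status, resp.reason);
            for (name, value) in &resp.headers {
                s.push_str(&format!("{name}: {value}\n"));
            }
            s.push('\n');
            s.push_str(&resp.body);
            s
        }
        (OutputType::Json, OutputParts::Body) => format!("{{\"body\":{}}}", json_string(&resp.body)),
        (OutputType::Json, OutputParts::Full) => {
            let headers: Vec<String> = resp
                .headers
                .iter()
                .map(|(name, value)| format!("{}:{}", json_string(name), json_string(value)))
                .collect();
            format!(
                "{{\"status\":{},\"header\":{{{}}},\"body\":{}}}",
                resp.status,
                headers.join(","),
                json_string(&resp.body)
            )
        }
    }
}

fn main() {
    let resp = Response {
        version: "HTTP/1.1".into(),
        status: 200,
        reason: "OK".into(),
        headers: vec![("content-type".into(), "text/plain".into())],
        body: "hi".into(),
    };
    println!("{}", format_record(&resp, OutputType::Json, OutputParts::Full));
}
```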

Usage Example

This is an example of a simple connector config file for polling an endpoint:

# config-example.yaml
apiVersion: 0.1.0
meta:
  version: 0.3.7
  name: cat-facts
  type: http-source
  topic: cat-facts
  create-topic: true
  secrets:
    - name: AUTHORIZATION_TOKEN
http:
  endpoint: "https://catfact.ninja/fact"
  interval: 10s  
  headers:
    - "Authorization: token ${{ secrets.AUTHORIZATION_TOKEN }}"
    - "Cache-Control: no-cache"

The produced record in the Fluvio topic will be:

{
  "fact": "The biggest wildcat today is the Siberian Tiger. It can be more than 12 feet (3.6 m) long (about the size of a small car) and weigh up to 700 pounds (317 kg).",
  "length": 158
}
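Polling mode boils down to a loop: send the request, produce each successful response as one record, and wait `interval` before the next request. The dependency-free sketch below assumes a lot: `poll` is a hypothetical function, `fetch` and `produce` are closures standing in for the HTTP client and the Fluvio producer, the loop is bounded by `max_polls` so the example terminates, and a failed request is logged and skipped rather than produced.

```rust
use std::thread;
use std::time::Duration;

/// Polls `fetch` up to `max_polls` times, sleeping `interval` between requests.
/// Each successful response becomes exactly one record passed to `produce`.
/// Returns the number of records produced.
fn poll<F, P>(interval: Duration, max_polls: usize, mut fetch: F, mut produce: P) -> usize
where
    F: FnMut() -> Result<String, String>,
    P: FnMut(String),
{
    let mut produced = 0;
    for i in 0..max_polls {
        match fetch() {
            Ok(body) => {
                produce(body);
                produced += 1;
            }
            // Failed requests are logged, not produced.
            Err(e) => eprintln!("Request execution failed: {e}"),
        }
        // No need to wait after the final request.
        if i + 1 < max_polls {
            thread::sleep(interval);
        }
    }
    produced
}

fn main() {
    let mut n = 0;
    // Simulated endpoint: the second request fails.
    let fetch = || {
        n += 1;
        if n == 2 { Err("timeout".to_string()) } else { Ok(format!("{{\"fact\":\"fact #{n}\"}}")) }
    };
    let mut records = Vec::new();
    let count = poll(Duration::from_millis(10), 3, fetch, |r| records.push(r));
    println!("{count} records: {records:?}");
}
```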

Secrets

The Fluvio HTTP Source Connector supports secrets in the endpoint and headers parameters:

# config-example.yaml
apiVersion: 0.1.0
meta:
  version: 0.3.7
  name: cat-facts
  type: http-source
  topic: cat-facts
  create-topic: true
  secrets:
    - name: MY_SECRET_URL
    - name: MY_AUTHORIZATION_HEADER
http:
  endpoint:
    secret:
      name: MY_SECRET_URL
  headers:
    - "Authorization: ${{ secrets.MY_AUTHORIZATION_HEADER }}"
  interval: 10s
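The `${{ secrets.NAME }}` placeholder syntax used in headers can be illustrated with a small substitution function. This is a hypothetical sketch of the idea, not how Fluvio resolves secrets internally: secrets are assumed to be available as a name-to-value map, whitespace inside the braces is tolerated, and an unknown secret name is treated as an error rather than left in place.

```rust
use std::collections::HashMap;

/// Replaces every `${{ secrets.NAME }}` in `input` with the value of NAME.
fn resolve_secrets(input: &str, secrets: &HashMap<&str, &str>) -> Result<String, String> {
    let mut out = String::new();
    let mut rest = input;
    while let Some(start) = rest.find("${{") {
        // Copy the text before the placeholder unchanged.
        out.push_str(&rest[..start]);
        let after = &rest[start + 3..];
        let end = after
            .find("}}")
            .ok_or_else(|| format!("unterminated placeholder in {input:?}"))?;
        let expr = after[..end].trim();
        let name = expr
            .strip_prefix("secrets.")
            .map(str::trim)
            .ok_or_else(|| format!("unsupported placeholder: {expr}"))?;
        let value = secrets
            .get(name)
            .ok_or_else(|| format!("missing secret: {name}"))?;
        out.push_str(value);
        // Continue after the closing `}}`.
        rest = &after[end + 2..];
    }
    out.push_str(rest);
    Ok(out)
}

fn main() {
    let secrets = HashMap::from([("MY_AUTHORIZATION_HEADER", "Bearer abc123")]);
    let header = "Authorization: ${{ secrets.MY_AUTHORIZATION_HEADER }}";
    println!("{:?}", resolve_secrets(header, &secrets));
}
```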

Transformations

The Fluvio HTTP Source Connector supports transformations: records can be modified before they are sent to the Fluvio topic.

The previous example can be extended to add extra transformations to outgoing records:

# config-example.yaml
apiVersion: 0.1.0
meta:
  version: 0.3.7
  name: cat-facts
  type: http-source
  topic: cat-facts
  create-topic: true
http:
  endpoint: "https://catfact.ninja/fact"
  interval: 10s
transforms:
  - uses: infinyon/[email protected]
    with:
      spec:
        - operation: default
          spec:
            source: "http-connector"
        - operation: remove
          spec:
            length: ""

In this case, an additional transformation is performed before records are sent to the Fluvio topic: the length field is removed, and a source field with the string value http-connector is added.

Now produced records will have a different shape, for example:

{
  "fact": "A cat has more bones than a human; humans have 206, and the cat - 230.",
  "source": "http-connector"
}
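The effect of the two operations on this record can be modelled with a flat map. This sketch only approximates `default` and `remove` on top-level keys, with JSON values kept as pre-serialized strings; the real SmartModule works on full JSON documents, and `apply_default` and `apply_remove` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// A flat JSON object: key -> already-serialized JSON value.
type Record = BTreeMap<String, String>;

/// `default`: add each field only if the record does not already have it.
fn apply_default(record: &mut Record, defaults: &[(&str, &str)]) {
    for (key, value) in defaults {
        record.entry(key.to_string()).or_insert_with(|| value.to_string());
    }
}

/// `remove`: drop each listed field if present.
fn apply_remove(record: &mut Record, keys: &[&str]) {
    for key in keys {
        record.remove(*key);
    }
}

fn main() {
    let mut record = Record::from([
        (
            "fact".to_string(),
            "\"A cat has more bones than a human; humans have 206, and the cat - 230.\"".to_string(),
        ),
        ("length".to_string(), "70".to_string()),
    ]);
    apply_default(&mut record, &[("source", "\"http-connector\"")]);
    apply_remove(&mut record, &["length"]);
    println!("{record:?}");
}
```

Note that `default` never overwrites an existing field, which is why it is modelled with `entry(..).or_insert_with(..)` rather than `insert`.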

Read more about JSON to JSON transformations.

Streaming Mode

Set the stream configuration option to enable streaming mode, and use delimiter to determine how incoming records are separated.

# config-example.yaml
apiVersion: 0.1.0
meta:
  version: 0.3.7
  name: wiki-updates
  type: http-source
  topic: wiki-updates
http:
  endpoint: "https://stream.wikimedia.org/v2/stream/recentchange"
  method: GET
  stream: true
  delimiter: "\n\n"
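Because a delimiter such as "\n\n" can be split across network chunks, a streaming reader has to buffer partial data until a full delimiter arrives. A minimal sketch of that buffering, assuming the stream is valid UTF-8 at chunk boundaries (real code would buffer bytes) and that empty records are skipped; `RecordSplitter` is a hypothetical name.

```rust
/// Buffers streamed text and emits one record per delimiter.
struct RecordSplitter {
    delimiter: String,
    buffer: String,
}

impl RecordSplitter {
    fn new(delimiter: &str) -> Self {
        // An empty delimiter would never advance the buffer.
        assert!(!delimiter.is_empty(), "delimiter must not be empty");
        Self { delimiter: delimiter.to_string(), buffer: String::new() }
    }

    /// Appends a chunk and returns every record completed by it.
    fn push(&mut self, chunk: &str) -> Vec<String> {
        self.buffer.push_str(chunk);
        let mut records = Vec::new();
        while let Some(pos) = self.buffer.find(self.delimiter.as_str()) {
            let record = self.buffer[..pos].to_string();
            // Drop the record and its delimiter from the buffer.
            self.buffer.drain(..pos + self.delimiter.len());
            if !record.is_empty() {
                records.push(record);
            }
        }
        records
    }
}

fn main() {
    let mut splitter = RecordSplitter::new("\n\n");
    // The delimiter of the second event is split across two chunks.
    for chunk in ["event: message\ndata: a", "\n\ndata: b\n", "\n"] {
        for record in splitter.push(chunk) {
            println!("record: {record:?}");
        }
    }
}
```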

WebSocket Mode

Connect to a WebSocket endpoint using a ws:// URL. Text messages are emitted as records unchanged. For binary messages, the connector first attempts to convert them into strings.

# config-example.yaml
apiVersion: 0.1.0
meta:
  version: 0.3.7
  name: websocket-connector
  type: http-source
  topic: websocket-updates
http:
  endpoint: ws://websocket.example/websocket
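The message handling described above can be sketched as a small conversion function. `WsMessage` is a hypothetical stand-in for a WebSocket library's message type, and the fallback for binary payloads that are not valid UTF-8 (a lossy conversion here) is an assumption, since the connector is only described as first attempting a string conversion. Control frames are assumed not to produce records.

```rust
/// Hypothetical stand-in for a WebSocket library's message type.
enum WsMessage {
    Text(String),
    Binary(Vec<u8>),
    Ping(Vec<u8>),
    Close,
}

/// Converts one incoming message into a record, if it carries data.
fn to_record(msg: WsMessage) -> Option<String> {
    match msg {
        // Text messages are emitted unchanged.
        WsMessage::Text(text) => Some(text),
        // Binary messages: try UTF-8 first; the lossy fallback is an assumption.
        WsMessage::Binary(bytes) => match String::from_utf8(bytes) {
            Ok(text) => Some(text),
            Err(err) => Some(String::from_utf8_lossy(err.as_bytes()).into_owned()),
        },
        // Control frames are not produced.
        WsMessage::Ping(_) | WsMessage::Close => None,
    }
}

fn main() {
    let messages = vec![
        WsMessage::Text("hello".to_string()),
        WsMessage::Binary(b"{\"n\":1}".to_vec()),
        WsMessage::Binary(vec![0xff, b'o', b'k']),
        WsMessage::Ping(vec![]),
        WsMessage::Close,
    ];
    for msg in messages {
        println!("{:?}", to_record(msg));
    }
}
```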


http-source-connector's Issues

Release v0.3.3

Publish multiple targets to Hub

We need to build the following targets and publish them to both the prod and dev Hubs:

x86_64-unknown-linux-musl
x86_64-unknown-linux-gnu
aarch64-unknown-linux-gnu
aarch64-unknown-linux-musl

aarch64-apple-darwin
x86_64-apple-darwin

[Feature] Add HTTP Sink Connector to InfinyOn Hub

Productionize - https://github.com/infinyon/http-sink-prototype.

The sink connector must have:

  • Integration Test
  • Publish workflow
  • Doc

HTTP Sink Connector

Official Infinyon HTTP Sink connector

Sink Connector

The HTTP sink connector reads records from a data stream and generates an HTTP request.

Supports HTTP/1.0, HTTP/1.1, HTTP/2.0 protocols.

See docs here.

Configuration

| Option | Default | Type | Description |
|---|---|---|---|
| method | POST | String | POST, PUT |
| endpoint | - | String | HTTP URL endpoint |
| headers | - | Array<String> | Request header(s) as "Key:Value" pairs |
| user-agent | "fluvio/http-sink 0.1.0" | String | Request user-agent |
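To illustrate how these options turn a record into an HTTP request, here is a sketch that renders a record as a raw HTTP/1.1 request. It is purely illustrative: `build_request` is hypothetical, only plain `http://host[:port]/path` URLs are handled, the record is assumed to be sent unchanged as the body, and a real connector would use an HTTP client library rather than formatting requests by hand.

```rust
/// Renders one record as a raw HTTP/1.1 request (illustration only).
fn build_request(
    method: &str,
    endpoint: &str,
    headers: &[&str],
    user_agent: &str,
    record: &str,
) -> Result<String, String> {
    let rest = endpoint
        .strip_prefix("http://")
        .ok_or_else(|| format!("only http:// URLs are handled here: {endpoint}"))?;
    // Split "host[:port]/path" into the Host header and the request target.
    let (host, path) = match rest.find('/') {
        Some(i) => (&rest[..i], &rest[i..]),
        None => (rest, "/"),
    };
    let mut req = format!("{method} {path} HTTP/1.1\r\nHost: {host}\r\nUser-Agent: {user_agent}\r\n");
    // Configured headers are already in "Key: Value" form.
    for header in headers {
        req.push_str(header);
        req.push_str("\r\n");
    }
    // The record becomes the request body.
    req.push_str(&format!("Content-Length: {}\r\n\r\n{record}", record.len()));
    Ok(req)
}

fn main() {
    let req = build_request(
        "POST",
        "http://127.0.0.1/post",
        &["Authorization: token MySecretToken", "Cache-Control: no-cache"],
        "fluvio/http-sink 0.1.0",
        "{\"result\":\"hello\"}",
    );
    println!("{}", req.unwrap());
}
```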

Usage Example

This is an example of a simple connector config file:

# config-example.yaml
meta:
  version: 0.1.0
  name: my-http-sink
  type: http-sink
  topic: http-sink-topic
http:
  endpoint: "http://127.0.0.1/post"
  headers:
    - "Authorization: token MySecretToken"
    - "Cache-Control: no-cache"

Transformations

The Fluvio HTTP Sink Connector supports transformations: records can be modified before they are sent to the endpoint.

The previous example can be extended to add extra transformations to outgoing records:

# config-example.yaml
meta:
  version: 0.1.0
  name: my-http-sink
  type: http-sink
  topic: http-sink-topic
http:
  endpoint: "http://127.0.0.1/post"
  headers:
    - "Authorization: token MySecretToken"
    - "Cache-Control: no-cache"
transforms:
  - uses: infinyon/[email protected]
    with:
      spec:
        - operation: shift
          spec:
            "result": "text"

In this case, an additional transformation is performed before records are sent to the HTTP endpoint: a JSON field called result is renamed to text.
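A shift that renames a top-level field, as in this spec, amounts to moving a value to a new key. The sketch below models only that single-level rename on a flat map with pre-serialized values; `apply_shift` is a hypothetical name, and it assumes Jolt-style semantics in which fields not listed in the spec are not carried over to the output.

```rust
use std::collections::BTreeMap;

/// A flat JSON object: key -> already-serialized JSON value.
type Record = BTreeMap<String, String>;

/// Shift: builds a new record, moving each `from` field to its `to` key.
/// Fields not named in the spec are not copied (assumed Jolt-style behavior).
fn apply_shift(record: &Record, spec: &[(&str, &str)]) -> Record {
    let mut out = Record::new();
    for (from, to) in spec {
        if let Some(value) = record.get(*from) {
            out.insert(to.to_string(), value.clone());
        }
    }
    out
}

fn main() {
    let record = Record::from([("result".to_string(), "\"hello\"".to_string())]);
    let shifted = apply_shift(&record, &[("result", "text")]);
    println!("{shifted:?}");
}
```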

Read more about JSON to JSON transformations.

Release v0.3.2

  • Check that the version in crates/http-source/Connector.toml has been incremented. If needed, increment it.
  • Tag the intended release with the pattern vX.Y.Z matching the version, and push it to the repository with git push --tags. This will start the Publish Hub workflow.
  • Search the fluvio-website repo for documentation examples, and update the version in the examples and in the documentation of features.
  • Check that the connector has been published with fluvio hub connector list.
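The first two checklist steps (version bumped, tag matches it) lend themselves to a small pre-release check. This is a hypothetical helper, not part of the repository's tooling: it assumes Connector.toml declares `version = "X.Y.Z"` under a `[package]` table and does a naive line scan rather than real TOML parsing.

```rust
/// Extracts `version = "..."` from the [package] table with a naive line scan.
fn package_version(connector_toml: &str) -> Option<&str> {
    let mut in_package = false;
    for line in connector_toml.lines() {
        let line = line.trim();
        if line.starts_with('[') {
            // Track whether we are inside the [package] table.
            in_package = line == "[package]";
            continue;
        }
        if in_package {
            if let Some((key, value)) = line.split_once('=') {
                if key.trim() == "version" {
                    let value = value.trim();
                    return value.strip_prefix('"').and_then(|v| v.strip_suffix('"'));
                }
            }
        }
    }
    None
}

/// Checks that a tag such as "v0.3.3" matches the package version.
fn tag_matches(tag: &str, connector_toml: &str) -> bool {
    match (tag.strip_prefix('v'), package_version(connector_toml)) {
        (Some(tag_version), Some(version)) => tag_version == version,
        _ => false,
    }
}

fn main() {
    let toml = "[package]\nname = \"http-source\"\nversion = \"0.3.3\"\n";
    println!("v0.3.3 matches: {}", tag_matches("v0.3.3", toml));
    println!("v0.3.2 matches: {}", tag_matches("v0.3.2", toml));
}
```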

bug: user reports an error trying to connect to a secure WebSocket URL

2024-03-15T21:27:10.336540Z  WARN http_source: Error connecting to streaming source: "Failed to run WebSocket connection", reconnecting in 20ms.
2024-03-15T21:27:10.361703Z ERROR http_source::websocket_source: WebSocket connection error: URL error: TLS support not compiled in

At first glance, it seems the tungstenite crate needs TLS enabled as a feature.

Default user-agent should contain crate's version

Currently, it is a static string, fluvio/http-source 0.5.0, that contains an outdated version. We should embed the crate's current version at compile time so that it no longer needs to be maintained manually.
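Cargo exposes the crate version to the compiler as the `CARGO_PKG_VERSION` environment variable, so the default user-agent can be derived at compile time. In this minimal sketch, `default_user_agent` is a hypothetical name, and `option_env!` with a fallback is used so the snippet also compiles outside Cargo; inside a Cargo build, `concat!("fluvio/http-source ", env!("CARGO_PKG_VERSION"))` yields a `&'static str` constant instead.

```rust
/// Builds the default user-agent from the crate version known at compile time.
fn default_user_agent() -> String {
    // Set by Cargo when building the crate; the fallback covers plain `rustc` builds.
    let version = option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0-unknown");
    format!("fluvio/http-source {version}")
}

fn main() {
    println!("{}", default_user_agent());
}
```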

Errors in connector log.

Setup Environment

Configuration file:

# quotes-source-connector.yml
apiVersion: 0.1.0
meta:
  version: 0.2.5
  name: http-quotes
  type: http-source
  topic: quotes
http:
  endpoint: https://demo-data.infinyon.com/api/quote
  interval: 1s
  1. Start a local cluster:
fluvio cluster start
  2. Start a local connector:
cdk hub download infinyon/[email protected]
cdk deploy start --ipkg infinyon-http-source-0.2.6.ipkg -c quotes-source-connector.yml  

Problem

After running for about 12 hours, I see a lot of errors in the log:

$ cat http-source.log            
2023-12-12T04:56:17.962004Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T05:37:07.126475Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T05:54:01.614032Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T06:12:02.732754Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T06:30:47.463402Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T07:10:04.909691Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T08:01:04.735115Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T08:39:15.852677Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T09:04:49.218565Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T09:28:34.428434Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.216624Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.231342Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.234005Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.236210Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.238518Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.240803Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.243003Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.245315Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.247505Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.249707Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.251959Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.254147Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.256410Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.258623Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.260852Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.263080Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.265307Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:19.267576Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:20.304207Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:23.304180Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:26.305094Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:29.305348Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:32.303998Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:35.303183Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:38.303221Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:41.303269Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:44.302887Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:47.303504Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:50.303632Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:53.303633Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:56.303241Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T10:36:59.305851Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T12:12:28.068767Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T12:56:28.061001Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T13:46:28.676934Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T14:08:34.840646Z ERROR http_source::source: Request execution failed: Request failed
2023-12-12T15:19:52.014923Z ERROR http_source::source: Request execution failed: Request failed

What is causing these errors? Can we log more information about them?
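One way to log more information is to print the full chain of causes instead of only the top-level message. In Rust, the standard `std::error::Error::source` method lets a logger walk that chain. The sketch below uses a hypothetical `RequestFailed` error type and a simulated cause; it illustrates the logging technique and is not a diagnosis of this issue.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical top-level error that wraps a lower-level cause.
#[derive(Debug)]
struct RequestFailed {
    cause: io::Error,
}

impl fmt::Display for RequestFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Request failed")
    }
}

impl Error for RequestFailed {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.cause)
    }
}

/// Joins an error and all of its sources: "outer: inner: innermost".
fn error_chain(err: &dyn Error) -> String {
    let mut message = err.to_string();
    let mut source = err.source();
    while let Some(cause) = source {
        message.push_str(": ");
        message.push_str(&cause.to_string());
        source = cause.source();
    }
    message
}

fn main() {
    let err = RequestFailed {
        cause: io::Error::new(io::ErrorKind::ConnectionReset, "connection reset by peer"),
    };
    // Logs the cause as well, not only "Request failed".
    eprintln!("Request execution failed: {}", error_chain(&err));
}
```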

Reference secret name in http header

A common use of HTTP headers is to insert authorization tokens, so a header should be able to reference a secret by name. There are several ways to accomplish this, which need further analysis:

  • full header lines as secret names for the http connector
  • split header name, value config, and allow value to reference secret names
  • provide a sdk method for config parsing substrings of some config items for secret replacement
  • others?

This needs a PRD for further specification and options; one desire expressed by @AJ is to keep some resemblance to curl syntax.
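Whichever option is chosen, a curl-like "Name: value" header line has to be split into its name and value. A minimal sketch, assuming the split happens at the first colon (values such as URLs or tokens may themselves contain colons) and that surrounding whitespace is trimmed; `parse_header` is a hypothetical name.

```rust
/// Splits a curl-style "Name: value" header at the first colon.
fn parse_header(line: &str) -> Result<(&str, &str), String> {
    let (name, value) = line
        .split_once(':')
        .ok_or_else(|| format!("header is missing ':': {line:?}"))?;
    let name = name.trim();
    if name.is_empty() {
        return Err(format!("header name is empty: {line:?}"));
    }
    Ok((name, value.trim()))
}

fn main() {
    for line in ["Authorization: token abc", "Referer: https://example.com/a", "broken"] {
        println!("{:?}", parse_header(line));
    }
}
```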

add an option to read data from a websocket source

The http-source-connector can read data in many forms over http protocols to be produced into a fluvio cluster. But it doesn't understand websockets.

Add the option to read WebSockets to the connector.

Other open-source users have built a nice prototype: https://github.com/mrasputin/fluvio-websocket-source-connector

Integrating the feature with this connector would allow for a standardized way of accessing HTTP sources (e.g. with features like the ability to supply API keys to the external source).

HTTP Connector: Streaming Support

Currently, the HTTP Connector uses polling to retrieve content from an HTTP server, which covers only a subset of HTTP use cases. We need to support streaming. The challenge for the existing HTTP connector is that each stream output format requires a custom decoder to separate out records. This is where we can use a SmartModule to "chunk" records.

Example of Streaming HTTP

  • Kubernetes API Server
  • Wikipedia

Update

  • Integration Test
  • E2E Cloud Test

Source connector `streaming` from server stops processing records after 6 hours

Run the connector for several hours, and it will eventually stop. As this is TCP, I would not expect errors.

Connector configuration file:

apiVersion: 0.1.0
meta:
  version: 0.3.1
  name: car-callout
  type: http-source
  topic: cars 
http:
  endpoint: https://demo-data.infinyon.com/api/car-stream
  method: GET
  stream: true
  delimiter: "\n\n"

Error in the log:

2024-01-31T10:25:09.753239Z ERROR http_source::http_streaming_source: could not read data from http response stream: request or response body error: error reading a body from connection: unexpected EOF during chunk size line

Connector status after failure:

% cdk deploy list
 NAME         STATUS  
 car-callout  Stopped
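The log shows the stream ending with an unexpected EOF, after which the connector stops instead of reconnecting. One common remedy is to reconnect with capped exponential backoff, similar in spirit to the "reconnecting in 20ms" message logged by the WebSocket source in the TLS issue. In this sketch, `backoff_delay` and its parameters are hypothetical and not the connector's actual retry policy; a reconnect loop would sleep for `backoff_delay(attempt, ..)` before each new connection attempt and reset `attempt` after a successful read.

```rust
use std::time::Duration;

/// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_pow/checked_mul avoid overflow for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

fn main() {
    let base = Duration::from_millis(20);
    let max = Duration::from_secs(10);
    for attempt in 0..6 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```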
