http (Sink)
The HTTP sink publishes messages over HTTP or HTTPS using methods such as POST, GET, PUT, and DELETE, in text or JSON format. It can also publish to endpoints protected by basic authentication or OAuth 2.0.
Syntax
CREATE SINK <NAME> WITH (type="http", map.type="<STRING>", publisher.url="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", tls.store.type="<STRING>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>")
Query Parameters
| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
|---|---|---|---|---|---|
| publisher.url | The URL to which the outgoing events should be published. Examples: http://localhost:8080/endpoint, https://localhost:8080/endpoint | - | STRING | No | No |
| basic.auth.username | The username to be included in the authentication header when calling endpoints protected by basic authentication. The basic.auth.password property should also be set when using this property. | - | STRING | Yes | No |
| basic.auth.password | The password to be included in the authentication header when calling endpoints protected by basic authentication. The basic.auth.username property should also be set when using this property. | - | STRING | Yes | No |
| https.truststore.file | The file path of the client truststore used when sending messages over HTTPS. | `${carbon.home}/resources/security/client-truststore.jks` | STRING | Yes | No |
| https.truststore.password | The password for the client-truststore. | gdncarbon | STRING | Yes | No |
| oauth.username | The username to be included in the authentication header when calling endpoints protected by OAuth 2.0. The oauth.password property should also be set when using this property. | - | STRING | Yes | No |
| oauth.password | The password to be included in the authentication header when calling endpoints protected by OAuth 2.0. The oauth.username property should also be set when using this property. | - | STRING | Yes | No |
| consumer.key | Consumer key used for calling endpoints protected by OAuth 2.0 | - | STRING | Yes | No |
| consumer.secret | Consumer secret used for calling endpoints protected by OAuth 2.0 | - | STRING | Yes | No |
| token.url | Token URL used to generate new access tokens when calling endpoints protected by OAuth 2.0 | - | STRING | Yes | No |
| refresh.token | Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0 | - | STRING | Yes | No |
| headers | HTTP request headers in the format "'<key>:<value>','<key>:<value>'". When the Content-Type header is not provided, the system derives it from the sink mapper as follows: map.type='json': application/json; map.type='text': text/plain; map.type='keyvalue': application/x-www-form-urlencoded; all other cases: text/plain. The Content-Length header does not need to be provided, because the system calculates it from the size of the payload. | Content-Type and Content-Length headers | STRING | Yes | No |
| method | The HTTP method used for calling the endpoint. | POST | STRING | Yes | No |
| socket.idle.timeout | Socket timeout in millis. | 6000 | INT | Yes | No |
| chunk.disabled | Disable chunked transfer encoding. | false | BOOL | Yes | No |
| ssl.protocol | SSL/TLS protocol. | TLS | STRING | Yes | No |
| ssl.verification.disabled | Disable SSL verification. | false | BOOL | Yes | No |
| tls.store.type | TLS store type. | JKS | STRING | Yes | No |
| ssl.configurations | SSL/TLS configurations in the format "'<key>:<value>','<key>:<value>'". Some supported parameters: - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2' - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256' - Enable session creation: 'client.enable.session.creation:true' - Supported server names: 'server.suported.server.names:server' - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher' | - | STRING | Yes | No |
| proxy.host | Proxy server host | - | STRING | Yes | No |
| proxy.port | Proxy server port | - | STRING | Yes | No |
| proxy.username | Proxy server username | - | STRING | Yes | No |
| proxy.password | Proxy server password | - | STRING | Yes | No |
| client.bootstrap.configurations | Client bootstrap configurations in the format "'<key>:<value>','<key>:<value>'". Some supported configurations: - Client connect timeout in millis: 'client.bootstrap.connect.timeout:15000' - Client socket timeout in seconds: 'client.bootstrap.socket.timeout:15' - Client socket reuse: 'client.bootstrap.socket.reuse:true' - Enable TCP no delay: 'client.bootstrap.nodelay:true' - Enable client keep alive: 'client.bootstrap.keepalive:true' - Send buffer size: 'client.bootstrap.sendbuffersize:1048576' - Receive buffer size: 'client.bootstrap.recievebuffersize:1048576' | - | STRING | Yes | No |
| max.pool.active.connections | Maximum possible number of active connections per client pool. | -1 | INT | Yes | No |
| min.pool.idle.connections | Minimum number of idle connections that can exist per client pool. | 0 | INT | Yes | No |
| max.pool.idle.connections | Maximum number of idle connections that can exist per client pool. | 100 | INT | Yes | No |
| min.evictable.idle.time | Minimum time (in millis) a connection may sit idle in the client pool before it becomes eligible for eviction. | 300000 | STRING | Yes | No |
| time.between.eviction.runs | Time between two eviction operations (in millis) on the client pool. | 30000 | STRING | Yes | No |
| max.wait.time | The maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool. | 60000 | STRING | Yes | No |
| test.on.borrow | Enable connections to be validated before being borrowed from the client pool. | true | BOOL | Yes | No |
| test.while.idle | Enable connections to be validated during the eviction operation (if any). | true | BOOL | Yes | No |
| exhausted.action | Action to take when the maximum number of active connections is in use. Specify it as an integer; the possible values are: 0 - Fail the request. 1 - Block the request until a connection returns to the pool. 2 - Grow the connection pool size. | 1 (Block when exhausted) | INT | Yes | No |
| hostname.verification.enabled | Enable hostname verification. | true | BOOL | Yes | No |
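The timeout and connection pool parameters in the table above can be combined in a single sink definition. The following is a minimal sketch only: the stream name MetricsStream, its attributes, the URL, and the chosen values are hypothetical placeholders, and numeric values are quoted in the same way as in the examples later on this page.

```sql
-- Hypothetical sink: name, URL, attributes, and values are illustrative assumptions.
CREATE SINK MetricsStream WITH (
    type = 'http',
    map.type = 'json',
    publisher.url = 'http://localhost:8080/metrics',
    -- Socket timeout in millis (the documented default is 6000).
    socket.idle.timeout = '10000',
    -- Allow at most 20 active and 10 idle connections in the client pool.
    max.pool.active.connections = '20',
    max.pool.idle.connections = '10',
    -- Block when the pool is exhausted (1) and wait up to 5000 millis
    -- for a connection to be returned to the pool.
    exhausted.action = '1',
    max.wait.time = '5000'
) (host string, cpu float);
```

With these settings, a request made while all 20 connections are in use should wait for a free connection instead of failing immediately, according to the parameter descriptions. Setting exhausted.action to '0' or '2' would fail the request or grow the pool instead.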
System Parameters
| Name | Description | Default Value | Possible Parameters |
|---|---|---|---|
| clientBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. | (Number of available processors) * 2 | Any positive integer |
| clientBootstrapBossGroupSize | Number of boss threads to accept incoming connections. | Number of available processors | Any positive integer |
| clientBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. | (Number of available processors) * 2 | Any positive integer |
| trustStoreLocation | The default truststore file path. | `${carbon.home}/resources/security/client-truststore.jks` | Path to client truststore `.jks` file |
| trustStorePassword | The default truststore password. | gdncarbon | Truststore password as string |
Example 1
CREATE SINK StockStream WITH (type = 'http', map.type = 'json', publisher.url = 'http://stocks.com/stocks') (symbol string, price float, volume long);
Events arriving on StockStream are published to the HTTP endpoint http://stocks.com/stocks using the POST method with Content-Type application/json, after being converted to the default JSON format:
{
  "event": {
    "symbol": "FB",
    "price": 24.5,
    "volume": 5000
  }
}
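As a variation on this example, an endpoint protected by basic authentication could be targeted by adding the basic.auth.username and basic.auth.password parameters, which the parameter table says should be set together. This is a sketch under assumptions: the stream name SecureStockStream, the HTTPS URL, the credentials, and the custom header are all hypothetical placeholders.

```sql
-- Hypothetical sink: name, URL, credentials, and header values are placeholders.
CREATE SINK SecureStockStream WITH (
    type = 'http',
    map.type = 'json',
    publisher.url = 'https://localhost:8443/stocks',
    method = 'PUT',
    -- Both basic authentication properties are set together.
    basic.auth.username = 'stock-user',
    basic.auth.password = 'stock-password',
    -- Custom headers use the "'<key>:<value>','<key>:<value>'" format.
    headers = "'X-Source:stock-feed','Accept:application/json'"
) (symbol string, price float, volume long);
```

Because no Content-Type header is listed in headers, the sink is expected to derive it from map.type, as described for the headers parameter. The truststore parameters are omitted here, so their documented defaults would apply.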
Example 2
CREATE SINK FooStream WITH (type='http', map.type='json', publisher.url = 'http://localhost:8009/foo', client.bootstrap.configurations = "'client.bootstrap.socket.timeout:20'", max.pool.active.connections = '1', headers = "{{headers}}", map.payload="""<stock>{{payloadBody}}</stock>""") (payloadBody string, headers string);
Events arriving on FooStream are published to the HTTP endpoint http://localhost:8009/foo using the POST method with Content-Type application/json, with the request built from the payloadBody and headers attribute values. If payloadBody contains
{
  "symbol": "gdn",
  "price": 55.6,
  "volume": 100
}
and headers contains 'topic:foobar', the system generates a request with the body:
{
  "stock": {
    "symbol": "gdn",
    "price": 55.6,
    "volume": 100
  }
}
and HTTP headers: Content-Length:xxx, Content-Location:'xxx', Content-Type:'application/json', HTTP_METHOD:'POST'
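The OAuth 2.0 parameters from the query parameters table can be supplied in the same way. The sketch below assumes a token endpoint that issues access tokens from a consumer key and secret. Which combination of consumer.key, consumer.secret, oauth.username, oauth.password, and refresh.token is actually required depends on the token endpoint and is not specified on this page. The stream name, URLs, and credential values are hypothetical placeholders.

```sql
-- Hypothetical sink: name, URLs, and credentials are illustrative assumptions.
CREATE SINK AlertStream WITH (
    type = 'http',
    map.type = 'json',
    publisher.url = 'https://localhost:8443/alerts',
    -- OAuth 2.0 client credentials and the URL used to generate access tokens.
    consumer.key = 'example-consumer-key',
    consumer.secret = 'example-consumer-secret',
    token.url = 'https://localhost:8443/token'
) (symbol string, price float);
```

If the endpoint instead expects user credentials or a refresh token, the oauth.username and oauth.password pair or the refresh.token parameter would be added alongside token.url.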