Skip to main content

Source and Sink Example

Sources and sinks are used to consume and publish events to external systems.

There are multiple source and sink types, but this example only explains Macrometa source with a stream sink. For more info refer to the Stream Worker Query Guide.

Example

This example creates a source from which a stream consumes JSON messages:

C8DB source to consume `JSON` messages from.
CREATE SOURCE TemperatureStream WITH (type='database', collection='TemperatureStream', collection.type="doc", replication.type="global", map.type='json') (sensorId string, temperature double);

This example creates a sink to log events that arrive from a stream called TemperatureOnlyStream with the temperature attribute of type double:

CREATE SINK TemperatureOnlyStream WITH (type='stream', stream="TemperatureOnlyStream", replication.type="local", map.type='json') (temperature double);

@info(name = 'Simple-selection')
insert into TemperatureOnlyStream
select temperature
from TemperatureStream;

Input

When a JSON message is written to the collection TemperatureStream, it automatically gets mapped to an event in the TemperatureStream stream.

{
"sensorId":"aq-14",
"temperature":35.4
}

To process custom input messages, refer to Sinkmapper in Functions.

Output

After processing, the event arriving at TemperatureOnlyStream will be emitted via c8stream sink.

The message is published to TemperatureOnlyStream as

{"temperature":"35.4"}

To output messages using other message formats, refer to Sourcemapper in Functions.