Skip to content

Create Stream Application

Introduction

Stream applications are declarative specs that define the processing logic to process the events sent to the stream processor. A stream app definition contains the following configurations:

Configuration Description
Stream A logical series of events ordered in time with a uniquely identifiable name, and set of defined attributes with specific data types defining its schema.
Source This consumes data from external sources (such as TCP , Kafka , HTTP , etc) in the form of events, then converts each event (that can be in XML , JSON , binary , etc. format) to a stream event, and passes that to a stream for processing.
Sink This takes events arriving at a stream, maps them to a predefined data format (such as XML , JSON, binary , etc), and publishes them to external endpoints (such as E-mail , TCP , Kafka , HTTP , etc).
Table A structured representation of data stored with a defined schema. Stored data can be backed by In-Memory, or external data stores such as RDBMS, MongoDB, etc. The tables can be accessed and manipulated at runtime.
Executional Element

An executional element can be one of the following:

  • Stateless query: Queries that only consider currently incoming events when generating an output. e.g., filters
  • Stateful query: Queries that consider both currently incoming events as well as past events when generating an output. e.g., windows, sequences, patterns, etc.
  • Partitions: Collections of stream definitions and queries separated from each other within a Stream application for the purpose of processing events in parallel and in isolation

Macrometa provide in-build source, sink and store explained in the later section of this document.

Creating a Stream Application

To create a stream application follow the steps below:

  1. Open the GUI. Click on Stream Apps tab.
  2. Click on New to start defining a new stream application.
  3. Enter a Name as SweetProductionAnalysis or feel free to chose any other name for the stream application.
  4. Enter a Description.
  5. Add the following sample stream application.

    @source(type = 'c8db', collection='SweetProductionData', @map(type='json'))
    define stream SweetProductionStream (name string, amount double);
    
    @sink(type= 'c8streams', stream='ProductionAlertStream', @map(type='json'))
    define stream ProductionAlertStream (name string, amount double);
    
    select *
    from SweetProductionStream
    insert into ProductionAlertStream;
    
  6. Click Save to save the stream app.

  7. Select all the regions to deploy your application in.
  8. Click on Save.

Source

C8Streams

Syntax

@source(type="c8streams", stream.list="<STRING>", replication.type="<STRING>", @map(...)))

Example

@source(type="c8streams", stream.list="OrderStream", replication.type="local", @map(type='json')))
define stream OrderStream(product_id string, quantity integer)

If @source annotation is not provided, c8stream is considered as a the default source. Stream application will use the c8stream with the default query parameters explained in the chart below. In the above example, stream can also be defined as

define stream OrderStream(product_id string, quantity integer)

Query Parameters

Name Description Default Value Possible Data Types Optional
stream.list This specifies the list of streams to which the source must listen. This list can be provided as a set of comma-separated values e.g. stream_one,stream_two STRING No
replication.type Specifies if the replication type of the streams. Possible values can be local and global local STRING Yes

C8DB

Syntax

@source(type = 'c8db', collection="STRING", replication.type="STRING", collection.type="STRING", @map(...))

Example

@source(type = 'c8db', collection='SweetProductionData', @map(type='json'))
define stream SweetProductionStream (name string, amount double);

Query Parameters

Name Description Default Value Possible Data Types Optional
collection This specifies the name of the c8db collection to which the source must listen. STRING No
replication.type Specifies if the replication type of the c8db collection. Possible values can be local and global local STRING Yes
collection.type This specifies the type of the data collection contains. Possible values can be doc and edge. doc STRING Yes

Sink

C8Streams

Syntax

@sink(type="c8streams", stream="<STRING>", replication.type="<STRING>", @map(...)))

Example

@sink(type= 'c8streams', stream='ProductionAlertStream', @map(type='json'))
define stream ProductionAlertStream (name string, amount double);

Query Parameters

Name Description Default Value Possible Data Types Optional
stream The streams to which the C8Stream sink needs to publish events. STRING No
replication.type Specifies if the replication type of the stream. Possible values can be local and global local STRING Yes

Table

C8DB

Syntax

@store(type = 'c8db', collection="STRING", replication.type="STRING", collection.type="STRING", from="STRING", to="STRING")

Example

@store(type = 'c8db', collection='SweetProductionCollection')
define table SweetProductionCollection (name string, amount double);

If @store annotation is not provided, c8db is considered as a the default store. Stream application will use the c8db with the default query parameters explained in the chart below. In the above example, store can also be defined as

define table SweetProductionCollection (name string, amount double);
Name Description Default Value Possible Data Types Optional
collection This specifies the name of the c8db collection to which events must written. STRING No
replication.type Specifies if the replication type of the c8db collection. Possible values can be local and global local STRING Yes
collection.type This specifies the type of the data collection contains. Possible values can be doc and edge. doc STRING Yes
from If collection.type is specified as edge, this field indicates which field to be considered as a source node of the edge. _from STRING Yes
to If collection.type is specified as edge, this field indicates which field to be considered as a destination node of the edge. _to STRING Yes

Tutorials

Following tutorials cover various user scenarios using Macrometa Stream Processing.

Please refer to Reference for additional stream processing examples.