Skip to main content

Stream Processors

Stream processors for the tap service are written in Javascript, and will be automatically bundled by Morio core.

tip

This is the reference documentation, refer to the Tap Stream Processing Guide for a high-level overview.

The stream processor object

Each processor must be exported as an object, which we’ll refer to as a stream processor object.

This object has the following structure:

id

The id property of a stream processor object holds a string that is the ID of the stream processor. Each stream processor must have a unique ID, as it is used to bundle the processor code.

processor.id
const processor = {
id: `my_custom_stream_processor_with_a_unique_id`,
// Rest of the stream processor object
}

export default processor

info

The info property of a stream processor object holds a string that is a description of the stream processor. It is intended for humans to clarify the role and purpose of the stream processor.

processor.info
const processor = {
info: `This stream processor shows how it's done`,
// Rest of the stream processor object
}

export default processor

settings

The settings property of a stream processor object is optional, but it may hold an object that allows configuration of the stream processor through the Morio settings.

processor.info
const processor = {
settings: {
enabled: {
dflt: true,
title: "Enable this stream processor",
type: "list",
list: [
{
val: false,
label: "Do not enable this stream processor (disable)"
},
{
val: true,
label: "Enable this stream processor"
}
]
}
},
// Rest of the stream processor object
}

export default processor

topic

The topic property of a stream processor holds the topic to subscribe to. This should be a string and takes precendence over topics.

processor.topic
const processor = {
topic: "logs",
// Rest of the stream processor object
}

export default processor
A topic is mandatory

Note that one of either topic or topics is mandatory, as we need at least a topic to subscribe to.

topics

The topic property of a stream processor holds the topic to subscribe to. This should be an array of strings.

processor.topics
const processor = {
topics: [ "logs", "audit" ],
// Rest of the stream processor object
}

export default processor
A topic is mandatory

Note that one of either topic or topics is mandatory, as we need at least a topic to subscribe to.

module

The module property of a stream processor holds the module to subscribe to. This should be a string and takes precendence over modules.

processor.module
const processor = {
module: "linux-system",
// Rest of the stream processor object
}

export default processor
A module is optional

If no module or modules is specified, the stream processor will be subscribed to (data generated by) all modules.

modules

The modules property of a stream processor holds the modules to subscribe to. This should be an array of strings.

processor.modules
const processor = {
modules: [ "linux-system", "linux-elasticsearch" ],
// Rest of the stream processor object
}

export default processor
A module is optional

If no module or modules is specified, the stream processor will be subscribed to (data generated by) all modules.

dataset

The dataset property of a stream processor holds the dataset to subscribe to. This should be a string and takes precendence over datasets.

processor.dataset
const processor = {
dataset: "journald",
// Rest of the stream processor object
}

export default processor
A dataset is optional

If no dataset or datasets is specified, the stream processor will be subscribed to (data generated by) all datasets.

datasets

The datasets property of a stream processor holds the datasets to subscribe to. This should be an array of strings.

processor.datasets
const processor = {
datasets: [ "diskio", "memory", "load" ],
// Rest of the stream processor object
}

export default processor
A dataset is optional

If no dataset or datasets is specified, the stream processor will be subscribed to (data generated by) all datasets.

handler

The handler property of a stream processor holds a Javascript function that implements the logic of your stream processor.

processor.handler
const processor = {
handler: function(params) {
// do something here
},
// Rest of the stream processor object
}

The handler function will be invoked for every message that matches the subscription data (topic(s), module(s), dataset(s)).

tip

Refer to the Tap Stream Processing Guide to how to write your own logic.

Folder structure

Stream processors are seeded in Morio. In other words, they are not part of the configuration, and are expected to be loaded from a git repository. You can use your own git repository for your custom stream processors, refer to the seeding guide for details.

For the seeding to function, you should make sure to respect the following rules:

  • Morio core will recursively look for index.mjs files
  • It will import the default export from those files
  • If the default export is an array, it will be treated as an array of stream processor objects
  • If the default export is an object, it will be treated as a single stream processor object

File structure

A single object as default export

The default export can be a single object like this:

Single object as default export
const processor = {
id: `my_custom_stream_processor_with_a_unique_id`,
// Rest of the stream processor object
}

export default processor

An array as default export

You can bundle multiple stream processors in the same file by exporting an array:

Single object as default export
const processors = [
{
id: `my_first_custom_stream_processor_with_a_unique_id`,
// Rest of the first stream processor object
},
{
id: `my_second_custom_stream_processor_with_a_unique_id`,
// Rest of the second stream processor object
},
]

export default processors

Exporting via a barrel file

If you have a bunch of processors, a common approach is to keep each in its own file, and make the index.mjs a barrel file that merely re-exports them:

Using a barrel file
import processor1 from './my-great-processor.mjs'
import processor2 from './my-even-better-processors.mjs'
import otherProcessor from './this-is-a-test.mjs'

export default [
processor1,
processor2,
otherProcessor
]

By naming your barrel file index.mjs Morio core will pick it up, and follow the imports. This gives you maximum flexibility on how to structure your code.