Stream Processors
Stream processors for the tap service are written in JavaScript and are automatically bundled by Morio core.
This is the reference documentation. Refer to the Tap Stream Processing Guide for a high-level overview.
The stream processor object
Each processor must be exported as an object, which we’ll refer to as a stream processor object.
This object has the following structure:
id
The id property of a stream processor object holds a string that is the ID of
the stream processor. Each stream processor must have a unique ID, as it is
used to bundle the processor code.
const processor = {
  id: `my_custom_stream_processor_with_a_unique_id`,
  // Rest of the stream processor object
}
export default processor
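Because every ID must be unique, it can help to check a set of stream processor objects for duplicates before seeding them. The sketch below is only an illustration and not part of Morio; `findDuplicateIds` and the example IDs are hypothetical.

```javascript
// Hypothetical helper (not part of Morio): returns the IDs that occur more than once.
function findDuplicateIds(processors) {
  const seen = new Set()
  const duplicates = new Set()
  for (const { id } of processors) {
    if (seen.has(id)) duplicates.add(id)
    seen.add(id)
  }
  return [...duplicates]
}

// Example IDs are made up for this sketch
const processors = [
  { id: 'parse_logs' },
  { id: 'audit_trail' },
  { id: 'parse_logs' }, // duplicate on purpose
]

console.log(findDuplicateIds(processors)) // [ 'parse_logs' ]
```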
info
The info property of a stream processor object holds a string that is a
description of the stream processor. It is intended for humans to clarify
the role and purpose of the stream processor.
const processor = {
  info: `This stream processor shows how it's done`,
  // Rest of the stream processor object
}
export default processor
settings
The settings property of a stream processor object is optional. When present,
it holds an object that allows the stream processor to be configured through
the Morio settings.
const processor = {
  settings: {
    enabled: {
      dflt: true,
      title: "Enable this stream processor",
      type: "list",
      list: [
        {
          val: false,
          label: "Do not enable this stream processor (disable)"
        },
        {
          val: true,
          label: "Enable this stream processor"
        }
      ]
    }
  },
  // Rest of the stream processor object
}
export default processor
topic
The topic property of a stream processor holds the topic to subscribe to.
This should be a string and takes precedence over topics.
const processor = {
  topic: "logs",
  // Rest of the stream processor object
}
export default processor
Note that either topic or topics is mandatory, as a stream processor needs at
least one topic to subscribe to.
topics
The topics property of a stream processor holds the topics to subscribe to.
This should be an array of strings.
const processor = {
  topics: [ "logs", "audit" ],
  // Rest of the stream processor object
}
export default processor
Note that either topic or topics is mandatory, as a stream processor needs at
least one topic to subscribe to.
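The precedence rule and the mandatory-topic rule can be sketched as a small helper. This is an illustration under stated assumptions, not Morio's implementation; `resolveTopics` is a hypothetical name, and treating an empty array as missing is an assumption of this sketch.

```javascript
// Hypothetical helper (not part of Morio): returns the topics a processor subscribes to.
function resolveTopics(processor) {
  // `topic` takes precedence over `topics` when both are set
  if (typeof processor.topic === 'string') return [processor.topic]
  // Assumption: an empty `topics` array counts as "no topic given"
  if (Array.isArray(processor.topics) && processor.topics.length > 0) {
    return processor.topics
  }
  throw new Error(`Stream processor ${processor.id} needs a topic or topics property`)
}

console.log(resolveTopics({ id: 'a', topic: 'logs', topics: ['audit'] })) // [ 'logs' ]
console.log(resolveTopics({ id: 'b', topics: ['logs', 'audit'] })) // [ 'logs', 'audit' ]
```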
module
The module property of a stream processor holds the module to subscribe to.
This should be a string and takes precedence over modules.
const processor = {
  module: "linux-system",
  // Rest of the stream processor object
}
export default processor
If no module or modules is specified, the stream processor will be
subscribed to (data generated by) all modules.
modules
The modules property of a stream processor holds the modules to subscribe to.
This should be an array of strings.
const processor = {
  modules: [ "linux-system", "linux-elasticsearch" ],
  // Rest of the stream processor object
}
export default processor
If no module or modules is specified, the stream processor will be
subscribed to (data generated by) all modules.
dataset
The dataset property of a stream processor holds the dataset to subscribe to.
This should be a string and takes precedence over datasets.
const processor = {
  dataset: "journald",
  // Rest of the stream processor object
}
export default processor
If no dataset or datasets is specified, the stream processor will be
subscribed to (data generated by) all datasets.
datasets
The datasets property of a stream processor holds the datasets to subscribe to.
This should be an array of strings.
const processor = {
  datasets: [ "diskio", "memory", "load" ],
  // Rest of the stream processor object
}
export default processor
If no dataset or datasets is specified, the stream processor will be
subscribed to (data generated by) all datasets.
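Unlike topics, the module and dataset filters are optional, and leaving them out means "everything". One way to picture this is the sketch below. It is not Morio's implementation; `resolveFilter` and `passesFilter` are hypothetical helpers, and representing "all" as `null` is an assumption of this sketch.

```javascript
// Hypothetical helper (not part of Morio): returns the allowed values for a
// filter, or null when the processor does not restrict that dimension.
function resolveFilter(processor, singular, plural) {
  // The singular property takes precedence over the plural one
  if (typeof processor[singular] === 'string') return [processor[singular]]
  if (Array.isArray(processor[plural])) return processor[plural]
  return null // nothing specified: subscribe to all
}

// Hypothetical check: does a value pass the resolved filter?
function passesFilter(allowed, value) {
  return allowed === null || allowed.includes(value)
}

const processor = { id: 'disk_watch', module: 'linux-system', datasets: ['diskio', 'load'] }
const modules = resolveFilter(processor, 'module', 'modules')
const datasets = resolveFilter(processor, 'dataset', 'datasets')

console.log(modules) // [ 'linux-system' ]
console.log(passesFilter(datasets, 'memory')) // false
console.log(passesFilter(resolveFilter({ id: 'x' }, 'module', 'modules'), 'anything')) // true
```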
handler
The handler property of a stream processor holds a JavaScript function that
implements the logic of your stream processor.
const processor = {
  handler: function(params) {
    // do something here
  },
  // Rest of the stream processor object
}
export default processor
The handler function will be invoked for every message that matches the
subscription data (topic(s), module(s), dataset(s)).
Refer to the Tap Stream Processing Guide for how to write your own logic.
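Putting the properties together, a complete stream processor object might look like the sketch below. The ID, description, and combination of topic, module, and dataset are illustrative. The shape of `params` is not specified in this reference, so the handler treats it as opaque and merely counts invocations.

```javascript
// Illustrative only: counts how often the handler is invoked.
let handled = 0

const processor = {
  id: 'count_journald_messages', // hypothetical ID
  info: 'Counts journald log messages from the linux-system module',
  topic: 'logs',
  module: 'linux-system',
  dataset: 'journald',
  handler: function (params) {
    // `params` is left untouched, as its shape is not documented here
    handled += 1
  },
}

// In an index.mjs file, this would be followed by: export default processor

// Simulate two invocations with placeholder arguments
processor.handler({})
processor.handler({})
console.log(handled) // 2
```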
Folder structure
Stream processors are seeded in Morio. In other words, they are not part of the configuration, and are expected to be loaded from a Git repository. You can use your own Git repository for your custom stream processors. Refer to the seeding guide for details.
For the seeding to function, you should make sure to respect the following rules:
- Morio core will recursively look for index.mjs files
- It will import the default export from those files
- If the default export is an array, it will be treated as an array of stream processor objects
- If the default export is an object, it will be treated as a single stream processor object
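The last two rules amount to normalizing whatever a file exports into a list of stream processor objects. A minimal sketch of that idea follows. It is not Morio's actual code; `toProcessorList` is a hypothetical name, and throwing on other export types is an assumption of this sketch.

```javascript
// Hypothetical helper (not Morio's actual code): turns a default export into
// a list of stream processor objects.
function toProcessorList(defaultExport) {
  // An array is treated as an array of stream processor objects
  if (Array.isArray(defaultExport)) return defaultExport
  // A plain object is treated as a single stream processor object
  if (defaultExport !== null && typeof defaultExport === 'object') return [defaultExport]
  // Assumption: anything else is rejected
  throw new TypeError('Default export must be an object or an array of objects')
}

console.log(toProcessorList({ id: 'one' }).length) // 1
console.log(toProcessorList([{ id: 'one' }, { id: 'two' }]).length) // 2
```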
File structure
A single object as default export
The default export can be a single object like this:
const processor = {
  id: `my_custom_stream_processor_with_a_unique_id`,
  // Rest of the stream processor object
}
export default processor
An array as default export
You can bundle multiple stream processors in the same file by exporting an array:
const processors = [
  {
    id: `my_first_custom_stream_processor_with_a_unique_id`,
    // Rest of the first stream processor object
  },
  {
    id: `my_second_custom_stream_processor_with_a_unique_id`,
    // Rest of the second stream processor object
  },
]
export default processors
Exporting via a barrel file
If you have many processors, a common approach is to keep each in its
own file, and make index.mjs a barrel file that merely re-exports them:
import processor1 from './my-great-processor.mjs'
import processor2 from './my-even-better-processors.mjs'
import otherProcessor from './this-is-a-test.mjs'
export default [
  processor1,
  processor2,
  otherProcessor
]
Because the barrel file is named index.mjs, Morio core will pick it up and
follow the imports. This gives you maximum flexibility in how you structure your code.