Kafka Stream Processing Guide
Morio exposes a Kafka API so you can utilise your favourite data tools to run stream processing on the data flowing through a Morio collector.
There’s no shortage of streaming data tools so we cannot possibly cover them all. Instead, we will give a high-level overview of how you can get started, followed by a hands-on example in JavaScript.
Connecting to Kafka
Morio exposes the Kafka API on TCP port 9092 on all broker nodes.
The port uses TLS both for encryption and authentication.
Authentication
Authentication to the Kafka API is done through mutual TLS (mTLS).
In other words, you need a TLS certificate and key to connect to the Kafka API. The API accepts any certificate that is generated by Morio, so you need to get a certificate from Morio, either through the UI on the /tools/certificates/ page, or through the API.
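Once you have a certificate and key, you can keep them in files rather than pasting them into your source code. The sketch below shows one way to do that. It assumes you saved the PEM data to disk; `loadSslSettings`, its `readPem` parameter, and the file paths are hypothetical names used for illustration only. The `cert`, `key`, and `ca` fields are standard Node.js TLS options.

```javascript
import { readFileSync } from 'node:fs'

/*
 * Hypothetical helper: builds the `ssl` settings from PEM files.
 * The `readPem` parameter defaults to reading from disk and is
 * only there so the helper can be tried without real files.
 */
function loadSslSettings (
  { certPath, keyPath, caPath },
  readPem = (path) => readFileSync(path, 'utf8')
) {
  const ssl = {
    cert: readPem(certPath),
    key: readPem(keyPath),
  }
  /*
   * Assumption: you also saved the Morio CA certificate.
   * Passing it as `ca` lets Node.js verify the broker certificate.
   */
  if (caPath) ssl.ca = [readPem(caPath)]

  return ssl
}
```

The returned object can serve as the `ssl` value in your Kafka client settings, for example `loadSslSettings({ certPath: './client.crt', keyPath: './client.key' })`.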
Access Control
Authentication and authorisation are both based on mTLS. In other words, the CN of your certificate will be matched against the broker’s access control list.
By default, Morio ships with an access control list that allows writing data to the stream, but not reading it. So, you will need to add an entry to the broker’s access control list to grant the CN of your certificate access to the topic(s) you want to read from.
To do so, connect to the RedPanda Console at /console/security/acls. Here, under the ACLs tab, you can configure the access control list.
Refer to the RedPanda documentation on Access Control Lists for more details.
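Because the access control list is matched against the CN of your certificate, it helps to know exactly what that CN is before you add an entry. The following is a minimal sketch that reads it with the `X509Certificate` class built into Node.js. The helper names `commonNameFromSubject` and `commonNameFromPem` are hypothetical, and the sketch does not cover how the principal should be written in the ACL entry itself.

```javascript
import { X509Certificate } from 'node:crypto'

/*
 * Node.js renders a certificate subject as one attribute per line,
 * for example "C=BE\nCN=my-client". This picks out the CN attribute
 * and returns null when there is none.
 */
function commonNameFromSubject (subject) {
  if (!subject) return null
  for (const line of subject.split('\n')) {
    const [name, ...rest] = line.split('=')
    if (name.trim() === 'CN') return rest.join('=').trim()
  }
  return null
}

/*
 * Hypothetical helper: takes the PEM data of your client certificate
 * and returns its CN.
 */
function commonNameFromPem (pem) {
  return commonNameFromSubject(new X509Certificate(pem).subject)
}
```

Calling `commonNameFromPem()` with the PEM data of your certificate gives you the value to look for when you configure the access control list.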
Example
As a simple example, let us create a stream processor in JavaScript that will read data from the metrics topic and output it.
This is a simple example that only reads data from the Kafka API. It does not handle things like graceful shutdown and so on.
If you are not certain how to build upon this example, you might want to look at our low-code stream processing solution instead.
/*
* We are using the kafkajs client
* This is a dependency you can install with:
* npm install kafkajs
*/
import { Kafka, logLevel } from 'kafkajs'
/*
* This is the function that will end up getting called
* for every message we receive from the Kafka API
*
* When using the Tap service,
* a function like this is the only thing you need to provide.
*
* Refer to the kafkajs docs for all details about the parameters:
* https://kafka.js.org/docs/
*/
function streamProcessor ({ topic, partition, message }) {
  /*
   * Here we can do whatever we want
   * For our example, we just output the data
   */
  console.log({
    topic,
    partition,
    offset: message.offset,
    value: JSON.parse(message.value.toString()),
  })
}
/*
* Instantiate a Kafka client
* The getSettings() method returns all client settings.
* You would need to adapt its return value to your own setup.
*/
const client = new Kafka(getSettings())
/*
* Create a consumer
*/
const consumer = client.consumer({ groupId: getSettings().clientId })
/*
* Connect the consumer to the Kafka API
*/
await consumer.connect()
/*
* Subscribe to the topic(s) of our choice
*/
await consumer.subscribe({ topic: 'metrics', fromBeginning: true })
/*
* Now run the consumer,
* and we invoke our function for each message we receive
*/
await consumer.run({ eachMessage: streamProcessor })
/*
* This function returns the client settings.
* It is created as a function only because this allows us to
* place it at the end of the file so that the example starts
* with the relevant code, rather than a wall of text from the
* certificate and key
*/
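function getSettings () {
  return {
    brokers: ['example.morio.it:9092'],
    clientId: 'morio-example-stream-processor',
    logLevel: logLevel.ERROR,
    ssl: {
      /*
       * This disables verification of the broker's certificate.
       * Use it only when the Morio CA is not trusted by your system;
       * passing the CA certificate as `ca` is the safer option.
       */
      rejectUnauthorized: false,
      cert: `-----BEGIN CERTIFICATE-----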
MIIFLjCCAxagAwIBAgIRAOgLYt+vwgLxzfPKxgCvEBgwDQYJKoZIhvcNAQELBQAw
gZAxCzAJBgNVBAYTAkJFMREwDwYDVQQIEwhCcnVzc2VsczERMA8GA1UEBxMIQnJ1
c3NlbHMxGTAXBgNVBAoTEEVuZ2luZWVyaW5nIFRlYW0xDTALBgNVBAsTBHRlc3Qx
MTAvBgNVBAMTKE1vcmlvIEludGVybWVkaWF0ZSBDZXJ0aWZpY2F0ZSBBdXRob3Jp
dHkwHhcNMjQxMjEyMDk0MDE1WhcNMjYxMjEzMDk0MTE1WjApMScwJQYDVQQDEx5w
b2MtbW9yaW8tbm9kZTEuY2VydC5ldXJvcGEuZXUwggEiMA0GCSqGSIb3DQEBAQUA
A4IBDwAwggEKAoIBAQCo2BosdyFVCJ5teVkQsb3BUiOYO6hp284gxXigBiqZ0FpD
KsGYUcJKGOSqa1X4CfpGcrH5KR0P4VhHLi2ZeiImvSfbxzoOdtIFM5aM2yY7/gnq
3vxioJb5z11srIXmyxUQLXFdHC3k7gJTJOAkjE6Wgira1behzBjHrP+Fn75UahoU
NHET4nE1VlQl2wFPEe8dXVQfyxfH5eZ769hfIbd1p6Q6QEy3KvCy8ZcW9cf/67Vr
tPqF1om2NlFqN3GOFYzZOpxIJFnlSGaZg5FTIjhPfIBVO30cicqpgDGiWJk43vrq
dRWEHe59mlMw9wh36lRDQSs5Lqhe7/EFrcL8oIFPAgMBAAGjgegwgeUwDgYDVR0P
AQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAdBgNVHQ4E
FgQUXZ+9KqGeol7Jga9qlt3+O0drApgwHwYDVR0jBBgwFoAUANEVMe5FF27SKEJ7
RnwQstAEd8MwKQYDVR0RBCIwIIIecG9jLW1vcmlvLW5vZGUxLmNlcnQuZXVyb3Bh
LmV1MEkGDCsGAQQBgqRkxihAAQQ5MDcCAQEEBWFkbWluBCtoQzRxazFpUFIwaVNG
U1Q5ZHhXLVdBS2VvQWNuTGpNaktuMm1oVnR3NWhvMA0GCSqGSIb3DQEBCwUAA4IC
AQBjab7yd4yji0GGETdCGiHNg9tULn4NMnUkVC3GOqoB1uEz0sUgvBDMWmSKLpwh
eIaMCQnPBx7xS7lWsrfSBHNSDQ/ndXlYjSOhgaWpcwBpc8I8EeRZVvsip17bSnSN
/QSJR1lvuNgc6w+1CUyG+t6pbwJc3fxduCFudO9gK/wU4Jo+TTUHc7VkZyetMYJD
CqV05+NWEk3OZDIpaVV5I6URH70Gx7SaIQSlgmH9JIw7W5cBTst2pL9j2uzm021W
8NEV4Um2nh6AxpGbJ0O+y9Q+U0mFp3JPEnh1AV3XQW1WWluyoiY6QeO9nzcZO/8P
JfoPeoog2Yh2V2aojX5mPgl2TWgu4RB2pDzjxKsxRIIJ0b/dZgRlr+fFWfwR3CLt
7K/9WZOt/h/CQon8oi42uCsf6t+ufn2Gd+LKLP8MteUM/FXUdV5mZTvsbCYqGyoK
5lZpZ5mqCnS3Uobh5I+f53JJmUvbJJgGo5xMAq1He4OuNQNyFOkf4Y9OdF8uwtg0
A/hhyOzv1PMIYQj7Avs6AjOLVDjOlvNuRcd9TNqpEDQbmfKx3kT7/E+qpHHg5qwj
5LUEf2wxVuzF3tQdeQdn0SZm6HlMQa9lfC6WQeMmv1vVvwKWGgF4oMhBRBjbQ701
Uwiktf6/7fEf+0B52DarqQ8jjTezfyD9hyEH8XLG+k1yWg==
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIGTjCCBDagAwIBAgIBATANBgkqhkiG9w0BAQsFADCBiDELMAkGA1UEBhMCQkUx
ETAPBgNVBAgTCEJydXNzZWxzMREwDwYDVQQHEwhCcnVzc2VsczEZMBcGA1UEChMQ
RW5naW5lZXJpbmcgVGVhbTENMAsGA1UECxMEdGVzdDEpMCcGA1UEAxMgTW9yaW8g
Um9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMjQxMjExMTMzMTE0WhcNMjkx
MjExMTMzMTE0WjCBkDELMAkGA1UEBhMCQkUxETAPBgNVBAgTCEJydXNzZWxzMREw
DwYDVQQHEwhCcnVzc2VsczEZMBcGA1UEChMQRW5naW5lZXJpbmcgVGVhbTENMAsG
A1UECxMEdGVzdDExMC8GA1UEAxMoTW9yaW8gSW50ZXJtZWRpYXRlIENlcnRpZmlj
YXRlIEF1dGhvcml0eTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAKRj
7MpldIIFJP9QhGRhjGMv5BpKrJVyVHAWN6f0aiLAbnMIhpCXYWFlHJptEkm4fEcb
K4Q3gXIhI/c/FEXSyl/Jdb67ja77FotZwOCl0Vvw2W4te3UxIsE6B3l28mYaP8Z+
WcqRtCkpdWddddG8t9NBq5qRhz3ptD98oU7XV1sWI4fy2M14ccIEJ1XC6F/ihrft
IdvyR1lpMbhLjg4tbTwCiHozcxYJuwqFFGu4HoZHpNnJZ9JSR0JdVMeD+1bazp2h
thaIGlNQpxm3cK+2TA7tMd5yCWaBMG76+HNpNYE0tfOH9Ds6Dd8TPxxBQ39L0XSz
C9jARmASzPSQORAMiZFlPh1Mr1W3Mr7vZBLu0oSKsDitUtQ9lRqpcGnOCSpVMza5
gUKt3cbzryxDpjqHob0uCv6XOmk3906esYtzAWzIsSs9lATTu/vEymyzi6G2GUK8
Phinx0hoQhsFrue1eNdYxFIryIzLMJ7lbK25/k500kg/y6eyO75I3VKtUSQKuWD8
lpMQPUqj2S/TbcsxE8qWi7ox3Hyi23q2gtrgwFpzAUdPR/Eupr3ZkCKvrBTJXBBt
pNfUVANCwPoQ2c9KJ3OLfuJASZWILr4cJzDd8VRjLIx5CvL0meVL8BFkoLBnUVvc
rzguKsuwuwtxhrYnCfk7jTM2Lr72KEO5xyj9R/NfAgMBAAGjgbgwgbUwDAYDVR0T
BAUwAwEB/zALBgNVHQ8EBAMCAvQwOwYDVR0lBDQwMgYIKwYBBQUHAwEGCCsGAQUF
BwMCBggrBgEFBQcDAwYIKwYBBQUHAwQGCCsGAQUFBwMIMBEGCWCGSAGG+EIBAQQE
AwIA9zAdBgNVHQ4EFgQUANEVMe5FF27SKEJ7RnwQstAEd8MwKQYDVR0RBCIwIIIe
cG9jLW1vcmlvLW5vZGUxLmNlcnQuZXVyb3BhLmV1MA0GCSqGSIb3DQEBCwUAA4IC
AQARR5BEzWUpdbuK6imu55aYIui/Gzx6miXtt85B0ERLBA0AS+GLJDTK6HumMw8u
OB1JnJofR3mWBB/pDpaAwhSBPtc23vnHY7cW76GuZFVwWRPtbWdt9RVSqi+7AL6I
gBUQX+Wg6KLlJLVXpxZCgx8Rd+wWX7PcfYTW5kBg+n0NhrTg2iswvQvwfOKZQF8E
30gypA1BRWRHhajbp842+LzxLXxau5D+YUQQbu5DfgFXCeoPWWSPDHGSmt6x1ip7
bnSxV1YwmXAxgF8NsEyfBjR7kLg9HB/o4rofIRxu6XzqyrWsvFf/XfkxrI9hYtGx
PbvvJuVCchv6mmwAk5tLPdRmCkjDNB91io1xgk9JGsfPVC7nghwjPgFgQ/KDj183
9hHYhuBYq3u4wzGe9PCCsnv+CXZDeWDIp3fA2KTKTFEveQ9xGFhPMBVRAJueP+39
hr06aF8JgChz8/1CbAm4/TdL3Aw563lIsOKqpWLYz6zfnO5wXt/r8qXthwMkGMd7
aq788nk/i6r00nGi0MNn+a5flMEmqAXrnJYZL3j9lz2eV034ZFzrRlOgpGbIme0m
GDifPxdxehgpV7snfyHagmOOWyRyisZjLiyxJ+LSMbof4otAQwOik1jPLN6A6B4h
fRoTTTmC/qeEFOjHF6lGYWvO4yvuDyqoqHflzXAL7HCzwg==
-----END CERTIFICATE-----`,
      key: `-----BEGIN RSA PRIVATE KEY-----
MIIEoQIBAAKCAQEAqNgaLHchVQiebXlZELG9wVIjmDuoadvOIMV4oAYqmdBaQyrB
mFHCShjkqmtV+An6RnKx+SkdD+FYRy4tmXoiJr0n28c6DnbSBTOWjNsmO/4J6t78
YqCW+c9dbKyF5ssVEC1xXRwt5O4CUyTgJIxOloIq2tW3ocwYx6z/hZ++VGoaFDRx
E+JxNVZUJdsBTxHvHV1UH8sXx+Xme+vYXyG3daekOkBMtyrwsvGXFvXH/+u1a7T6
hdaJtjZRajdxjhWM2TqcSCRZ5UhmmYORUyI4T3yAVTt9HInKqYAxoliZON766nUV
hB3ufZpTMPcId+pUQ0ErOS6oXu/xBa3C/KCBTwIDAQABAoIBABWLYaeb2eZ6T5Dj
g5BkRbTF0OHrdt1vHTnESNSOhOAUnHfmnEZd5F9FFuSCBrm8ODHGD9yduOYeWH/Y
rhvf4Dy2NUBPhyJyWfEs+IBntFTCYSyVoHnLSQh+Q8sKpplg/4KNceIyJs8H6ccH
NQyk1KBS4roCvEhKfTz5X6+CxywJGnSRDYRmybjL9S4f6nHeHqD+a9INZU+hNhN4
Ui2eF9i1L2gQieiZBtiOCEe0mcctEIXh9W3yzDHQI5M/pLR+ZycS5YLA2wqxYFX6
oiw/swMzjY8guGKHCOjIYluw7gHPrdJ4gxEVHE4+8uj4SWMDiMGiDnYFEzVDhrz9
Ye6Vme0CgYEA6jKR9H89m8sWVt3ZaqSpWqWNbTtGTexg8XHZAWPCCRlO+U4365A/
f9cbMRwQC/QaueFXFszyQexMxTNTZvL+/bRj3FK/XZYpohm9fn1NJoRn6B5oBi6i
gM4a9906V/b6q7J9hrbyfwXR0zqTza72yWeKL25OwLXuZPZ5UmezIksCgYEAuJAF
RZIb5gkEndAEJIBrx3NDuzlmysZtKimTzqRNb9r5t+jV2pLgvk6IQ5wfp5Kvo5tP
wI0IJ4ELIijdCFIqPR4SaWVpN2fbng3Fu3BuXuURmhN1qpiiQinY4HB66fkHDQta
ltmIJl5ppCJPoGE8xoCWBqR0UwoqAH6xcyORGo0CgYEA4Z8Ab7yZfNqlVeB6kbu6
j+KIImCFdZKxPkoA1s7lYuCn7ZQGWYFpO17fCmvPpcRwbv0BaqzdAteEikLNKZQW
RhKgVeS+CYod59XGcfKC/gV3GO8aJkKNOWvWHU5a493DVuApADyF2AuiPk3jkLbr
d9KxFMq/QfbDiS7OaK4QrIsCgYBfLMXLLqF8asIar56zRBZQcDxJXDyJ7PF2gKJc
OZZFTGLiWAuP/5YuDUlvtB8OxzIT6I+j+BZwaLdZEkGJfPWTNuK/vzAXBlPWmbuS
AVSpUu9UyRwwsS1beUEB3H9GEMl2DJjWma8AOke4AImXZw10P/Md0ci62c73Gl5f
xcOaMQJ/L4aThGq/N8t5aH76H3SRImy1jnn2uhI9cn9YyLaXbf3axonwDChBAU4f
v+ZOMZq/PGoau8kpqz3xRC980cRm46uTxGogPlVV3iTCSHy5Skhj0I8kZeMRdtIV
OuLSWZ3msJey2GjLRTXgHfsEP8b73ki611oOcQNIdOnM4eMoTw==
-----END RSA PRIVATE KEY-----`,
    }
  }
}
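
The example above leaves out graceful shutdown. One way to add it, sketched below, is to disconnect the consumer when the process receives SIGINT or SIGTERM, so that it leaves its consumer group cleanly. `installShutdownHandlers` is a hypothetical helper, not part of Morio or kafkajs; it relies on the `disconnect()` method of the kafkajs consumer. The `proc` parameter is an assumption added so the helper can be exercised without a real process.

```javascript
/*
 * Hypothetical helper: disconnects the consumer once when a
 * termination signal arrives, then exits the process.
 */
function installShutdownHandlers (consumer, proc = process) {
  let shuttingDown = false

  async function shutdown (signal) {
    if (shuttingDown) return // Ignore repeated signals
    shuttingDown = true
    try {
      await consumer.disconnect()
      proc.exit(0)
    } catch (err) {
      console.error(`Failed to disconnect after ${signal}:`, err)
      proc.exit(1)
    }
  }

  for (const signal of ['SIGINT', 'SIGTERM']) {
    proc.on(signal, () => shutdown(signal))
  }

  return shutdown
}
```

In the example, you would call `installShutdownHandlers(consumer)` right after creating the consumer, before connecting it.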