Rolling FIFO data stream to a groov data store table



Sometimes there is a single point of data coming into Node-RED that I want displayed on a groov page, but I find a single gadget isn’t quite enough – I want to keep a rolling history of values where the First In is the First Out (FIFO), and I specifically don’t want any duplication of data in my history either.

In this post I’ll show the flow and JavaScript I used to get a rolling feed of ten strings from a Twitter feed, but this could work with almost any other data source as well.

My data is a tweet string in msg.payload that comes from the raw public tweet stream, filtered for the hashtag #IoT. I get at most one of these per second thanks to a delay node in rate limit mode that discards intermediate messages. At this point your data could come from anywhere.
Wherever your data is injected from, use that flow to pick up the entire data store table, in my case the tag tweets:

When I use the groov read node to pick up the table, I make sure to grab the full table length of 10 and put the array in a new message property (msg.tweets) so that the string sitting in msg.payload is not overwritten.

Once I have the table I check to see if the tweet is in English. Here I could use the or operator (||) to allow other languages, check the location of the tweet, or add any number of other filter checks before the tweet gets compared to the list.
This shows some simple filtering that can be done before adding the item in msg.payload to the list.

if(msg.lang != 'en') return null;
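
As a rough sketch of the "other languages" idea, the check could compare msg.lang against a short list instead of chaining || comparisons. The helper name passesLanguageFilter and the language codes in allowedLangs are assumptions made for this example, not part of the original flow.

```javascript
// Example values only: swap in whichever language codes you want to accept.
var allowedLangs = ['en', 'es'];

// Hypothetical helper: true when the message's language code is in the list.
function passesLanguageFilter(msg) {
    return allowedLangs.indexOf(msg.lang) >= 0;
}

// Inside the function node the check would then read:
// if (!passesLanguageFilter(msg)) return null;
```

Keeping the codes in one array means adding a language is a one-word change rather than another || clause.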

If the flow is not ended by return null then the tweet text is in English, so I make a copy of the list of tweets currently in groov using .slice(), and make a copy of the tweet text to keep the rest of the code generic.

var list = msg.tweets.slice();
var newItem = msg.tweet.text;

After that I check to see if the new item has a valid index in the list. If so, it is already being displayed and I should end the flow.
This statement handles any potential duplication.

if(list.indexOf(newItem) >= 0) return null;
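
To make the behaviour of that check concrete, here is a small sketch with made-up sample strings. The helper name isDuplicate is only for illustration. Array indexOf compares with strict equality, so only an exact, case-sensitive match counts as a duplicate.

```javascript
// Hypothetical helper wrapping the same indexOf test used above.
function isDuplicate(list, newItem) {
    return list.indexOf(newItem) >= 0;
}

// Sample data invented for this example.
var shown = ['first tweet', 'second tweet', ''];

var exactMatch = isDuplicate(shown, 'second tweet');  // true: already in the list
var caseDiffers = isDuplicate(shown, 'Second tweet'); // false: comparison is case sensitive
var brandNew = isDuplicate(shown, 'third tweet');     // false: not in the list yet
```

A retweet whose text differs by even one character would therefore still be treated as new.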

At this point in the code I have accepted the new data since it passed the filters and is new to the list, so I can roll down the previous values to add the new item to the top.
Copy each item down one index, starting at the bottom of the list and working backwards, then assign the new item at the start of the array.
This for loop handles the “rolling” part of the flow.

for(var i = list.length-1; i > 0; i--)
    list[i] = list[i-1];
list[0] = newItem;
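
For comparison, the same roll can be sketched with the built-in array methods unshift and pop. This is an alternative to the loop above rather than what the flow uses, and the helper name rollIn is made up for this example. It assumes the table is not empty.

```javascript
// Hypothetical helper: returns a new array with newItem at the top and the
// oldest (last) entry dropped, so the length stays the same.
function rollIn(list, newItem) {
    var rolled = list.slice(); // work on a copy, leave the input untouched
    rolled.unshift(newItem);   // new item goes in at index 0
    rolled.pop();              // oldest item falls off the end
    return rolled;
}

var before = ['a', 'b', 'c'];
var after = rollIn(before, 'x'); // ['x', 'a', 'b']
```

Both versions keep the list at a fixed length, which matters because the groov table itself has a fixed size.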

Now the new list should be written to the groov table, which I do using an array of dynamic write objects.

var data = [];
for(var i = 0; i < list.length; i++)
    data.push({ tableStartIndex : i, value : list[i] });
return { payload : data };
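
To show what that array looks like, here is a sketch that builds the same objects with Array map. The helper name toWriteObjects and the sample strings are assumptions for this example; the property names tableStartIndex and value are the ones used in the code above.

```javascript
// Hypothetical helper: one write object per table row, with the row index
// stored alongside the value to be written there.
function toWriteObjects(list) {
    return list.map(function (item, i) {
        return { tableStartIndex: i, value: item };
    });
}

var writes = toWriteObjects(['newest tweet', 'older tweet']);
// writes[0] is { tableStartIndex: 0, value: 'newest tweet' }
// writes[1] is { tableStartIndex: 1, value: 'older tweet' }
```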

All together this gives me a nice, relatively short function block to handle all of the features I need:
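
Assembled from the snippets above, the whole thing looks roughly like this. It is wrapped in a function named addTweet (a name chosen here for illustration) so it can be run and tested outside Node-RED; in the function node itself only the body is needed.

```javascript
function addTweet(msg) {
    if (msg.lang != 'en') return null;   // end flow if not the wanted language
    var list = msg.tweets.slice();       // copy of the table read from groov
    var newItem = msg.tweet.text;        // new item being checked
    // a valid index means the item is already displayed, so end the flow
    if (list.indexOf(newItem) >= 0) return null;

    // shuffle everything down one and put the new item at the top
    for (var i = list.length - 1; i > 0; i--)
        list[i] = list[i - 1];
    list[0] = newItem;

    // build one dynamic write object per table index
    var data = [];
    for (i = 0; i < list.length; i++)
        data.push({ tableStartIndex: i, value: list[i] });
    return { payload: data };
}

// Sample message invented for this example (a three-row table for brevity).
var sample = { lang: 'en', tweet: { text: 'new' }, tweets: ['a', 'b', 'c'] };
var result = addTweet(sample);
// result.payload holds three write objects with values 'new', 'a', 'b'
```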

I have an extra delay after this function node to make sure I have time to read each new tweet that comes in. Your application may or may not call for it, so use your best judgement.

Since the #IoT public tweet stream is quite lively, I also drop intermediate messages; otherwise the backlog could turn into a very obnoxious message queue. If I were to set the node to a lower-traffic feed, like tweets from people I follow, I would likely untick this box or not even bother with the delay.

Once the message has been delayed (if necessary), I split the payload with a default split node. That fires each of the ten write objects (the nine old tweets plus the one new tweet) at the write node, which is set to push the data in msg.payload.value to my data store table tweets, using the index from the dynamic write objects.
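
As a rough illustration of what reaches the write node, the sketch below imitates the split step in plain JavaScript. The function splitPayload is a stand-in written for this example, not the split node's actual code, and it leaves out the msg.parts property that the real node also attaches.

```javascript
// Rough stand-in for the default split node: one outgoing message per
// element of the incoming payload array.
function splitPayload(msg) {
    return msg.payload.map(function (item) {
        return { payload: item };
    });
}

// Sample input invented for this example.
var combined = { payload: [
    { tableStartIndex: 0, value: 'newest tweet' },
    { tableStartIndex: 1, value: 'older tweet' }
] };

var messages = splitPayload(combined);
// messages[1].payload.value is 'older tweet'
// messages[1].payload.tableStartIndex is 1
```

Each message therefore carries one value plus the table index it belongs to, which is why the write node can be pointed at msg.payload.value.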

Now I can deploy the flow and watch the stream roll by in groov:

Happy coding!

[{"id":"c5958a70.d8b948","type":"debug","z":"b0835392.0631c","name":"","active":true,"console":"false","complete":"true","x":750,"y":420,"wires":[]},{"id":"382b60f4.48bf6","type":"groov-read-ds","z":"b0835392.0631c","dataStore":"41793f8c.cbbde","tagName":"tweets","tableStartIndex":"","tableLength":"10","value":"tweets","valueType":"msg","topic":"","topicType":"none","name":"","x":450,"y":420,"wires":[["1b74fefe.795001"]]},{"id":"1b74fefe.795001","type":"function","z":"b0835392.0631c","name":"add tweet?","func":"if(msg.lang != 'en') return null; // end flow if not certain language(s)\nvar list = msg.tweets.slice(); // copy array into 'list'\nvar newItem = msg.tweet.text; // new item being checked\n// if the new item has a valid list index it must already be in the list, so end the flow here\nif(list.indexOf(newItem) >= 0) return null;\n\n// otherwise shuffle everything down one\nfor(var i = list.length-1; i > 0; i--)\n    list[i] = list[i-1];\nlist[0] = newItem; // put the new tweet at the top of the list\n// the new tweet list will need to be rewritten\nvar data = []; // array of dynamic write objects\nfor(var i = 0; i < list.length; i++) // build a write object for each table index\n    data.push({ tableStartIndex : i, value : list[i] });\nreturn { payload : data }; // data array becomes the new payload, to be split.","outputs":1,"noerr":0,"x":590,"y":420,"wires":[["c5958a70.d8b948","83c94f38.43bb"]]},{"id":"bfe7d036.70e","type":"split","z":"b0835392.0631c","name":"","splt":"\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":750,"y":460,"wires":[["1d1ad5db.39789a"]]},{"id":"1d1ad5db.39789a","type":"groov-write-ds","z":"b0835392.0631c","dataStore":"41793f8c.cbbde","tagName":"tweets","tableStartIndex":"","value":"payload.value","valueType":"msg","name":"","x":870,"y":460,"wires":[[]]},{"id":"ea717d06.45125","type":"delay","z":"b0835392.0631c","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"x":470,"y":380,"wires":[["382b60f4.48bf6"]]},{"id":"83c94f38.43bb","type":"delay","z":"b0835392.0631c","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"10","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"x":600,"y":460,"wires":[["bfe7d036.70e"]]},{"id":"53591c1a.9761a4","type":"twitter in","z":"b0835392.0631c","twitter":"","tags":"#IoT","user":"false","name":"","topic":"tweets","inputs":1,"x":330,"y":380,"wires":[["ea717d06.45125"]]},{"id":"41793f8c.cbbde","type":"groov-data-store","z":"","project":"5c31575e.43ac48","dsName":"NodeRED"},{"id":"5c31575e.43ac48","type":"groov-project","z":"","address":"localhost"}]