A library allowing one to manually create, update, checkpoint or retrieve information on a bot.
Helpful methods for interacting with RStreams' DynamoDB tables.
For convenience, a re-export of the popular event-stream library.
A re-export of the popular split2 library.
It's used to turn events in the pipeline into a set of stringified events where each event is separated by a character, typically a newline. We use it to make JSON-lines files: a single pipeline step produces the lines, followed by a step that writes them to an S3 file.
This creates a pipeline step that tells the SDK to micro-batch events received in one pipeline step before sending them to the next. It roughly controls how many events arrive at the next step at once. Receiving a micro-batch of events, say 100 at a time instead of one at a time, lets a step leverage economies of scale when writing to a database, e.g. issuing one query to fetch 100 records at once instead of one query per event as it arrives.
The type of the data being batched from the previous pipeline step before sending to the next pipeline step
If a number, then this is how many events to batch up before sending along. If BatchOptions, then this is fine-grained control to ensure events keep flowing smoothly whether there are a few or many at a given moment.
The pipeline step that is ready to be used in a pipeline
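For example, a minimal sketch of micro-batching roughly 100 events per database write, assuming the SDK is imported as RStreamsSdk from leo-sdk and this namespace is available as its streams property; the bot/queue names, the saveOrders helper, and the exact shape of the batched unit are illustrative assumptions:

```typescript
import { RStreamsSdk } from "leo-sdk";

interface Order { id: string; cents: number }
// Hypothetical bulk writer standing in for one round trip to your database.
declare function saveOrders(orders: Order[]): Promise<void>;

const ls = new RStreamsSdk().streams;

async function main() {
  await ls.pipeAsync(
    ls.fromLeo<Order>("order-sink-bot", "orders"), // source: read events from a queue
    ls.batch(100),                                 // group roughly 100 events per downstream call
    ls.throughAsync(async (batch: any) => {
      // `batch` now represents ~100 queued events rather than one; unwrap the
      // payloads per your event shape (treated loosely in this sketch).
      const orders: Order[] = (batch.payload ?? batch).map((e: any) => e.payload ?? e);
      await saveOrders(orders); // one bulk write instead of 100 single writes
      return batch;
    }),
    ls.devnull()                                   // sink: discard, keeps the pipe flowing
  );
}
main().catch(console.error);
```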
Creates a pipeline step that will checkpoint and then pass the events on to the next step in the pipeline.
Options for when to checkpoint.
Checkpoint after this number of records, or as soon as the time condition is met if it is also specified and happens sooner.
Checkpoint after this amount of time, or as soon as the records condition is met if it is also specified and happens sooner. Note that this type is any one of the valid durations the Moment.js library can take: Duration | number | string | FromTo | DurationInputObject.
The pipeline step that is ready to be used in a pipeline
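For instance, a sketch of combining both triggers, assuming the option fields are named records and time as described above:

```typescript
// Checkpoint whenever 1000 records have passed through, or every 15 seconds,
// whichever happens sooner. The time value here is a Moment.js DurationInputObject.
const checkpointWhen = { records: 1000, time: { seconds: 15 } };
```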
This creates a pipeline step that takes an event, logs it and then passes the event on to the next pipeline step. The log will include the event ID of the event, if it's present. This is helpful to get visibility into the pipeline.
The type of the event that flows in, gets logged and then flows unchanged to the next pipeline step.
If present, log statements are prefixed with this string.
If present, only log every Nth event that flows through, where N is records.
The pipeline step that is ready to be used in a pipeline
This creates a pipeline step that takes an event, logs it and then passes the event on to the next pipeline step. The log will include the event ID of the event, if it's present. This is helpful to get visibility into the pipeline.
The type of the event that flows in, gets logged and then flows unchanged to the next pipeline step.
If present, only log every Nth event that flows through, where N is records.
The pipeline step that is ready to be used in a pipeline
Creates a Correlation from read events. (TODO: document this function more fully.)
Creates a pipeline step that can act as a noop sink.
Sometimes you don't care to push data anywhere when you have a pipeline, but you need the fine-grained control of making your own pipeline. When that's the case, use this to create a final pipeline step, a sink, to end your pipeline.
Pipelines must have a sink or data won't flow, since Node streams pull data starting with the sink, which asks the previous pipeline step for data, and that step asks the one before it, and so on. So, no sink means no data flows. This gives you a noop sink.
The type of the data sent into this final pipeline step
If a string, logs events that come in, prefixing each log statement with this string. If true, logs the event. Otherwise, does nothing.
The pipeline step that is ready to be used in a pipeline
This is a callback-based version of the RStreamsSdk.enrichEvents function and should no longer be used. Callback-based API flavors will be phased out over time.
It reads events from one queue and writes them to another queue. Put another way, an enrich operation reads events from a source inQueue and then writes them to a destination outQueue, allowing for side effects or transformation in the process.
The EnrichOptions.transform function is called as events are retrieved from the source queue so you can transform them and send them to the destination queue by calling the callback in the transform function. The callback passed as the second argument of this function is called when all enriching is done on all events (right before the stream closes down), allowing you to do cleanup like closing a DB connection.
The type of the data event retrieved from the source queue
The type of the data event that is sent to the destination queue
The details of how to enrich and the function that does the work to enrich, either the batched or not batched version. The batched version will batch up requests to your transform function and pass it an array instead of a single object.
A function called when all events have been processed
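A minimal sketch of this callback flavor, assuming the bot id field is named id and that the transform callback receives (payload, event, done); the queue names and payload types are illustrative:

```typescript
import { RStreamsSdk } from "leo-sdk";

interface RawOrder { id: string; cents: number }
interface PricedOrder { id: string; dollars: number }

const sdk = new RStreamsSdk();

sdk.enrich<RawOrder, PricedOrder>({
  id: "order-pricer-bot",
  inQueue: "raw-orders",
  outQueue: "priced-orders",
  transform: (payload, event, done) => {
    // Transform each source event and hand the result to the callback,
    // which sends it on to the destination queue.
    done(null, { id: payload.id, dollars: payload.cents / 100 });
  },
}, (err) => {
  // Called once after all events have been processed; do cleanup here.
  if (err) console.error("enrich failed", err);
});
```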
Helper function to turn a timestamp into an RStreams event ID.
The timestamp you want to turn into an RStreams event ID which can be anything used to construct a Moment object
Specify the granularity of the event ID, maybe just year/month or year/month/hour, etc.
The generated event ID
Helper function to turn an RStreams event ID into a timestamp.
The event ID to turn into an epoch timestamp
The timestamp as a time since the epoch
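A short sketch of round-tripping a timestamp, assuming these helpers are exposed on the streams namespace as eventIdFromTimestamp and eventIdToTimestamp:

```typescript
import { RStreamsSdk } from "leo-sdk";

const ls = new RStreamsSdk().streams;

// Turn a point in time into an event ID, then back into an epoch timestamp.
const eid = ls.eventIdFromTimestamp(Date.now());
const epochMs = ls.eventIdToTimestamp(eid);
console.log(eid, new Date(epochMs).toISOString());
```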
This creates a pipeline step that will parse events from a CSV file and send them to the next step. Underneath the covers it uses the popular fast-csv node library.
List of fields to transform | true builds the header list dynamically
fastCSV options https://c2fo.github.io/fast-csv/docs/parsing/options
The pipeline step that is ready to be used in a pipeline
Create a pipeline step that reads from the RStreams bus instance queue inQueue, doing so as bot botId.
The type of data that will be read from the queue.
The bot to read as
The queue to read from
The options on how to read from this queue
The pipeline step that is ready to be used in a pipeline
This creates a pipeline step that can act as the first step in a pipeline, the source, which reads data from an S3 file.
What to read from.
The name of the S3 bucket to read from
The name of the file in the bucket to read from
Read from a specific range in the file. This is a string that must look like this: bytes=<startingByteOffset>-<endingByteOffset>, where <startingByteOffset> is the start position to read from and <endingByteOffset> is the ending position to read from, exclusive.
The pipeline step that is ready to be used in a pipeline
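A minimal sketch of reading part of a gzipped JSON-lines file, assuming the options object uses the bucket, key, and range fields described above and that the gunzip/parse/devnull steps documented on this page are available; names are illustrative:

```typescript
import { RStreamsSdk } from "leo-sdk";

const ls = new RStreamsSdk().streams;

async function main() {
  await ls.pipeAsync(
    ls.fromS3({
      bucket: "my-data-bucket",              // illustrative bucket name
      key: "exports/orders.jsonl.gz",        // illustrative file name
      range: "bytes=0-1048575",              // read only the first 1 MiB
    }),
    ls.gunzip(),                             // uncompress the bytes
    ls.parse(),                              // split and parse JSON lines
    ls.devnull()                             // discard; replace with a real sink
  );
}
main().catch(console.error);
```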
Creates a pipeline step that uncompresses the data that flows through it.
This is an export of the zlib library's createGunzip function, which is used to uncompress content as it moves through a pipeline.
The pipeline step that is ready to be used in a pipeline
Creates a pipeline step that compresses the data that flows through it.
This is an export of the zlib library's createGzip function, which is used to compress content as it moves through a pipeline.
The pipeline step that is ready to be used in a pipeline
The pipeline step that joins an external data source with events
This creates a pipeline step that acts as the last step of the pipeline, the sink, writing events sent to the pipeline step to the queue specified.
The type of the data received by the pipeline step
For events that don't specify the bot to act as, this default is used. It is the bot to act as when writing; events will be marked as written by this bot. If not provided, each event must include the id of the bot to write the event as.
For events that don't specify the queue to write to, this default is used. It is the queue into which events will be written. If not provided, each event must include the queue to write the event to.
An object that contains config values that control the flow of events to outQueue
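A minimal sketch, assuming this sink is exposed on the streams namespace as load and that the event-stream re-export (eventstream, with its readArray helper) can serve as a tiny in-memory source; the names and the event shape are illustrative assumptions:

```typescript
import { RStreamsSdk } from "leo-sdk";

const ls = new RStreamsSdk().streams;

async function main() {
  await ls.pipeAsync(
    // A tiny in-memory source built from the event-stream re-export.
    ls.eventstream.readArray([
      { payload: { orderId: "o-1", status: "shipped" } },
      { payload: { orderId: "o-2", status: "cancelled" } },
    ]),
    // Sink: write to the bus. Events that don't name their own bot or queue
    // fall back to the defaults given here.
    ls.load("order-status-bot", "order-status")
  );
}
main().catch(console.error);
```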
Creates a pipeline step that will log events as they pass through which can be helpful for debugging in between streaming operations.
The type of the data that flows through the step to be logged
If provided, this prefix is included with each log
The pipeline step that is ready to be used in a pipeline
This is a callback-based version of the RStreamsSdk.offloadEvents function and should no longer be used. Callback-based API flavors will be phased out over time.
It reads events from a queue to do general processing (such as write to an external DB). It's called offload because it is commonly used to process events and offload them to external resources such as ElasticSearch or other databases that are off of the RStreams Bus.
It reads from the queue specified in opts and then calls the opts.transform function, passing in the events retrieved so they may be processed.
The type of the data read from the RStreams bus queue
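A minimal sketch of this callback flavor, assuming the options use id and inQueue fields alongside the transform function described above; the Elasticsearch helper is hypothetical:

```typescript
import { RStreamsSdk } from "leo-sdk";

interface AuditEvent { userId: string; action: string }
// Hypothetical helper standing in for your external system.
declare function indexInElasticsearch(doc: AuditEvent): Promise<void>;

const sdk = new RStreamsSdk();

sdk.offload<AuditEvent>({
  id: "audit-offloader-bot",
  inQueue: "audit-events",
  transform: (payload, event, done) => {
    indexInElasticsearch(payload)
      .then(() => done(null, true)) // signal the event was handled
      .catch((err) => done(err));
  },
}, (err) => {
  // Called once when the queue has been drained or an error stops the stream.
  if (err) console.error("offload failed", err);
});
```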
A pipeline step that will split and parse JSON lines, turning them into events.
The type of object produced from parsing the JSON objects.
If true and there's a parse error, the error and the JSON line that couldn't be parsed is skipped. Defaults to false.
The pipeline step that is ready to be used in a pipeline
This creates a pipeline step that allows events to move through the step. This is only used in special cases and most will not have occasion to use this.
For example, you may need to pre-read a database before you can start generating data, but you still need to give the pipeline something it can pipe to, so you hand it a passthrough. It's also used when you want to put a sink in the middle of a pipe: writing to an S3 file is a sink step, but we sometimes do it in the middle of a stream because we generate many S3 files, so we pipe to S3 and then pipe the results of that into a passthrough.
The options for transforming.
The pipeline step that is ready to be used in a pipeline
A callback-based version of pipeAsync. Creates a pipeline of steps where the first step produces the data and it then flows to the next step and so on. The first step is the source, producing the content; the final step is the sink.
The type definitions make this look daunting. It's not. It's merely a set of pipeline steps in a series where the first step in the pipeline is the source that produces content, perhaps by reading it from a queue of the bus, and then the data is sent to the next step and so on until the final step, the sink, gets the data.
The reason the types look so complicated is that there must be a specific type for the exact sequence of possible pipeline steps that one might chain together. The different parameterized types, T1/T2/etc., represent the data type of the data events that are produced, modified, and moved down through the pipe.
Note that developers rarely need to understand any of this. Developers need only ask what type of pipe they want to create. Do you want a pipe that just reads from a queue and doesn't send the data anywhere else? That's an RStreamsSdk.offload pipe. Do you want to read from one queue, do some processing and/or transformation and/or filtering, and then write the result to another queue? That's RStreamsSdk.enrich or RStreamsSdk.enrichEvents. With these, you don't have to construct a pipe yourself, as these helper methods craft one purpose-built for you.
If you need to do something more involved, then this method is for you. Even then, the SDK dramatically simplifies things so you don't have to craft your own pipeline steps. Instead, you simply call an SDK function to create an instance of a pipeline step that does what you want it to.
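As a sketch, here is a hand-built pipe that reads from a queue, transforms each event, and discards the result, with a final callback for completion or error; the bot/queue names are illustrative and the fromLeo/throughAsync/devnull steps are the ones documented elsewhere on this page:

```typescript
import { RStreamsSdk } from "leo-sdk";

interface Order { id: string; cents: number }

const ls = new RStreamsSdk().streams;

// Source: read from a queue; middle: transform; sink: discard (devnull).
// The final argument is the callback invoked when the pipe finishes or errors.
ls.pipe(
  ls.fromLeo<Order>("order-auditor-bot", "orders"),
  ls.throughAsync(async (event: { payload: Order }) => {
    // Events read from a queue carry the original data on their payload property.
    console.log("order total", event.payload.cents / 100);
    return event;
  }),
  ls.devnull(),
  (err) => {
    if (err) console.error("pipe failed", err);
    else console.log("pipe finished");
  }
);
```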
An async/await friendly version of pipe. Reference the docs there.
This is very advanced functionality that likely won't be used very often. It takes a list of pipeline steps and turns them into a single Transform stream step. If you have to ask why you'd want to do this, you probably don't need to do it.
This is for internal SDK use only and is no longer needed externally but remains here for backward compatibility.
This creates a pipeline step that takes events in of type T, allows your code to process them, and then sends an event of type U to the next pipeline step.
The type of the data sent into the function to be processed
The type of the data you send on after being processed
The name of the bot to act as
The function to process data, getting data of type T and returning data of type U
The queue to send the resulting data to
The pipeline step that is ready to be used in a pipeline
Creates a pipeline step that is used to manually get/set the checkpoint. Often, this pipeline step is used to get and store checkpoint data, and then at the very end of the pipeline the saved-off data is used to manually checkpoint.
The bot that is doing the reading.
The queue whose read position we need to checkpoint on behalf of the bot botId
How often to checkpoint.
Checkpoint after this number of records, or as soon as the time condition is met if it is also specified and happens sooner.
Checkpoint after this amount of time, or as soon as the records condition is met if it is also specified and happens sooner. Note that this type is any one of the valid durations the Moment.js library can take: Duration | number | string | FromTo | DurationInputObject.
The pipeline step that is ready to be used in a pipeline
This creates a pipeline step that turns a Javascript object into a JSON line (newline at the end of the stringified JS object). This is used to make it easy to create JSON lines files.
The pipeline step that is ready to be used in a pipeline
This is a callback-based version of the RStreamsSdk.throughAsync function and should no longer be used. Callback-based API flavors will be phased out over time.
This creates a callback based pipeline step that will take data in, possibly transform the data or do computation, and then sends the result on to the next step in the pipeline.
The type of the data sent in to be passed through this step.
The type of data to be sent on to the next step in the pipeline.
A function that does the work of taking the data in, doing something with it and then calling the done function when done. The first arg is stripped off by Javascript since it recognizes that the this arg is just there to set the this context, so that the this keyword works inside the function and is itself the instance of the transform stream, which can be useful. For example, say you want to push an event to a queue from inside the function: you can call this.push to push the event while still sending an event on to the next step in the pipeline afterwards.
So, the first real argument your function will receive is obj, which is the data event being sent in to be processed/transformed and sent on to the next pipeline step. The second arg is done; you call it when you're done. Call done() if there's no error but you want to filter out this event and not pass it on to the next pipeline step. Call done(err) if an error occurred, where err is a string or Error object. Call done(null, U) when there is no error and you want to pass an event on to the next step in the pipeline, where U is the type of object being sent on.
A function to be called when the entire pipeline has been flushed to allow for cleanup, perhaps closing a DB connection.
The pipeline step that is ready to be used in a pipeline
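A minimal sketch showing the three done() forms described above; the event types are illustrative:

```typescript
import { RStreamsSdk } from "leo-sdk";

interface Reading { sensorId: string; celsius: number }
interface Flagged { sensorId: string; celsius: number; tooHot: boolean }

const ls = new RStreamsSdk().streams;

const step = ls.through<Reading, Flagged>((reading, done) => {
  if (Number.isNaN(reading.celsius)) {
    done(new Error("bad reading"));                        // error: fails the pipeline
  } else if (reading.celsius < 0) {
    done();                                                // filter: drop this event
  } else {
    done(null, { ...reading, tooHot: reading.celsius > 40 }); // pass the result on
  }
});
```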
This creates an async/await-friendly pipeline step that will take data in, possibly transform the data or do computation, and then sends the result on to the next step in the pipeline.
The type of the data sent in to be passed through this step.
The type of data to be sent on to the next step in the pipeline.
A function that does the work of taking the data in, doing something with it and then rejecting or resolving the promise with the result object of type U. If you resolve with no result data, the event is skipped and not sent to the next pipeline step.
The first arg is stripped off by Javascript since it recognizes that the this arg is just there to set the this context, so that the this keyword works inside the function and is itself the instance of the transform stream, which can be useful. For example, say you want to push an event to a queue from inside the function: you can call this.push to push the event while still sending an event on to the next step in the pipeline afterwards. So, the first real argument your function will receive is obj, which is the data event being sent in to be processed/transformed and sent on to the next pipeline step.
A function to be called when the entire pipeline has been flushed to allow for cleanup, perhaps closing a DB connection.
The pipeline step that is ready to be used in a pipeline
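A minimal sketch: resolve with a value to pass it on, or resolve with nothing to skip the event; the event types are illustrative:

```typescript
import { RStreamsSdk } from "leo-sdk";

interface Reading { sensorId: string; celsius: number }

const ls = new RStreamsSdk().streams;

const step = ls.throughAsync(async (reading: Reading) => {
  if (reading.celsius < 0) {
    return;                                          // skipped: not sent downstream
  }
  return { ...reading, tooHot: reading.celsius > 40 }; // sent to the next step
});
```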
This creates a pipeline step that will create a CSV file from the events that flow into this step. Underneath the covers it uses the popular fast-csv node library.
List of fields to transform | true builds the header list dynamically
fastCSV options https://c2fo.github.io/fast-csv/docs/parsing/options
The pipeline step that is ready to be used in a pipeline
This is only to be used internally by the SDK. It used to be necessary externally and so remains for backward compatibility.
Creates a pipeline step that will checkpoint and then pass the events on to the next step in the pipeline.
When to checkpoint.
The pipeline step that is ready to be used in a pipeline
A convenient function to write data to a DynamoDB table as the final step in a pipeline, the sink, as events flow into this step.
The name of the DynamoDB table to write to
The options for writing.
The pipeline step that is ready to be used in a pipeline
Create a pipeline step that takes the events from the previous pipeline step and then writes them to an RStreams bus queue.
The bot to act as when writing.
Options for writing
The pipeline step that is ready to be used in a pipeline
This creates a pipeline step meant to be the last step in a pipeline, the sink, that writes events that flow into it into S3. You should micro-batch events before getting to this step to control how many events to write to the file.
The name of the AWS S3 bucket to write the file to
The name of the file to write.
The pipeline step that is ready to be used in a pipeline
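A minimal sketch of ending a pipeline in an S3 file, assuming the stringify, gzip, and toS3 steps documented on this page; the bucket, file, bot, and queue names are illustrative:

```typescript
import { RStreamsSdk } from "leo-sdk";

const ls = new RStreamsSdk().streams;

async function main() {
  await ls.pipeAsync(
    ls.fromLeo("order-archiver-bot", "orders"), // in practice, micro-batch upstream as noted above
    ls.stringify(),                             // each event becomes one JSON line
    ls.gzip(),                                  // compress the bytes
    ls.toS3("my-archive-bucket", "orders/archive.jsonl.gz")
  );
}
main().catch(console.error);
```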
This is a sink, a step designed to be the last step in the pipe.
This is a sink, a step designed to be the last step in the pipe.
This namespace encompasses the majority of the functionality of the SDK. It might be helpful to start at RStreamsSdk which exposes functionality from this namespace that is most commonly used.