Node SDK

Interface EnrichBatchOptions<T, U>

These are the options for an RStreamsSdk.enrich pipeline step, which reads events from one queue and writes them to another. Put another way, an enrich pipeline operation reads events from an inQueue and then writes them to an outQueue, allowing for side effects or transformation in the process.

This inherits from EnrichOptions and adds the ability to batch the data read from the source inQueue before it is sent to the transform function, which means your transform function will receive an array of type T (the batched events) instead of a single event of type T (see the sketch below).

see

RStreamsSdk.enrich

see

RStreamsSdk.enrichEvents
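
For orientation, here is a minimal sketch of how these options might be wired into a call such as RStreamsSdk.enrichEvents. The bot id, queue names, event shapes, import path, and the exact transform contract shown are illustrative assumptions, not something this page specifies.

```typescript
// A minimal sketch only. The import path, constructor usage, and the exact
// transform contract (an array of T in, an array of U out, promise-based
// enrichEvents) are assumptions; see RStreamsSdk.enrich / enrichEvents.
import { RStreamsSdk } from "leo-sdk";

// Illustrative event shapes: T is what is read, U is what is written.
interface OrderEvent { orderId: string; total: number }
interface TaxedOrder extends OrderEvent { tax: number }

async function main() {
  const sdk = new RStreamsSdk();

  await sdk.enrichEvents<OrderEvent, TaxedOrder>({
    id: "order-tax-bot",       // bot name (placeholder)
    inQueue: "orders",         // source queue (placeholder)
    outQueue: "orders-taxed",  // destination queue (placeholder)
    batch: 100,                // hand the transform up to 100 events at a time
    transform: async (orders) => {
      // `orders` is an OrderEvent[] because `batch` is set.
      return orders.map((o) => ({ ...o, tax: o.total * 0.07 }));
    },
  });
}

main().catch(console.error);
```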

Type parameters

  • T

    The type of the event read from the source inQueue

  • U

    The type of the event that will be written to the destination outQueue

Hierarchy

  • EnrichOptions
      • EnrichBatchOptions<T, U>

Properties

autoCheckpoint?: boolean

Enables or disables automatic checkpointing by the stream

default

true

backoff?: BackoffOptions

Backoff options used when writing to kinesis/firehose

default

{ randomisationFactor: 0, initialDelay: 10, maxDelay: 1000 }
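
A hedged example of overriding those defaults is shown below. The field names mirror the documented default object; the values, the assumption that delays are in milliseconds, and the import path are illustrative only.

```typescript
import { EnrichBatchOptions } from "leo-sdk"; // import path assumed for this sketch

const withBackoff: Partial<EnrichBatchOptions<unknown, unknown>> = {
  backoff: {
    randomisationFactor: 0.2, // add some jitter between retries
    initialDelay: 50,         // first retry delay (assumed to be milliseconds)
    maxDelay: 5000,           // cap the delay between retries
  },
};
```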

batch: number | BatchOptions

This governs micro-batching of events received from the source inQueue before they are sent to your transform function, allowing that function to receive events in batches instead of one at a time. This can be useful when your transform function reaches out to an external resource such as a database. Hitting a database for every single event that flows through a pipe can be very detrimental to performance, so it's common to micro-batch, say, 100 or 1,000 events and then construct a single query that reads/writes all of the data as one DB operation.

If this is a number, it is simply the number of events to include in each micro-batch.
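
As a concrete illustration of the micro-batching pattern described above, the sketch below batches 500 events and issues one hypothetical database call per batch. The saveTotals helper, the option values, and the import path are assumptions for illustration.

```typescript
import { EnrichBatchOptions } from "leo-sdk"; // import path assumed for this sketch

interface Purchase { userId: string; amount: number }  // T (illustrative)
interface Receipt extends Purchase { saved: boolean }  // U (illustrative)

// Hypothetical helper: persists a whole batch of rows in one round trip.
declare function saveTotals(rows: Purchase[]): Promise<void>;

const opts: EnrichBatchOptions<Purchase, Receipt> = {
  id: "purchase-saver-bot",
  inQueue: "purchases",
  outQueue: "purchases-saved",
  // Number form: micro-batch 500 events before invoking transform.
  // A BatchOptions object can be used instead when more control is needed.
  batch: 500,
  transform: async (purchases) => {
    await saveTotals(purchases);  // one DB operation for the whole batch
    return purchases.map((p) => ({ ...p, saved: true }));
  },
};
```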

config?: ReadOptions

Fine-grained control of reading from the source inQueue

firehose?: boolean

If true, firehose will be used. Firehose batches the events sent to it into an S3 file in one-minute increments, which of course adds at least one minute of latency. However, firehose can accept a vastly larger volume of concurrent writes than kinesis. So, use this when you have a giant number of concurrent events you wish to write, where a giant number is X events per X amount of time.

An RStreams Bus system bot reads the one-minute batched S3 files written by firehose, separates the events in each file into separate S3 files by queue, and then sends a single event to kinesis for each resulting queue-specific S3 file. From there, processing continues as if the events had been sent to kinesis using S3.

If both this and useS3 are present, firehose will be used.

default

false

force?: boolean

If true, the checkpoint will be applied even if someone else already checkpointed on the same bot/queue since the last time this code checkpointed. This is only used in advanced fanout cases.

id: string

The name of the bot that this code is acting as. The SDK will use it to query the bot DynamoDB table to pull checkpoints and to checkpoint on your behalf.

inQueue: string

The source queue from which events will be read

outQueue: string

The destination queue into which events will be written

partitionHashKey?: string

The hash value used to explicitly determine the shard to which events are sent

default

0

records?: number

The number of records, where each record is an event, to micro-batch locally in the SDK before writing them to either kinesis, firehose or S3. See the other options in this object to understand how this might be useful.

The SDK will write events as soon as one of the records, size or time conditions are met.

default

kinesis 100 records (events)

default

S3 1 file

default

firehose 10,000 records (events)

s3Opts?: { addBufferDuration?: boolean; chunkTime?: DurationInputArg1; prefix?: string; sectionCount?: number; time?: DurationInputArg1 }

Options to pass to leoS3 to configure S3 file creation; see the sketch after the type declaration below.

Type declaration

  • Optional addBufferDuration?: boolean

    Include the duration to create the first chunk in the allowed S3 file time

    default

    true

  • Optional chunkTime?: DurationInputArg1

    Time allowed before closing a chunk of an s3 file

    default

    this.time

  • Optional prefix?: string

Prefix to use in the file name.

    default

    Bot Id

  • Optional sectionCount?: number

    Max number of chunk sections to allow in an S3 file

  • Optional time?: DurationInputArg1

    Time allowed before closing an s3 file

    default

    10s
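
As a hedged illustration only, these s3Opts fields might be set like so alongside useS3; the values are arbitrary examples rather than recommendations, and the import path is assumed.

```typescript
import { EnrichBatchOptions } from "leo-sdk"; // import path assumed for this sketch

const s3Tuning: Partial<EnrichBatchOptions<unknown, unknown>> = {
  useS3: true,
  s3Opts: {
    prefix: "order-tax-bot",   // defaults to the bot id
    time: "30s",               // close each S3 file after 30 seconds
    chunkTime: "10s",          // close each chunk within a file after 10 seconds
    sectionCount: 10,          // cap the number of chunk sections per file
    addBufferDuration: false,  // exclude the first chunk's buffer time from the file window
  },
};
```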

size?: number

The number of bytes to micro-batch locally in the SDK before writing them to either kinesis, firehose or S3. See the other options in this object to understand how this might be useful.

The SDK will write events as soon as one of the records, size or time conditions are met.

default

kinesis 200k

default

S3 unbounded

default

firehose 900k

start?: string

The event ID of the starting position to read from within the queue. It is common not to provide this because each queue is stateful in that it remembers the last read position of a bot. Then, as bots read, they make a call back to the RStreams Bus to update the read position.

Usually, the SDK just handles this for you. If start isn't provided, the SDK will use the bot's last read position as the starting point. As a bot is invoked, reads some events, and does some processing, it automatically records how far it has read and then shuts down after a period of time. When the bot starts back up to read again, it knows where it last read from and simply continues.

see

Fundamentals: Event ID

time?: DurationInputArg1

The amount of time to micro-batch locally in the SDK before writing events to either kinesis, firehose or S3. See the other options in this object to understand how this might be useful.

Note, this type is any one of the valid durations the Moment JS library can take: Duration | number | string | FromTo | DurationInputObject.

The SDK will write events as soon as one of the records, size or time conditions are met.

default

kinesis 200ms

default

S3 unbounded

default

firehose 900k

useQueuePartition?: boolean

Flag to use the queue name to determine the shard to which events are sent

default

false

useS3?: boolean

If true, the SDK will write events to S3 and then pass a single event to kinesis whose payload references the S3 file. Thus, one actual event flows through Kinesis and that one event is eventually written to the RStreams bus' events DynamoDB table, still referencing the S3 file full of events. When reading events, the SDK will detect it has received an event in a queue that is really a reference to S3 and retrieve the portion of the S3 file needed to fulfill the SDK read request made.

This can be useful when a very large number of events need to be written all at once or when the events are large. However, this approach incurs some additional latency on ingestion and also on reading. If the S3 files are large, the read latency penalty is negligible for most use cases, but waiting to batch up a sufficient number of events can delay getting events into Kinesis for ingestion. The rule of thumb is that files around 2MB or larger are fine. It's OK if an occasional file is small, but creating many small files should be avoided as it could cause read latency. For example, if you request 1,000 events from a queue and every S3 file contains only two events, the SDK will have to retrieve 500 files to read just 1,000 events. Use the other settings to tune the amount of data saved to each file: records, size, time.

NOTE! A new feature, ReadOptions.fast_s3_read, largely solves the problem of having lots of small S3 files by enabling the SDK to concurrently read from multiple S3 files.

If this and firehose are present, firehose will be used.

default

false
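
Pulling the write-side knobs together, the sketch below shows one way records, size, time, useS3, and firehose might be combined. The thresholds are illustrative rather than recommended values, and the import path is assumed.

```typescript
import { EnrichBatchOptions } from "leo-sdk"; // import path assumed for this sketch

const writeTuning: Partial<EnrichBatchOptions<unknown, unknown>> = {
  // Batch events into S3 files and send one kinesis event per file.
  useS3: true,
  // If this were also true, firehose would take precedence (see above).
  firehose: false,
  // The SDK flushes as soon as any one of these thresholds is met.
  records: 5000,           // events per local micro-batch
  size: 2 * 1024 * 1024,   // ~2 MB files, per the rule of thumb above
  time: "10s",             // or after 10 seconds, whichever comes first
};
```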

Methods
