

Error Handling

A dataset that has at least one errored task transitions to the ERRORED state. The errors can be classified into two types: Task Errors and Record Errors.

  • Record Errors: Errors in the user handlers while parsing records or creating composite records from the parsed records. Record errors are logged as error documents if an error document is returned from the user handlers, or become task errors if an exception is thrown in the user handlers.
  • Task Errors: Errors in the #Let's Data framework are task errors. Task errors transition the task to the ERRORED state.

Task Errors

At a high level, the failures could be:

  • Task timeout: The task exceeded the amount of time allocated to the compute worker and transitions to an error state. In most cases, we will schedule a task to RERUN before it times out, but if it does time out, the system tags the task with a timeout error in the task's last error attribute. The task execution logs would also show that the task has timed out.
  • Read Connector Issues: The read connector becomes unavailable, the read connector connectivity is reset or the read connector has transient errors. The system is somewhat resilient to such errors with exponential wait retry patterns (illustrated in the retry sketch after this list) but these errors could transition an incomplete task to error status. These errors would show in the last error attribute of the task and in the task execution logs.
  • Write Connector Issues: The writes to the write destination / error destinations fail because of write connector unavailability, connectivity issues or transient errors such as throttling. The system is somewhat resilient to such errors with exponential wait retry patterns but these errors could transition an incomplete task to error status. These errors would show in the last error attribute of the task and in the task execution logs.
  • Checkpointing Errors: The checkpoint fails due to checkpoint database unavailability, connectivity issues or transient errors such as throttling. The system is somewhat resilient to such errors with exponential wait retry patterns but these errors could transition an incomplete task to error status. These errors would show in the last error attribute of the task and in the task execution logs.
  • Diagnostic Failures: The metrics / log emission fails due to diagnostics endpoint unavailability, connectivity issues or transient errors such as throttling. The system is somewhat resilient to such errors with exponential wait retry patterns but these errors could transition an incomplete task to error status. These errors would show in the last error attribute of the task and in the task execution logs.
  • Reader Parsing Errors: Here is how the following parsing errors are handled by the system (see the parsing outcomes sketch after this list):
    • S3 File Reader Parsing Errors
      • The task does not find a start pattern in the file - this is expected from time to time and indicates an end of file. The system transitions the task to completed state.
      • The task finds a start pattern in the file but does not find a corresponding end pattern within the max allowed record size (16 MB) - this is unexpected and in this case, we will transition the task to error state, capturing the exception in the last error and logs.
      • The task finds a start pattern in the file but does not find a corresponding end pattern and reaches the end of file - this is unexpected and in this case, we will transition the task to error state, capturing the exception in the last error and logs.
    • SQS Queue Reader Parsing Errors
      • The SQS Queue Reader reads the messages in batches and, after they are written to the write destination, deletes the messages from the SQS Queue. It is possible that the deletes are unsuccessful for some messages. The system is somewhat resilient to such errors and does some retries, but if the deletion errors persist, it will transition the task to the ERRORED state.
      • If the max receive count is reached on a message, the SQS queue has the responsibility of moving the message to the DLQ (or similar terminal error handling strategy)
  • Unhandled Exceptions: The task gets an unhandled exception thrown by the user's data handlers (parser / reader implementations) - in this case, we will transition the task to error state, capturing the exception in the last error and logs.
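
The exponential wait retry pattern referenced in the connector, checkpointing and diagnostics bullets above can be summarized with a minimal sketch. This is not the framework's actual internal code - the operation type, attempt count and base delay below are illustrative assumptions that only show the shape of the pattern.

    // Illustrative sketch of an exponential wait retry for transient read / write /
    // checkpoint / diagnostics errors. Not the #Let's Data implementation - the attempt
    // count and base delay are assumed values.
    import java.util.concurrent.Callable;

    public final class ExponentialWaitRetry {
        public static <T> T callWithRetries(Callable<T> operation, int maxAttempts,
                                            long baseDelayMillis) throws Exception {
            if (maxAttempts < 1) {
                throw new IllegalArgumentException("maxAttempts must be >= 1");
            }
            Exception lastError = null;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try {
                    return operation.call();                         // e.g. a read, write or checkpoint call
                } catch (Exception transientError) {
                    lastError = transientError;                      // becomes the task's last error if retries run out
                    Thread.sleep(baseDelayMillis * (1L << attempt)); // exponential wait: base * 2^attempt
                }
            }
            // retries exhausted - at this point an incomplete task would transition to error status
            throw lastError;
        }
    }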
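
The three S3 File Reader parsing outcomes above (no start pattern, no end pattern within the max record size, and end of file before the end pattern) can be sketched as a simple scan. The names below (ScanOutcome, StartEndPatternScanner) are hypothetical and the buffer handling is simplified to an in-memory string - this is not the actual reader implementation.

    // Hypothetical sketch of the S3 File Reader start / end pattern outcomes described above.
    enum ScanOutcome { RECORD_FOUND, COMPLETED, ERRORED }

    class StartEndPatternScanner {
        // max allowed record size (16 MB), simplified here to a character count
        private static final long MAX_RECORD_SIZE = 16L * 1024 * 1024;

        ScanOutcome scanNextRecord(String buffer, String startPattern, String endPattern) {
            int start = buffer.indexOf(startPattern);
            if (start < 0) {
                // no start pattern found - expected at the end of the file, task transitions to completed
                return ScanOutcome.COMPLETED;
            }
            int end = buffer.indexOf(endPattern, start + startPattern.length());
            if (end < 0) {
                // start pattern found, but end of file reached without an end pattern - task errors
                return ScanOutcome.ERRORED;
            }
            if ((end + endPattern.length()) - start > MAX_RECORD_SIZE) {
                // no end pattern within the max allowed record size - task errors
                return ScanOutcome.ERRORED;
            }
            return ScanOutcome.RECORD_FOUND; // a complete record between the start and end patterns
        }
    }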

Error Model

While the above tries to simplify the error handling story for tasks, in reality a task error could be due to a number of reasons. Examining the task lifecycle helps us understand the different failure points in the process and how the system reacts to these failures.

  • A. Task Created: The task lifecycle starts with a task in the created state. In this state, the following errors could happen:
    • The task is not picked up by a compute worker within a defined amount of time - the system makes a best effort guarantee that tasks will be picked up within the defined amount of time.
      • System Behaviour: Any laggards are issues that the #Let's Data team will be alerted on and will fix. Users don't necessarily need to write code to handle this case.
    • The task is picked up by a compute worker but fails before the compute worker has had a chance to initialize the exception handling code as part of task processing.
      • System Behaviour: We make a best effort to minimize the compute worker code that runs before exception handling has been initialized, but issues can happen. However, we don't suggest that users write code to handle this error case.
  • B. Task Reads: See Read Connector issues and reader parsing issues above.
  • C. Write Document / D. Error Records: See Write Connector issues above.
  • E. Create Task Checkpoint: See Checkpointing Errors above.
  • F. Emit Metrics / G. Logs: See Diagnostic Failures above.
  • I. End of File / J. Unhandled Exceptions / K. Task Stop Requested: See Unhandled Exceptions above.
 

Redriving Tasks

Once a dataset has errored (at least one task has errored), the errored tasks can be redriven. If the issues that caused the error were transient, then re-running as is would be okay. If there were software issues or issues such as timeouts, then the tasks should be redriven after fixing the issue and updating the dataset configuration. Fixes could include updating the implementation jar, fixing the timeout / concurrency configuration, etc.

A task redrive reinitializes the failed tasks according to the specified task redrive policy. This requires asynchronously running a simple task redrive workflow with the following high-level sub-tasks:

  • make sure that dataset and task data consistency is maintained and that multiple concurrent task redrive requests don't cause havoc in the system
  • select the tasks that need to be redriven using the specified Task Redrive Selection
  • move the dataset and the redriving tasks to PROCESSING status, apply the Task Redrive Policy and clear any error state related to these tasks

Once the dataset and the tasks are in PROCESSING state, they are like any other task and follow the same lifecycle as before.

Being able to redrive errored tasks should be simple. For example, when processing at high scale with a large number of tasks (~80,000), figuring out which tasks have failed, why they failed and which should be redriven might not scale too well. We've built in a few simple Task Redrive constructs to help with selecting tasks for redrive and with the policy to apply when redriving them. These are discussed as follows (an illustrative request sketch follows the list):
  • Task Redrive Selection: Specify this value to tell us how to select the tasks that need to be redriven. It currently has the following values:
    • "RedriveAllErrorTasks": When this value is specified for the Task Redrive Selection, #Let's Data will find all the tasks that are in error status and attempt to redrive them.
    • "RedriveSelectErrorTasks": When this value is specified for the Task Redrive Selection, #Let's Data expects the taskIds of the tasks that need to be redriven in the Task Redrive request.
  • Task Redrive Policy: We've built in checkpointing in the framework - this essentially means that error tasks have a save state for the file they are processing. We've defined a Task Redrive Policy to specify how to use the save state (checkpoint) in redriving the task. These are:
    • "ResumeTasksFromLastCheckpoints": In scenarios where the file processed upto the save state does not need to be reprocessed, specify the "ResumeTasksFromLastCheckpoints" policy - this will resume the task from the last save state (checkpoint).
    • "StartTasksFromTheBeginning": If however the task needs to be restarted then select the "StartTasksFromTheBeginning" policy - this will clear any save state (checkpoints) and start the task from the beginning of the file.

Redrive - CLI Commands

Here are different CLI commands that can be used with datasets:

Command Syntax:

Command Help:


Command Examples:


Record Errors

The user handlers (parser / reader implementations) can also encounter errors in either parsing the documents or in creating a composite document from the parsed records. It is up to the user handlers how they deal with such errors. If they believe that the error records can be logged as errors and file processing should continue, they should return an error record and the system will archive the error record and process the next record. If the error requires the system to stop processing records any further, then throwing an exception will transition the task to the error state. Developers can look at the archived error records and failed tasks, fix the issues and retry these as needed.

Record Errors Emission Code

When an error happens in the user data handlers (parsers/readers), the handler can return Error Documents that the system will archive to the error destination. The Error Records are augmented with filenames, timestamps, file offsets and the serialized error doc for debuggability. Here is sample code that can be used in a parser / reader to return an error doc. This code uses the default error doc implementation. Users can code their own implementations of the ErrorDoc interface as well.
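
As a rough sketch of this pattern (return an error doc for a bad record so the system archives it and continues; throw for unrecoverable errors), here is a minimal parser example. All of the type names below are simplified stand-ins for illustration only - they are not the actual #Let's Data interfaces or the default error doc implementation; refer to the #Let's Data interface definitions for those.

    // Sketch of returning an error doc from a user data handler (parser). The types are
    // simplified stand-ins, NOT the actual #Let's Data interfaces.
    import java.nio.charset.StandardCharsets;

    enum ParseStatus { SUCCESS, ERROR }

    // stand-in for a parsed output document
    record ParsedDoc(String id, String payload) { }

    // stand-in for a default error doc: captures the failing record and the error message
    record ErrorDoc(String fileName, long startOffset, long endOffset, String errorMessage, String rawRecord) { }

    // stand-in for the result a parser returns to the framework
    record ParseResult(ParseStatus status, Object document) { }

    class SampleLogParser {
        // conceptually called for each record extracted between the start / end patterns
        ParseResult parseDocument(String fileName, long startOffset, long endOffset, byte[] recordBytes) {
            String record = new String(recordBytes, StandardCharsets.UTF_8);
            try {
                ParsedDoc doc = parseLogLine(record);                // the user's parsing logic
                return new ParseResult(ParseStatus.SUCCESS, doc);
            } catch (RuntimeException parseError) {
                // recoverable record error: return an error doc - the system archives it (augmented
                // with the filename, offsets and timestamp) and continues with the next record;
                // for unrecoverable errors, throw instead and the task transitions to the error state
                return new ParseResult(ParseStatus.ERROR,
                        new ErrorDoc(fileName, startOffset, endOffset, parseError.getMessage(), record));
            }
        }

        private ParsedDoc parseLogLine(String record) {
            String[] parts = record.split(",", 2);
            if (parts.length < 2) {
                throw new IllegalArgumentException("malformed record: expected 'id,payload'");
            }
            return new ParsedDoc(parts[0], parts[1]);
        }
    }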

Record Errors Schema

Each error file is a gzip compressed file that can contain multiple error documents that were parsed within a checkpoint. The filename is constructed from different useful elements of the checkpoint state. Here is a look at an example filename and its format:
 

Here is an Error Record schema and an example:

Record Errors - CLI Commands

Command Syntax:

Command Help:


Command Examples:
