
Advanced Functional Programming: Optimize Data Processing Performance with Stream Pipeline


Prerequisites

  • Firm grasp of ES6, especially arrow functions, generators, Array.map, Array.reduce, …
  • Deep understanding of functional programming concepts such as currying, pure functions, and higher-order functions.
  • Basic knowledge of RxJS.

Note: This article focuses heavily on advanced functional programming, so please make sure you meet the prerequisites before diving into the main content.

Table of Contents

1. Function Composition

2. Transducer

3. RxJS Use Case

Function Composition

What is function composition and why do we need it?

In computer science, function composition is the act or mechanism of combining simple functions to build more complicated ones.

Composing functions helps developers break complex logic into a set of smaller problems, which improves maintainability and code reuse.

A real-world example to understand further:

Imagine you are a bartender whose job is serving cocktails by mixing liquors, beverages, and many other types of ingredients in an orderly fashion.

You start with an empty glass, let glass = "", as the initial value, and several preparation steps:

Remember that both the ingredients and the order of the steps matter, because different combinations create different drinks:
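The code for these steps is not reproduced in this copy of the article. As a minimal sketch, assume each step is a function that takes the current contents of the glass (a string) and returns a new string; the helper addTo, the step names, and the ingredients below are all hypothetical:

```javascript
// Hypothetical helper: append an ingredient to the glass description.
const addTo = (glass, ingredient) => (glass === "" ? ingredient : `${glass}, ${ingredient}`)

// Hypothetical preparation steps: each takes the current glass (a string)
// and returns a new string without mutating anything.
const addSaltRim = glass => addTo(glass, "salt rim")
const addTequila = glass => addTo(glass, "tequila")
const addLimeJuice = glass => addTo(glass, "lime juice")
const addGrapefruitSoda = glass => addTo(glass, "grapefruit soda")

// Applying the steps by hand, one after another.
let glass = ""
glass = addSaltRim(glass)
glass = addTequila(glass)
glass = addLimeJuice(glass)
glass = addGrapefruitSoda(glass)

console.log(glass) // salt rim, tequila, lime juice, grapefruit soda
```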

Paloma cocktail recipe

Writing a separate function for every cocktail recipe, each with its own long sequence of preparation steps, quickly becomes tedious. So you come up with an idea: implement a naive for loop that executes the preparation steps in order.
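A minimal sketch of such a loop, reusing the same hypothetical step functions (addTo, addSaltRim, and so on are assumptions, not the article's original code). The runner starts from an empty glass and feeds each step's output into the next:

```javascript
// Hypothetical steps: string in, new string out.
const addTo = (glass, ingredient) => (glass === "" ? ingredient : `${glass}, ${ingredient}`)
const addSaltRim = glass => addTo(glass, "salt rim")
const addTequila = glass => addTo(glass, "tequila")
const addLimeJuice = glass => addTo(glass, "lime juice")
const addGrapefruitSoda = glass => addTo(glass, "grapefruit soda")

// Naive imperative runner: apply the steps in the order they are given.
const serveCocktail = (...methods) => {
  let glass = ""
  for (const method of methods) {
    glass = method(glass)
  }
  return glass
}

const paloma = serveCocktail(addSaltRim, addTequila, addLimeJuice, addGrapefruitSoda)
console.log(paloma) // salt rim, tequila, lime juice, grapefruit soda

// Same ingredients, different order -> a different drink.
const reordered = serveCocktail(addGrapefruitSoda, addLimeJuice, addTequila, addSaltRim)
console.log(reordered) // grapefruit soda, lime juice, tequila, salt rim
```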

Much shorter and easier to read!

Similar to composition of functions in mathematics, the result of each function is passed as the argument of the next, and the result of the last one is the result of the whole.

This imperative implementation is good enough in terms of function composition, but since we are learning functional programming, let’s make it declarative by using Array.reduce to simplify the serveCocktail function:

const serveCocktail = (...methods) => {
   // the initial value is hard-coded as an empty string;
   // each method receives the glass returned by the previous one
   return methods.reduce((glass, method) => method(glass), "")
}

Even better, we can use currying to lift the initial value up as an argument. For general-purpose usage, we define a utility function that allows us to compose functions with ease.

  • pipe: compose functions from left to right.
  • compose: compose functions from right to left.
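The definitions of pipe and compose are not shown in this copy of the article. One common way to write them, sketched here as an assumption rather than the author's exact code, is as curried functions that take the steps first and the initial value second, which lifts the empty glass out as an argument. The cocktail steps are the same hypothetical helpers as before:

```javascript
// pipe: apply functions left to right; the initial value comes last.
const pipe = (...fns) => initial => fns.reduce((acc, fn) => fn(acc), initial)

// compose: apply functions right to left (the mathematical order).
const compose = (...fns) => initial => fns.reduceRight((acc, fn) => fn(acc), initial)

// Hypothetical steps from the bartender example.
const addTo = (glass, ingredient) => (glass === "" ? ingredient : `${glass}, ${ingredient}`)
const addTequila = glass => addTo(glass, "tequila")
const addLimeJuice = glass => addTo(glass, "lime juice")
const addGrapefruitSoda = glass => addTo(glass, "grapefruit soda")

const palomaLeftToRight = pipe(addTequila, addLimeJuice, addGrapefruitSoda)("")
const palomaRightToLeft = compose(addGrapefruitSoda, addLimeJuice, addTequila)("")

console.log(palomaLeftToRight)                       // tequila, lime juice, grapefruit soda
console.log(palomaLeftToRight === palomaRightToLeft) // true

// The same argument list passed to compose runs in the opposite order.
const sameArgsComposed = compose(addTequila, addLimeJuice, addGrapefruitSoda)("")
console.log(sameArgsComposed)                        // grapefruit soda, lime juice, tequila
```

Currying the initial value means pipe(...) on its own returns a reusable recipe function that can be applied to any starting glass.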

Composing functions is much easier with the help of pipe and compose.

When we break a complex process into smaller functions, each doing only one job, we can write declarative code that improves readability, reusability, and separation of concerns.

 

Transducer

A transducer is a composable higher-order reducer. It takes a reducer as input and returns another reducer. (Eric Elliott)

A reducer is a pure function that takes 2 arguments and returns a new value.

When a reducer is applied in the context of a loop:

  • The first argument, called the accumulator, is the result of the previous step, or the initial value if it is the first step of the loop.
  • The second argument is the current value in the loop.

So the basic signature of a reducer is as follows:

reducer = (accumulator, curVal) => newVal
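For a concrete instance of that signature, here is a small illustrative sum reducer (not from the original article) driven by Array.prototype.reduce, which plays the role of the loop:

```javascript
// A reducer: (accumulator, curVal) => newVal, with no side effects.
const sum = (accumulator, curVal) => accumulator + curVal

// The accumulator starts at the initial value 0 and is replaced by each
// step's return value: 0+1 -> 1, 1+2 -> 3, 3+3 -> 6, 6+4 -> 10.
const total = [1, 2, 3, 4].reduce(sum, 0)
console.log(total) // 10
```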

The signature of a transducer can then be stated as:

transducer = reducer => reducer
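A minimal illustration of reducer => reducer, using a hypothetical doubling transducer (not from the original article): it wraps any reducer so that each value is doubled before the wrapped reducer sees it. Because its output is again a reducer, it can be applied to its own result:

```javascript
// A plain reducer that sums numbers.
const sum = (acc, cur) => acc + cur

// Hypothetical transducer: takes a reducer (next), returns a new reducer
// that doubles the current value before passing it on to next.
const doubling = next => (acc, cur) => next(acc, cur * 2)

const sumOfDoubles = doubling(sum)               // still (acc, cur) => newAcc
console.log([1, 2, 3].reduce(sumOfDoubles, 0))   // 12

// Transducer output is a reducer, so transducers stack.
const sumOfQuadruples = doubling(doubling(sum))
console.log([1, 2, 3].reduce(sumOfQuadruples, 0)) // 24
```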

To answer the question “why do we need transducers?”, let’s walk through an example:

Imagine you are a scientist at NASA whose duty is to communicate with an alien spaceship outside our planet: you receive its signals, transform them into human-readable data, and then display the final message.

  1. Transform signals: filter out false signals (noise), then convert the remaining signals to strings.
  2. Display the message: concatenate the strings.
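The naive code is not reproduced in this copy. A sketch of the usual filter/map/reduce chain follows; the names isTruthful, decodeSignal, and concatString come from the article, but their bodies, the idea that signals are character codes, and the treatment of null and 0 as noise are all assumptions made for illustration:

```javascript
// Hypothetical alien signals: character codes, with null and 0 as noise.
const signals = [72, 0, 101, 108, null, 108, 111]

// Hypothetical helper bodies (the names come from the article).
const isTruthful = sig => Boolean(sig)               // drop noise (null, 0)
const decodeSignal = sig => String.fromCharCode(sig) // code -> character
const concatString = (msg, str) => msg + str         // append to the message

// Naive version: three passes over the data, two intermediate arrays.
const message = signals
  .filter(isTruthful)       // pass 1: new array of real signals
  .map(decodeSignal)        // pass 2: new array of strings
  .reduce(concatString, "") // pass 3: final message

console.log(message) // Hello
```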

This simple yet naive implementation is fine only if the array is small.

Now consider processing a huge array of a trillion signals:

  • Each pass with filter and map traverses the entire array and allocates a whole new intermediate array, which takes up a huge amount of memory.
  • Each processing step has to finish before the next one can start. In the naive code, to retrieve the final message we must wait for reduce to complete; before that, reduce must wait for map to complete, and before that, map must wait for filter to complete. This is a fatal problem, because the information in the message is critical and we must read it within a very strict deadline.

One approach to solving this problem is to build a processing pipeline that treats the signals as a stream of data and runs every processing step on each signal, in the correct order, before moving on to the next signal.
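A minimal sketch of that idea, under the same hypothetical signal data and helper bodies as the naive version: a single reducer, xform, filters, decodes, and concatenates each signal as it is visited, so the array is traversed once and no intermediate arrays are built:

```javascript
const signals = [72, 0, 101, 108, null, 108, 111] // hypothetical data
const isTruthful = sig => Boolean(sig)             // hypothetical bodies
const decodeSignal = sig => String.fromCharCode(sig)
const concatString = (msg, str) => msg + str

// One reducer performs all three steps on each signal in turn.
const xform = (msg, sig) => (isTruthful(sig) ? concatString(msg, decodeSignal(sig)) : msg)

const message = signals.reduce(xform, "")
console.log(message) // Hello
```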

Let’s rewrite it a little so you can recognize that the flow of xform is exactly the same as function composition: the result of decodeSignal depends on isTruthful, and the result of concatString depends on decodeSignal.

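const xform = (msg, sig) => {
  // filter: keep the signal only if it is truthful
  let val = isTruthful(sig) ? sig : null
  if (val !== null) {
    // map: decode the signal into a string
    val = decodeSignal(val)
    // reduce: append the decoded string to the message
    val = concatString(msg, val)
    return val
  }
  // noise: return the message unchanged
  return msg
}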

In this example, we only have to deal with a few processing steps. But keep in mind that in the real world, the signal transformation process may go through many steps in a complicated order. To build a flexible pipeline, we turn to function composition:

compose(isTruthful, decodeSignal, concatString)   // not working 

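But it will not work, simply because these functions are not composable: the result of each function is not compatible with the argument of the next one. For example, isTruthful returns a boolean rather than a signal, and concatString needs two arguments (the message and a string) rather than a single value.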

To make chaining possible, we must rewrite the filter and map functions:

const filter = next => {
  return (msg, sig) => isTruthful(sig) ? next(msg, sig) : msg
}

const map = next => {
  return (msg, sig) => next(msg, decodeSignal(sig))
}

Given that next is a reducer, filter and map are transducers: they take a reducer as input and return another reducer.
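Since each of these functions returns a reducer, they can already be chained by hand, even before compose enters the picture. A sketch under the same hypothetical signal data and helper bodies used earlier:

```javascript
const signals = [72, 0, 101, 108, null, 108, 111] // hypothetical data
const isTruthful = sig => Boolean(sig)             // hypothetical bodies
const decodeSignal = sig => String.fromCharCode(sig)
const concatString = (msg, str) => msg + str

// Transducers with the predicate and transform hard-coded, as in the text.
const filter = next => (msg, sig) => (isTruthful(sig) ? next(msg, sig) : msg)
const map = next => (msg, sig) => next(msg, decodeSignal(sig))

// Nesting by hand: for each signal, filter runs first, then map,
// then concatString accumulates the result.
const xform = filter(map(concatString))
console.log(signals.reduce(xform, "")) // Hello
```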

To enhance reusability, we use currying to lift isTruthful and decodeSignal up as arguments:

const filter = predicate => next =>
  (acc, cur) => predicate(cur) ? next(acc, cur) : acc

const map = transform => next =>
  (acc, cur) => next(acc, transform(cur))

const transformSignals = compose(
  filter(isTruthful),
  map(decodeSignal)
)

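Because transducers are composable, we can combine them with compose. The result, transformSignals, is a new transducer, so we must supply a reducer as the final step to tell it how to accumulate the result. Note that compose(filter(isTruthful), map(decodeSignal)) applied to a reducer expands to filter(isTruthful)(map(decodeSignal)(reducer)), so even though compose works right to left, each signal goes through filter first, then map, at runtime.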

const xform = transformSignals(concatString)
const message = signals.reduce(xform, "")

To sum up everything we just did, here’s the complete code that you can run:
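The complete code is not reproduced in this copy. The following is a self-contained sketch that assembles the pieces above; the compose definition, the bodies of isTruthful, decodeSignal, and concatString, and the signal data are assumptions for illustration:

```javascript
// Generic helper: right-to-left composition (assumed definition).
const compose = (...fns) => x => fns.reduceRight((acc, fn) => fn(acc), x)

// Curried transducers, as in the article.
const filter = predicate => next => (acc, cur) => (predicate(cur) ? next(acc, cur) : acc)
const map = transform => next => (acc, cur) => next(acc, transform(cur))

// Hypothetical domain helpers and data.
const isTruthful = sig => Boolean(sig)
const decodeSignal = sig => String.fromCharCode(sig)
const concatString = (msg, str) => msg + str
const signals = [72, 0, 101, 108, null, 108, 111]

// Build the pipeline; at runtime each signal passes filter, then map.
const transformSignals = compose(filter(isTruthful), map(decodeSignal))
const xform = transformSignals(concatString)
const message = signals.reduce(xform, "")

console.log(message) // Hello
```

Swapping concatString for a different reducer (for example, one that pushes into an array) reuses the same transformSignals pipeline unchanged.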

 

RxJS use case

Congratulations! If you have read this far, you’ve finished the hardest part. RxJS is a library for working with streams of data, and it provides many handy operators, such as filter and map, that play the role of our transducers, so we don’t have to implement anything from scratch.

Run it and observe the output: you can clearly see that each signal is processed independently and the result is printed immediately.

Source: https://medium.com/@blueish/advanced-functional-programming-optimize-data-processing-performance-with-stream-pipeline-a610a8e8c863

Content Manager

Thanh (Bruce) Pham, CEO of Saigon Technology

A Member of Forbes Technology Council
