How to control concurrent item processing in Golang

4 min read
zen8labs Throttle concurrent item processing in Golang

Handling many tasks at once is key to making software run better. Golang has strong tools to help with this. In this post, we’ll look at how to limit the number of tasks running at the same time in Golang. This helps balance the workload and avoid overloading the system. Using these methods can keep your application running smoothly and efficiently.

The problem

The problem is as follows: we have a list of items []A, a function f(A) that transforms A into B and returns a list of items []B. At the same time, we want to regulate the maximum number of threads at any given moment. 

Summary: []A → throttle async f(A)B → []B 

Introduction 

In this article, I will use a channel to throttle concurrent processing at a given time. The components will include the following actors: 

  • The throttle channel will act like a set of processing slots for delivery items, with each slot processing only one item at a time. 
  • The delivery item is the item that needs to be processed. 
  • The response item is the final product returned after processing. 
  • The collector receives and aggregates the response items from the throttle channel and returns the results. 
  • The carrier transports the delivery items to the channel. 

Implementation

Because the request processing is asynchronous, the collector needs to know the position of each response item in the delivery items when it receives the response. Therefore, I will create a struct that includes the response item and its order. 

type ResponseItem[B any] struct { 
     item  B 
     order int 
} 

First, I will create a slot to transform the delivery item into the response item. 

package gomap 

type Slot[A, B any] struct { 
     fx func(A) B 
} 

func NewSlot[A, B any](processFx func(A) B) Slot[A, B] { 
     return Slot[A, B]{ 
          fx: processFx, 
    } 
} 

func (slot Slot[A, B]) ProcessDeliveryItem(item A) B { 
     return slot.fx(item) 
} 

Next, I need to create a throttle channel. The channel will receive delivery items from the carrier and put them into an empty slot for processing. If no slot is available, it will wait until one becomes available. After processing, it will return the final product to the collector via the collect channel. 

package gomap 

type ThrottleChannel[A, B any] struct { 
     slotChannel    chan any 
     collectChannel chan ResponseItem[B] 
     fx             func(A) B 
} 

func NewThrottleChannel[A, B any](fx func(A) B, slotChannel chan any, collectChannel chan ResponseItem[B]) ThrottleChannel[A, B] { 
     return ThrottleChannel[A, B]{ 
          slotChannel:    slotChannel, 
          fx:             fx, 
          collectChannel: collectChannel, 
     } 
} 

func (t *ThrottleChannel[A, B]) waitIdleSlot() Slot[A, B] { 
     var anything any 
     for { 
         select { 
         case t.slotChannel <- anything: 
              return NewSlot[A, B](t.fx) 
         } 
     } 
} 

func (t *ThrottleChannel[A, B]) HandleDeliveryItem(item A, order int) { 
     slot := t.waitIdleSlot() 

     go func() { 
          respItem := slot.ProcessDeliveryItem(item) 

          t.collectChannel <- ResponseItem[B]{ 
               item:  respItem, 
               order: order, 
          } 

          t.cleanSlot() 
     }() 
} 

func (t *ThrottleChannel[A, B]) cleanSlot() { 
     <-t.slotChannel 
}

After that, create a simple carrier to deliver items to the throttle channel. 

package gomap 

type Carrier[A, B any] struct { 
     throttle ThrottleChannel[A, B] 
} 

func NewCarrier[A, B any](throttle ThrottleChannel[A, B]) Carrier[A, B] { 
     return Carrier[A, B]{ 
          throttle: throttle,  
     } 
} 

func (c *Carrier[A, B]) DeliveryItems(items []A) { 
     for i, item := range items { 
          c.throttle.HandleDeliveryItem(item, i) 
     } 
}

Next, create a collector to receive the response items and sort them according to the order of the delivery items. Here, I will place the response item into an array at the correct position in the original list. Therefore, there’s no need to sort again. 

package gomap 

type Collector[B any] struct { 
     collectChannel   chan ResponseItem[B] 
     deliveryItemsLen int 
} 

func NewCollector[B any](collectChannel chan ResponseItem[B], deliveryItemsLen int) Collector[B] { 
     return Collector[B]{ 
          collectChannel:   collectChannel, 
          deliveryItemsLen: deliveryItemsLen, 
     } 
} 

func (c *Collector[B]) CollectItems() []B { 
     storage := make([]B, c.deliveryItemsLen) 

     for i := 0; i < c.deliveryItemsLen; i++ { 
          resp := <-c.collectChannel 

          order := resp.order 
          storage[order] = resp.item 
     } 

     return storage 
} 

Finally, create a function called AsyncMap as the entry point, which will be called from outside the package.

func AsyncMap[A, B any](fx func(A) B, s []A, processNumber int) []B { 
     slotChannel := make(chan any, processNumber) 
     collectChannel := make(chan ResponseItem[B], len(s)) 

     defer func() { 
          close(slotChannel) 
          close(collectChannel) 
     }() 

     throttle := NewThrottleChannel(fx, slotChannel, collectChannel) 
     carrier := NewCarrier(throttle) 
     collector := NewCollector(collectChannel, len(s)) 

     go carrier.DeliveryItems(s) 
     return collector.CollectItems() 
}

Let’s try running a test. 

package main 

import ( 
     "fmt" 
     "github.com/test/gomap" 
     "time" 
) 

func main() { 
     s := []int{1, 2, 3, 4, 5} 

     start := time.Now() 
     resp := gomap.AsyncMap(func(item int) int { 
          fmt.Printf("item %d start at second: %f\n", item, time.Now().Sub(start).Seconds()) 
          time.Sleep(time.Second) 

          return item * item 
     }, s, 3) 

     fmt.Println(resp) 
}

And we get the result as: 

item 3 start at second: 0.000084 
item 2 start at second: 0.000115 
item 1 start at second: 0.000096 
item 4 start at second: 1.001128 
item 5 start at second: 1.001139 
[1 4 9 16 25]

This is a simple way to use channels for asynchronous processing of a slice. Additionally, we can also implement using the fan-in, fan-out pattern. In practice, we also need to handle additional cases such as errors or premature termination of the flow.

Conclusion

In conclusion, limiting the number of tasks running at the same time in Golang helps manage resources and keeps your application running well. Using Golang’s built-in tools, like channels and goroutines, developers can control task flow to avoid overloading the system. These methods make applications more reliable and easier to manage. For other knowledge sharing, check out the articles on zen8labs’ website.

Hoa Le, Software Engineer

Related posts

When selecting a technology stack for a new project, developers often need substantial time to evaluate various options. The process from initiation to achieving a stable codebase can be daunting, especially for projects requiring rapid development. With Supabase, zen8labs' team has managed to expedite this process while providing scalability and stability for our projects.
5 min read
Carrying out the right performance test can help any IT project succeed. This blog shows how K6 can help you carry out the most common of performance tests.
5 min read
What is the CSS happy learning path and how does it affect javascript developers? Then this blog is for you to show you the ways that CSS can offer you a happy learning experience.
5 min read