Parallel Map in Elixir

elixir
Posted on: 2014-07-19

A few months ago at Railsconf, I attended a workshop on Elixir, given by Chris McCord, and have been tinkering with it off and on ever since.

I'd done some functional-style programming in JavaScript and Ruby, in the sense of using things like map and function currying. And I'd played with Clojure a little.

But aside from liking Elixir's syntax more, I'm also struck by its foundation. Building a functional language on top of Erlang is less impressive than building one on Java; Erlang is already functional. But if the task is easier, it's probably easier to do well. Furthermore, with the Erlang foundation already laid, Elixir exists only to make the programmer's job easier.

I'm still pretty new to Elixir, but one of the more tantalizing things I've seen comes from the beginning of Dave Thomas' Programming Elixir. Dave takes the familiar map function and, with a tiny bit of tweaking, makes it run in parallel, with one process per element of the collection. (These are VM-level Erlang processes, not operating-system processes; they're cheap, so making 1,000 is no big deal.)

I've copied that example here and added my own comments to clarify what's happening. Note that in Elixir, |> is a "pipe", akin to Unix pipes, so that instead of saying use_data(get_data()), you can say get_data() |> use_data().

defmodule Parallel do
  # Allows mapping over a collection in parallel, using one process
  # per element
  def pmap(collection, function) do
    # Get this process's PID
    me = self()
    collection
    |> Enum.map(fn (elem) ->
      # For each element in the collection, spawn a process and
      # tell it to:
      # - Run the given function on that element
      # - Call up the parent process
      # - Send the parent its PID and its result
      # Each call to spawn_link returns the child PID immediately.
      spawn_link fn -> (send me, { self(), function.(elem) }) end
    end) |>
    # Here we have the complete list of child PIDs. We don't yet know
    # which, if any, have completed their work
    Enum.map(fn (pid) ->
      # For each child PID, in order, block until we receive an
      # answer from that PID and return the answer
      # While we're waiting on something from the first pid, we may
      # get results from others, but we won't "get those out of the
      # mailbox" until we finish with the first one.
      receive do { ^pid, result } -> result end
    end)
  end
end

This can be used as follows:

# Calculate squares of these numbers
Parallel.pmap(1..1_000, fn(integer) -> integer * integer end)
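
To see the ordering behavior described in the comments, here is a small sketch of my own (not from the book). It repeats a compact copy of pmap under the hypothetical module name OrderDemo so that it runs on its own, and uses Process.sleep to make the first elements finish last. The results should still come back in input order, and the total time should be close to the longest single delay rather than the sum of all of them.

```elixir
defmodule OrderDemo do
  # Compact copy of pmap, repeated only so this snippet is self-contained.
  # The module name OrderDemo is made up for this illustration.
  def pmap(collection, function) do
    me = self()

    collection
    |> Enum.map(fn elem ->
      spawn_link(fn -> send(me, {self(), function.(elem)}) end)
    end)
    |> Enum.map(fn pid ->
      receive do
        {^pid, result} -> result
      end
    end)
  end
end

# Earlier elements sleep longer, so the children finish in reverse order.
delays = [300, 200, 100]

# :timer.tc/1 returns {elapsed_microseconds, return_value}.
{micros, results} =
  :timer.tc(fn ->
    OrderDemo.pmap(delays, fn ms ->
      Process.sleep(ms)
      ms
    end)
  end)

# The results match the input order, even though the last child
# finished first.
IO.inspect(results)
IO.puts("elapsed: #{div(micros, 1000)} ms")
```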

Coming from Ruby, where parallelism is not nearly as straightforward, I find the possibilities here intriguing.

Update 2016-07-09

My friend Jay Hayes points out that this can be done more succinctly with Task as follows:

defmodule Parallel do
  @doc """
      iex> Parallel.map([1,2,3], &(&1*2))
      [2,4,6]
  """
  def map(collection, function) do
    collection
    |> Enum.map(&Task.async(fn -> function.(&1) end))
    |> Enum.map(&Task.await(&1))
  end
end
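
One thing to keep in mind with the Task version: Task.await gives up after 5 seconds by default, so a slow function will crash the caller with a timeout. Below is a minimal sketch of how a longer limit could be passed through. The module name ParallelWithTimeout and the third argument are my own additions for illustration, not part of Jay's suggestion.

```elixir
defmodule ParallelWithTimeout do
  # Hypothetical variant of the Task-based map. The timeout argument
  # (in milliseconds) is handed to Task.await/2; 5_000 mirrors the
  # default that Task.await uses when no timeout is given.
  def map(collection, function, timeout \\ 5_000) do
    collection
    |> Enum.map(fn elem -> Task.async(fn -> function.(elem) end) end)
    |> Enum.map(fn task -> Task.await(task, timeout) end)
  end
end

# Each task sleeps briefly, then doubles its input.
doubled =
  ParallelWithTimeout.map(
    [1, 2, 3],
    fn n ->
      Process.sleep(50)
      n * 2
    end,
    1_000
  )

IO.inspect(doubled)
```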

Also, I'd like to add that in my original example, if the order of the output need not match the order of the input, you could change receive do {^pid... to receive do {_... (in other words, accept whichever child's message arrives next). That would be slightly faster, and the results would come back in the order the children finish. And if you do that, you might as well not have the child processes send back their pids at all.
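
Here is a rough sketch of what that unordered variant might look like. The names UnorderedDemo and pmap_unordered are made up for this example. One assumption of mine beyond the description above: instead of matching on anything at all, each child tags its message with a reference from make_ref(), so that unrelated messages sitting in the parent's mailbox are not mistaken for results.

```elixir
defmodule UnorderedDemo do
  # Hypothetical name. Results come back in completion order,
  # not input order.
  def pmap_unordered(collection, function) do
    me = self()
    # A unique reference tags this call's messages (my addition),
    # replacing the child PID used in the ordered version.
    ref = make_ref()

    children =
      Enum.map(collection, fn elem ->
        spawn_link(fn -> send(me, {ref, function.(elem)}) end)
      end)

    # One receive per child: take whichever result arrives next.
    Enum.map(children, fn _pid ->
      receive do
        {^ref, result} -> result
      end
    end)
  end
end

squares = UnorderedDemo.pmap_unordered(1..10, fn n -> n * n end)

# The order is not guaranteed, so sort before comparing or printing.
IO.inspect(Enum.sort(squares))
```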