Transducers

From Clojure to C++

















a presentation brought to you by
Juanpe Bolívar
@sinusoidalen
https://sinusoid.al

1. What?

"A transducer is a device that converts one form of energy to another. Energy types include (but are not limited to): electrical, mechanical, electromagnetic (including light), chemical, acoustic, and thermal energy. Usually a transducer converts a signal in one form of energy to a signal in another[1] (for example, a loudspeaker driver converts an electric signal to sound), but any variable attenuation of energy may serve as input; for example, the light reflecting off the landscape, although it is not a signal, conveys information that image sensors, one form of transducer, can convert. A sensor is a transducer whose purpose is to sense (i.e. detect) some characteristic of its environs; it is used to detect a parameter in one form and report it in another form of energy, often an electrical signal. For example, a pressure sensor might detect pressure (a mechanical form of energy) and convert it to electrical signal for display at a remote gauge. Transducers are widely used in measuring instruments."

Invented by Rich Hickey Introduced in Clojure 1.7

Research from Ableton on declarative data models

A transducer is a transformation
over a sequential process


map([](int x) { return std::to_string(x); });
          
 

A transformation as a value


auto xf = map([](int x) { return std::to_string(x); });
          

 

A composable transformation


auto xf = comp(
    filter([](int x) { return x > 0; }),
    map([](int x) { return std::to_string(x); }));
          

$$comp(f, g) = f \circ g$$ $$(f \circ g)(x) = f(g(x))$$

A transformation independent of the source


auto xf = comp(
    filter([](int x) { return x > 0; }),
    map([](int x) { return std::to_string(x); }));
          

auto data = std::vector<int>{ ... };
auto xformed = sequence(xf, data);
std::copy(xformed.begin(), xformed.end(), ...)
          

run(comp(read<int>(std::cin),
         xf,
         write(std::cout)));
          

auto sig1 = boost::signal<void(int)>{};
auto sig2 = make_signal(xf, sig1);
sig2.connect([] (std::string s) { cout << s << endl; });
sig1(41); sig1(-10); sig1(10) // "41", "10", ...
          





Sequential processes are

not just ranges!

  • An iterator
  • GUI events
  • CSP Channels
  • A network socket

2. How?

"Extract the essence
of map and filter"

template <typename In, typename Out, typename F>
Out transform(In first, In last, Out out, F fn) {
    for (; first != last; ++first) {
        *out++ = fn(*first);
    }
    return out;
}
template <typename In, typename Out, typename P>
Out filter(In first, In last, Out out, P pred) {
    for (; first != last; ++first) {
        if (pred(*first))
            *out++ = *first;
    }
    return out;
}
template <typename In, typename S, typename Rf>
S accumulate(In first, In last, S acc, Rf step) {
    for (; first != last; ++first) {
        acc = step(acc, *first);
    }
    return acc;
}
auto output_rf = [] (auto out, auto input) {
    *out++ = input;
    return out;
};
template <typename In, typename Out, typename F>
Out transform(In first, In last, Out out, F fn) {
    return accumulate(first, last, out, [&](auto state, auto input) {
           return output_rf(state, fn(in));
    });
}
template <typename In, typename Out, typename P>
Out filter(In first, In last, Out out, P pred) {
    return accumulate(first, last, out, [&](auto state, auto input) {
          return pred(input) ? output_rf(state, input) : state;
    });
}
auto map = [] (auto fn) {
  return [=] (auto step) {
    return [=] (auto s, auto ...ins) {
      return step(s, fn(ins...));
    };
  };
};
              
auto filter = [] (auto pred) {
  return [=] (auto step) {
    return [=] (auto s, auto ...ins) {
      return pred(ins...)
             ? step(s, ins...)
             : s;
    };
  };
};

// transform(first, last, fn) ==

accumulate(first, last, out,
    map(fn)(output_rf));


            

// filter(first, last, pred) ==

accumulate(first, last, out,
    filter(fn)(output_rf));



            
auto map = [] (auto fn) {
  return [=] (auto step) {
    return [=] (auto s, auto ...ins) {
      return step(s, fn(ins...));
    };
  };
};
              
auto filter = [] (auto pred) {
  return [=] (auto step) {
    return [=] (auto s, auto ...ins) {
      return pred(ins...)
             ? step(s, ins...)
             : s;
    };
  };
};
              



Definitions

  • Reducing function
  • Transducer
  • State
  • Inputs
  • Outputs

Mmmh, smells
like essence of map and filter!










The pipeline goes left to right

accumulate(first, last, out, comp(filter(pred), map(fn)) (output_rf))accumulate(first, last, out, filter(pred) (map(fn) (output_rf)))
if (pred(s, inputs)) {
inputs = map(inputs);
*out++ = inputs;
}

Atria uses Clojure's nomenclature

  • reduce(step, initial, ranges...)
  • transduce(xform, step, initial, ranges...)
  • into(col, xform, ranges...)
  • into_vector(xform, ranges...)
  • sequence(xform, ranges...)

3. Advanced stuff

1. Variadic transducers

Most transducers have arity 0...n


// 0 inputs, generator

sequence(map(&std::rand));
            

// 1 input

auto v1 = { 1, 2, 3, 4, 5 }
transduce(map([](int x){ return x * 2 }),
          std::plus<>{},
          0,
          v1);
            

// n inputs, zipping...

auto v1 = { 1, 2, 3, 4, 5 }
auto v2 = { 3.3, 2.2, 1.1 }
into_vector(map(plus<>{}), v1, v2);
into_vector(identity, v1, v2);
            

// n outputs

sequence(comp(
  map([] { return make_tuple(rand(), rand(), rand()); }),
  unzip,
  filter([] (int x, int y, int z) { return x<y && y<z; }),
  interleave));
            

2. State

enumerate is a stateful transducer


sequence(enumerate);
// 0, 1, 2, 3, ...

sequence(enumerate, "abc");
// (0, 'a'), (1, 'b'), (2, 'c')
          

The Clojure way is to hide the state
in the reducing function


auto enumerate = [] (auto step) {
    return [n = 0] (auto s, auto ins...) mutable {
        return step(s, n++, ins...);
    };
};
          

state goes here

auto enumerate = [] (auto step) {
    return [n = 0] (auto s, auto ins...) mutable {
        return step(s, n++, ins...);
    };
};

not here!

state_wrapper<
    Tag,
    State,
    Data
>

auto enumerate = [] (auto step) {
    return [=] (auto s, auto ins...) {
        auto n = state_data(s, [] { return 0; });
        auto w = state_unwrap(s);
        return state_wrap(step(w, n, ins...), n + 1);
    };
};
enumerate step
state
[♦, 1] [♦, 1] [♦, 1] [♦, 1] [♦, 2] [♦, 2]
input
0 0 1 1

3. Early termination

Take needs to terminate early


into_vector(comp(enumerate, take(4)))
// { 0, 1, 2, 3 }

into(string{}, comp(map(tolower), take(4)), "ABCDEG...")
// "abcd"
          
struct take_tag {};

auto take = [] (auto count) {
   return [] (auto step) {
       return [=] (auto s, auto ins...) {
          auto n = state_data(s, [] { return count; });
          auto w = state_unwrap(s);
          return state_wrap<take_tag>(
             step(w, ins...), n - 1);
      };
   };
};

Use tag to define reduced state

template <typename T>
bool state_wrapper_data_is_reduced(take_tag, T n) {
    return n == 0;
}
state_is_reduced(state)

4. Batteries included

  • reduce,
    reduce_nested
  • reductor
  • transducer

auto cat = [] (step) {
    return [=] (auto s, auto... ins) {
        return reduce_nested(
            step, s, ins...);
    }
};
                
reductor<RF, State, Ins...>
  • operator () (Ins...);
  • operator bool ();
  • current() -> StateT
  • complete() -> State
transducer<int, pack<int, char> >
    xform = comp(
        map([](auto x) {
            return to_string(x); }),
        cat,
        enumerate);
                    
"std::function for transducers"
  cat   dedupe   distinct   drop   drop_while   filter   interpose   mapcat   map   map_indexed   partition_by   partition   random_sample   remove   replace   take   take_nth   take_while
     chain   count   cycle   enumerate   product   range   repeat

Exclusive!

  each   eager   interleave   iter   readbuf   read   sink   traced   transducer   unzip   writebuf   write   zip

5. Performance

The good

State and inputs
forward through the chain

Small functions, easy to inline

Benchmarks say that most transducers pipelines perform like hand-written code

Often faster than boost.range

The ugly

Type erasure is expensive

Adapting as iterator has overhead

4. Conclusion

transform
value
composable
independent

pure
fast
wip
std?

Thank
you
very
much

!

Extras





Tail recursive accumulate


template <typename InT, typename StateT, typename FnT>
StateT accumulate(InT first, InT last, StateT state, FnT fn)
{
  return first == last
      ? state
      : accumulate(next(first), last,
                   fn(state, *first),
                   fn);
}
                




C++11 transducer


struct map_rf_gen {
    template <typename ReducingFnT,
              typename MappingT>
    struct apply {
        ReducingFnT step;
        MappingT mapping;

        template <typename State, typename ...Inputs>
        auto operator() (State&& s, Inputs&& ...is)
          -> ABL_DECLTYPE_RETURN(
            step(std::forward<State>(s),
                 estd::invoke(mapping, std::forward<Inputs>(is)...)))
    };
};

template <typename MappingT>
auto map(MappingT&& mapping)
    -> transducer_impl<detail::map_rf_gen, estd::decay_t<MappingT> > {
        return { std::forward<MappingT>(mapping) };
}
                

template<typename ReducingFnGenT,
         typename ...ParamTs>
struct transducer_impl : std::tuple<ParamTs...>
{
    using base_t = std::tuple<ParamTs...>;

    template <typename ...Ts>
    transducer_impl(Ts ...ts)
        : base_t(std::move(ts)...) {}

    template<typename ReducingFnT>
    constexpr auto operator() (ReducingFnT&& step) const
      -> typename ReducingFnGenT::template apply<
        estd::decay_t<ReducingFnT>,
        estd::decay_t<ParamTs>...
      >
    {
        using indexes_t = estd::make_index_sequence<sizeof...(ParamTs)>;
        return this->make(std::forward<ReducingFnT>(step), indexes_t());
    }
          

    template<typename ReducingFnT, std::size_t...indexes_t>
    constexpr auto make(ReducingFnT&& step, estd::index_sequence<indexes_t...>) const
      -> typename ReducingFnGenT::template apply<
        estd::decay_t<ReducingFnT>,
        estd::decay_t<ParamTs>...
      >
    {
        return { std::forward<ReducingFnT>(step),
                 std::get<indexes_t>(*this)... };
    }
};
          




Completion


into_vector(partiton(3), "abcdef")
// {'a', 'b', 'c'}, {'d', 'e', 'f'}

into_vector(partiton(3), "abcd")
// {'a', 'b', 'c'}, {'d'}
                

struct partition_tag {};

auto partition = [] (auto count) {
    return [] (auto step) {
        return [] (auto s, auto... ins) {
            using elem_t = decltype(tuplify(ins...));
            auto data = state_data(s, [] {
                return make_tuple(std::vector<elem_t>{}, step);
            });
            auto& group = get<0>(data);
            auto& step_ = get<1>(data);
            group.push_back(tuplify(ins...));
            auto w = group == count
              ? call(step_, state_unwrap(s), group)
              : skip(step_, state_unwrap(s), group);
            return wrap_state(w, data);
        }
    };
}
                

template <typename T>
auto state_wrapper_complete(partition_tag, T s) {
    auto data = state_wrapper_data(s);
    return state_complete(std::get<1>(data) (
        state_unwrap(s),
        std::get<0>(data)));
}
                




Jokes


The real smell of freaking essence of map and filter!