2013-10-18

Go-style coroutines in C++ - this time with (non)blocking IO

I've managed to write an epoll-based IO scheduler, which works on top of the coroutine scheduler described in my previous post.

The IO stuff comes in the form of a small library. It provides an io_scheduler and a set of TCP socket wrappers (tcp_socket, tcp_acceptor). The principle is pretty simple: the io_scheduler creates a coroutine running an epoll loop, which receives commands via a channel.

The socket classes try to perform each operation in a non-blocking manner. When this is impossible, a request is sent to the io_scheduler, the socket is added to epoll, and a notification is later received via another channel. At this moment the coroutine yields, and another one may be executed.

These gory details are obviously hidden from the user, for whom the code looks as if the operations were blocking.
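
Conceptually, the read path looks roughly like the sketch below. This is not the library's actual code - the helper wait_for_readable() and the exact channel plumbing are assumptions - it just illustrates the "try non-blocking first, then park on epoll" idea:

#include <unistd.h>
#include <cerrno>
#include <cstddef>
#include <system_error>

// Assumed helper: sends a "watch this fd for readability" request to the
// io_scheduler over a channel, then blocks on a notification channel.
// Blocking on that channel suspends the current coroutine, so others can run.
void wait_for_readable(int fd);

std::size_t read_some(int fd, char* buf, std::size_t count) // fd has O_NONBLOCK set
{
    for (;;)
    {
        ssize_t r = ::read(fd, buf, count);
        if (r >= 0)
            return static_cast<std::size_t>(r);   // data was available immediately
        if (errno != EAGAIN && errno != EWOULDBLOCK)
            throw std::system_error(errno, std::system_category(), "read");

        // would block: park this coroutine until epoll reports the fd readable
        wait_for_readable(fd);
    }
}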

Show me the code!

Here ya go. Just the main part; imagine the rest is your standard library. All the HTTP stuff is implemented using POCO.

Everything is here: https://github.com/maciekgajewski/coroutines/tree/0.2, help yourself.

Small HTTP server - C++

#include "coroutines/globals.hpp"
#include "coroutines/scheduler.hpp"

#include "coroutines_io/globals.hpp"
#include "coroutines_io/io_scheduler.hpp"
#include "coroutines_io/tcp_acceptor.hpp"

#include "client_connection.hpp"

#include <iostream>

using namespace coroutines;
using namespace boost::asio::ip;

void handler(http_request const& req, http_response& res)
{
    res.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
    res.add("Content-Length", "14");
    res.add("Content-Type", "text/plain");

    res.stream() << "hello, world!\n";
}

void start_client_connection(tcp_socket& sock)
{
    client_connection c(std::move(sock), handler);
    c.start();
}

void server()
{
    try
    {
        tcp_acceptor acc;
        acc.listen(tcp::endpoint(address_v4::any(), 8080));

        std::cout << "Server accepting connections" << std::endl;
        for(;;)
        {
            tcp_socket sock = acc.accept();
            go("client connection", start_client_connection, std::move(sock));
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "server error: " << e.what() << std::endl;
    }
}

int main(int argc, char** argv)
{
    scheduler sched(4);
    io_scheduler io_sched(sched);
    set_scheduler(&sched);
    set_io_scheduler(&io_sched);

    io_sched.start();

    go("acceptor", server);

    sched.wait();
}

Small HTTP server - Go

And this is the equivalent code in Go. It's a bit smaller, but mostly because it hides the accept/serve loop behind a library call; I wanted to keep this part visible. And Go is still ~20% faster. But I'm getting there :)

package main

import (
    "net/http"
    "io"
    "runtime"
)

func HelloServer(w http.ResponseWriter, req *http.Request) {
    w.Header().Set("Content-Type", "text/plain")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Content-Length", "14")
    io.WriteString(w, "hello, world!\n")
}

func main() {
    runtime.GOMAXPROCS(4)
    http.HandleFunc("/", HelloServer)
    http.ListenAndServe(":8081", nil)
}

2013-10-04

Go-style channels and coroutines in C++

Adventures with C++ coroutines and Boost.Context continued.

The way of Go

Last time I described a simple Python-style generator. Since then I've been working on something bigger: Go-style coroutines. In Go, coroutines are part of the language. A coroutine (goroutine, as they call it) is started by applying the go keyword to a function, and blocking, fixed-size channels are used for synchronization and data transfer. Sample Go code:
 c := make(chan int)  // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1  // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c   // Wait for sort to finish; discard sent value.

How to do it in C++

This is what we need:
  • A scheduler - something that will maintain a thread pool and a per-thread queue of coroutines.
  • A monitor - a tool that allows one or more coroutines to pause (yield) and wait for a signal.
  • A channel - an inter-thread, multiple-producer-multiple-consumer, fixed-size queue. Operations on a channel are 'blocking': when a coroutine tries to read from an empty channel or write to a full one, it yields, and the next one is executed.
  • A couple of global functions, go and make_channel, to emulate Go.
The code is here: https://github.com/maciekgajewski/coroutines/tree/0.1
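
To make the comparison concrete, the Go snippet above could look roughly like this with the library. This is only a sketch: it assumes go() and make_channel() behave the way the decompression example below uses them, that the int channel endpoints are named channel_reader<int> / channel_writer<int> (the example below only shows the buffer_reader/buffer_writer aliases), and big_list and do_something_for_a_while() are stand-ins:

#include <algorithm>
#include <vector>

std::vector<int> big_list;             // stand-in for the Go example's 'list'
void do_something_for_a_while();       // stand-in, as in the Go example

void sort_in_background(channel_writer<int>& done)
{
    std::sort(big_list.begin(), big_list.end());
    done.put(1);                       // send a signal; value does not matter
}

void caller()
{
    channel_pair<int> c = make_channel<int>(1, "sort done");   // allocate a channel

    // start the sort in a coroutine; when it completes, signal on the channel
    go("sorter", sort_in_background, c.writer);

    do_something_for_a_while();
    c.reader.get();                    // wait for the sort to finish; discard the value
}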

How does it work - a simple example

This is a simple example: parallel file decompression. Full working source code is here. The task is to find all xz-compressed files in the input directory and decompress each of them into the output directory.

Step 1 - set up the scheduler, launch the first coroutines

First, the program looks for compressed files. For each file it launches a coroutine process_file, using the global function go. Apart from being launched this way, process_file is just a normal C++ function.
The scheduler itself needs to be created explicitly. set_scheduler sets a global variable used by all the global functions.
    namespace bfs = boost::filesystem;
    scheduler sched(4 /*threads*/); // Go: runtime.GOMAXPROCS(4)
    set_scheduler(&sched);

    try
    {
        bfs::path input_dir(in);
        bfs::path output_dir(out);


        for(bfs::directory_iterator it(input_dir); it != bfs::directory_iterator(); ++it)
        {
            if (it->path().extension() == ".xz" && it->status().type() == bfs::regular_file)
            {
                bfs::path output_path = output_dir / it->path().filename().stem();
                go(std::string("process_file ") + it->path().string(), process_file, it->path(), output_path);
            }

        }

    }
    catch(const std::exception& e)
    {
        std::cerr << "Error :" << e.what() << std::endl;
    }

    sched.wait();
    set_scheduler(nullptr);

Step 2 - set up the pipeline

The code is already concurrent: each file is processed in a separate coroutine, concurrently and possibly in parallel.
But we can go further: the process of decompressing a file can be split into 3 stages: reading the compressed file, decompressing, and writing the output. Each of the stages is implemented as a separate coroutine. process_file creates the processing pipeline: coroutines connected by channels.
The channels pass buffers with data. Only a fixed number of buffers is ever allocated: this is C++, not some garbage-collected language! The amount of memory used by the program is deterministic. The return channels are needed to circulate the buffers back.
// used to send blocks of data around. Movable, but not copyable
class buffer
{
public:

    typedef char value_type;
    typedef char* iterator;
    typedef const char* const_iterator;

    // null buffer
    buffer() = default;

    // alocated buffer
    buffer(std::size_t capacity)
    : _capacity(capacity), _size(0), _data(new char[_capacity])
    {
    }

    ~buffer()
    {
    }

    buffer(const buffer&) = delete;
    buffer(buffer&& o) noexcept
    {
        swap(o);
    }

    buffer& operator=(buffer&& o)
    {
        swap(o);
        return *this;
    }

    // iterators
    iterator begin() { return _data.get(); }
    iterator end() { return _data.get() + _size; }
    const_iterator begin() const { return _data.get(); }
    const_iterator end() const { return _data.get() + _size; }

    // size/capacity
    void set_size(std::size_t s) { _size = s; }
    std::size_t size() const { return _size; }
    std::size_t capacity() const { return _capacity; }

    bool is_null() const { return !_capacity; }

    // other
    void swap(buffer& o) noexcept
    {
        std::swap(_capacity, o._capacity);
        std::swap(_size, o._size);
        std::swap(_data, o._data);
    }

private:

    std::size_t _capacity = 0;  // buffer capacity
    std::size_t _size = 0;      // amount of data in
    std::unique_ptr<char[]> _data;
};
And the pipeline is created like this:
void process_file(const bfs::path& input_file, const bfs::path& output_file)
{
    channel_pair<buffer> compressed = make_channel<buffer>(BUFFERS, "compressed");
    channel_pair<buffer> decompressed = make_channel<buffer>(BUFFERS, "decompressed");
    channel_pair<buffer> compressed_return = make_channel<buffer>(BUFFERS, "compressed_return");
    channel_pair<buffer> decompressed_return = make_channel<buffer>(BUFFERS, "decompressed_return");


    // start writer
    go(std::string("write_output ") + output_file.string(),
        write_output,
        decompressed.reader, decompressed_return.writer, output_file);

    // start decompressor
    go(std::string("lzma_decompress ") + input_file.string(),
        lzma_decompress,
        compressed.reader, compressed_return.writer,
        decompressed_return.reader, decompressed.writer);

    // read (in this coroutine)
    read_input(compressed.writer, compressed_return.reader, input_file);
}

Step 3 - read->decompress->write

And this is the final step: reading, decompressing and writing, all in separate coroutines, passing data around using channels.
read_input
void read_input(buffer_writer& compressed, buffer_reader& compressed_return, const bfs::path& input_file)
{
    try
    {
        file f(input_file.string().c_str(), "rb");

        unsigned counter = 0;
        for(;;)
        {
            buffer b;
            if (counter++ < BUFFERS)
                b = buffer(BUFFER_SIZE);
            else
                b = compressed_return.get(); // get spent buffer from decoder
            std::size_t r = f.read(b.begin(), b.capacity());
            if (r == 0)
                break; // this will close the channel
            else
            {
                b.set_size(r);
                compressed.put(std::move(b));
            }
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "Error reading file " << input_file << " : " << e.what() << std::endl;
    }
}
lzma_decompress
void lzma_decompress(
    buffer_reader& compressed,
    buffer_writer& compressed_return,

    buffer_reader& decompressed_return,
    buffer_writer& decompressed
)
{
    lzma_stream stream = LZMA_STREAM_INIT;
    lzma_ret ret = lzma_stream_decoder(&stream, UINT64_MAX, LZMA_CONCATENATED);
    if (ret != LZMA_OK)
    {
        throw std::runtime_error("lzma initialization failed");
    }

    buffer inbuf;
    buffer outbuf = decompressed_return.get(); // get allocated buffer from writer

    stream.next_in = nullptr;
    stream.avail_in = 0;
    stream.next_out = (unsigned char*)outbuf.begin();
    stream.avail_out = outbuf.capacity();


    while(ret == LZMA_OK)
    {
        lzma_action action = LZMA_RUN;

        // read more data, if input buffer empty
        if(stream.avail_in == 0)
        {
            // return previous used buffer
            if (!inbuf.is_null())
                compressed_return.put_nothrow(std::move(inbuf));
            try
            {
                // read one
                inbuf = compressed.get();
                stream.next_in = (unsigned char*)inbuf.begin();
                stream.avail_in = inbuf.size();
            }
            catch(const coroutines::channel_closed&)
            {
                action = LZMA_FINISH;
            }
        }

        // decompress
        ret = lzma_code(&stream, action);
        if (stream.avail_out == 0 || ret == LZMA_STREAM_END)
        {
            outbuf.set_size(stream.next_out - (unsigned char*)outbuf.begin());
            // send the buffer, receive an empty one
            decompressed.put(std::move(outbuf));

            if (ret != LZMA_STREAM_END)
            {
                outbuf = decompressed_return.get();
                stream.next_out = (unsigned char*)outbuf.begin();
                stream.avail_out = outbuf.capacity();
            }
        }

    }

    lzma_end(&stream);

    if (ret != LZMA_STREAM_END)
    {
        std::cerr << "lzma decoding error" << std::endl;
    }
    // exit will close all channels
}
write_output
void write_output(buffer_reader& decompressed, buffer_writer& decompressed_return, const  bfs::path& output_file)
{
    try
    {
        // open file
        file f(output_file.string().c_str(), "wb");

        // fill the queue with allocated buffers
        for(unsigned i = 0; i < BUFFERS; i++)
        {
            decompressed_return.put(buffer(BUFFER_SIZE));
        }

        for(;;)
        {
            buffer b = decompressed.get();
            f.write(b.begin(), b.size());
            decompressed_return.put_nothrow(std::move(b)); // return buffer to decoder
        }
    }
    catch(const channel_closed&) // this exception is expected when channels are closed
    {
    }
    catch(const std::exception& e)
    {
        std::cerr << "Error writing to output file " << output_file << " : " << e.what() << std::endl;
    }
}
And this is it: concurrent, parallel processing without a single mutex!

What next

This one was easy - some processing, file IO, nothing fancy. To make it really useful, network IO is needed: a set of socket operations which would seem blocking from the coroutine's perspective, but would in fact use an event loop and context switching to provide concurrency.

I'm working on it. Watch this space!

2013-09-18

Python-style generators in C++ (part 2)

In part 2 I will continue turning the code from part 1 into something more usable.

C++ implementation, phase 2: getting rid of globals

In Python, a generator function returns a generator object, which encapsulates the private context and allows for controlling the execution. Nothing prevents the user from having more than one generator using the same function, running in parallel:
#!/usr/bin/env python

def fibonacci():
    last = 1
    current = 1
    while(True):
        yield current
        nxt = last + current
        last = current
        current = nxt


N = 10
print('Two fibonacci sequences generated in parallel:')
generator1 = fibonacci()
print('seq #1: %d' % generator1.next())
print('seq #1: %d' % generator1.next())

generator2 = fibonacci()
for i in range(N):
    print('seq #1: %d' % generator1.next())
    print('seq #2:       %d' % generator2.next())

Output:
seq #1: 1
seq #1: 2
seq #1: 3
seq #2:       1
seq #1: 5
seq #2:       2
seq #1: 8
seq #2:       3
seq #1: 13
seq #2:       5
seq #1: 21
seq #2:       8
seq #1: 34
seq #2:       13
seq #1: 55
seq #2:       21
seq #1: 89
seq #2:       34
seq #1: 144
seq #2:       55
seq #1: 233
seq #2:       89
In our C++ code, all global functions and variables need to be wrapped into a class. Unfortunately, that means that yield can no longer be global. It can be passed as a parameter to the generator function, but this will change the function signature. To make it work, the function call has to be wrapped; this is a perfect opportunity to solve another major problem: the generator function can now return or throw an exception.
New C++11 exception tools can be used to capture any exception from the generator function and re-throw it in the main context.
// exception thrown when the generator function exits
struct generator_finished : public std::exception
{
    virtual const char* what() const noexcept { return "generator finished"; }
};

class generator
{
public:

    typedef std::function<void(intptr_t)> yield_function_type;
    typedef std::function<void(yield_function_type)> generator_function_type;

    // former 'init()'
    generator(generator_function_type generator, std::size_t stack_size = DEFAULT_STACK_SIZE)
        : _generator(std::move(generator))
    {
        // allocate stack for new context
        _stack = new char[stack_size];

        // make a new context. The returned fcontext_t is created on the new stack, so there is no need to delete it
        _new_context = boost::context::make_fcontext(
                    _stack + stack_size, // new stack pointer. On x86/64 it has to be the TOP of the stack (hence the "+ stack_size")
                    stack_size,
                    &generator::static_generator_function); // will call generator wrapper
    }

    // prevent copying
    generator(const generator&) = delete;

    // former 'cleanup()'
    ~generator()
    {
        delete[] _stack;
        _stack = nullptr;
        _new_context = nullptr;
    }

    intptr_t next()
    {
        // prevent calling when the generator function already finished
        if (_exception)
            std::rethrow_exception(_exception);

        // switch to function context. May set _exception
        intptr_t result = boost::context::jump_fcontext(&_main_context, _new_context, reinterpret_cast<intptr_t>(this));
        if (_exception)
            std::rethrow_exception(_exception);
        else
            return result;
    }

private:

    // former global variables
    boost::context::fcontext_t _main_context; // will hold the main execution context
    boost::context::fcontext_t* _new_context = nullptr; // will point to the new context
    static const int DEFAULT_STACK_SIZE= 64*1024; // completely arbitrary value
    char* _stack = nullptr;

    generator_function_type _generator; // generator function

    std::exception_ptr _exception = nullptr;// pointer to exception thrown by generator function


    // the actual generator function used to create context
    static void static_generator_function(intptr_t param)
    {
        generator* _this = reinterpret_cast<generator*>(param);
        _this->generator_wrapper();
    }

    void yield(intptr_t value)
    {
        boost::context::jump_fcontext(_new_context, &_main_context, value); // switch back to the main context
    }

    void generator_wrapper()
    {
        try
        {
            _generator([this](intptr_t value) // use lambda to bind this to yield
            {
                yield(value);
            });
            throw generator_finished();
        }
        catch(...)
        {
            // store the exception, so it can be thrown back in the main context
            _exception = std::current_exception();
            boost::context::jump_fcontext(_new_context, &_main_context, 0); // switch back to the main context
        }
    }
};

void fibonacci(const generator::yield_function_type& yield)
{
    intptr_t last = 1;
    intptr_t current = 1;
    for(;;)
    {
        yield(current);
        intptr_t nxt = last + current;
        last = current;
        current = nxt;
    }
}

int main(int , char** )
{
    const int N = 10;
    std::cout << "Two fibonacci sequences generated in parallel::" << std::endl;
    generator generator1(fibonacci);
    std::cout << "seq #1: " << generator1.next() << std::endl;
    std::cout << "seq #1: " << generator1.next() << std::endl;

    generator generator2(fibonacci);
    for(int i = 0; i < N; i++)
    {
        std::cout << "seq #1: " << generator1.next() << std::endl;
        std::cout << "seq #2:       " << generator2.next() << std::endl;
    }
}

Final touch: sprinkling the code with templates

The generator is almost ready, and now not only fibonacci but also main look almost like their Python counterparts.
The last remaining flaw to fix is the return type: the current implementation cannot return anything other than intptr_t. This can be solved by turning the generator into a template. The return value can no longer be passed via jump_fcontext, but we can pass it through a member variable, just like the exception.
#include <boost/context/all.hpp>
#include <boost/optional.hpp>
#include <iostream>
#include <iomanip>
#include <functional>
#include <exception>

// exception thrown when the generator function exits
struct generator_finished : public std::exception
{
    virtual const char* what() const noexcept { return "generator finished"; }
};

template<typename ReturnType>
class generator
{
public:

    typedef std::function<void(const ReturnType&)> yield_function_type;
    typedef std::function<void(yield_function_type)> generator_function_type;

    // former 'init()'
    generator(generator_function_type generator, std::size_t stack_size = DEFAULT_STACK_SIZE)
        : _generator(std::move(generator))
    {
        // allocate stack for new context
        _stack = new char[stack_size];

        // make a new context. The returned fcontext_t is created on the new stack, so there is no need to delete it
        _new_context = boost::context::make_fcontext(
                    _stack + stack_size, // new stack pointer. On x86/64 it has to be the TOP of the stack (hence the "+ stack_size")
                    stack_size,
                    &generator::static_generator_function); // will call generator wrapper
    }

    // prevent copying
    generator(const generator&) = delete;

    // former 'cleanup()'
    ~generator()
    {
        delete[] _stack;
        _stack = nullptr;
        _new_context = nullptr;
    }

    ReturnType next()
    {
        // prevent calling when the generator function already finished
        if (_exception)
            std::rethrow_exception(_exception);

        // switch to function context. May set _exception
        boost::context::jump_fcontext(&_main_context, _new_context, reinterpret_cast<intptr_t>(this));
        if (_exception)
            std::rethrow_exception(_exception);
        else
            return *_return_value;
    }

private:

    // former global variables
    boost::context::fcontext_t _main_context; // will hold the main execution context
    boost::context::fcontext_t* _new_context = nullptr; // will point to the new context
    static const int DEFAULT_STACK_SIZE= 64*1024; // completely arbitrary value
    char* _stack = nullptr;

    generator_function_type _generator; // generator function

    std::exception_ptr _exception = nullptr;// pointer to exception thrown by generator function
    boost::optional<ReturnType> _return_value; // optional allows for using types without a default constructor


    // the actual generator function used to create context
    static void static_generator_function(intptr_t param)
    {
        generator* _this = reinterpret_cast<generator*>(param);
        _this->generator_wrapper();
    }

    void yield(const ReturnType& value)
    {
        _return_value = value;
        boost::context::jump_fcontext(_new_context, &_main_context, 0); // switch back to the main context
    }

    void generator_wrapper()
    {
        try
        {
            _generator([this](const ReturnType& value) // use lambda to bind this to yield
            {
                yield(value);
            });
            throw generator_finished();
        }
        catch(...)
        {
            // store the exception, so it can be thrown back in the main context
            _exception = std::current_exception();
            boost::context::jump_fcontext(_new_context, &_main_context, 0); // switch back to the main context
        }
    }
};

template<typename NumericType> // now we can choose in which flavour we want our fibonacci numbers
void fibonacci(const typename generator<NumericType>::yield_function_type& yield)
{
    NumericType last = 1;
    NumericType current = 1;
    for(;;)
    {
        yield(current);
        NumericType nxt = last + current;
        last = current;
        current = nxt;
    }
}

int main(int , char** )
{
    const int N = 10;
    std::cout << "Two fibonacci sequences generated in parallel::" << std::endl;
    std::cout << std::setprecision(3) << std::fixed; // to make floating point number distinguishable
    generator<int> generator1(fibonacci<int>);
    std::cout << "seq #1: " << generator1.next() << std::endl;
    std::cout << "seq #1: " << generator1.next() << std::endl;

    generator<double> generator2(fibonacci<double>);
    for(int i = 0; i < N; i++)
    {
        std::cout << "seq #1: " << generator1.next() << std::endl;
        std::cout << "seq #2:       "  << generator2.next() << std::endl;
    }
}
Output:
Two fibonacci sequences generated in parallel:
seq #1: 1
seq #1: 2
seq #1: 3
seq #2:       1.000
seq #1: 5
seq #2:       2.000
seq #1: 8
seq #2:       3.000
seq #1: 13
seq #2:       5.000
seq #1: 21
seq #2:       8.000
seq #1: 34
seq #2:       13.000
seq #1: 55
seq #2:       21.000
seq #1: 89
seq #2:       34.000
seq #1: 144
seq #2:       55.000
seq #1: 233
seq #2:       89.000

What else?

Further work on the generator could include:
  • Passing arguments to the generator function. This could be done with variadic templates and tuples (see the sketch below).
  • Implementing a copy constructor. This would allow taking a snapshot of the function's execution state at any moment.
  • Making the code thread-aware. Currently generator::next() can be called from different threads, effectively making different parts of the function run in different threads. Wicked!
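
As a taste of the first item: extra arguments can already be bound with a lambda capture; a variadic forwarding constructor would merely hide this boilerplate. A minimal sketch using the templated generator from above (the limit parameter is made up for the example):

int limit = 100;
generator<int> capped([limit](const generator<int>::yield_function_type& yield)
{
    int last = 1;
    int current = 1;
    while (current <= limit)   // stop once the captured limit is reached
    {
        yield(current);
        int nxt = last + current;
        last = current;
        current = nxt;
    }
});

// drain it; generator_finished signals that the generator function returned
for (;;)
{
    try { std::cout << capped.next() << std::endl; }
    catch (const generator_finished&) { break; }
}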