Parallelizing Database Access with ØMQ

Peter J. Holzer
hjp@hjp.at

Once upon a time there was a database …

  • 1973: WSR founded as the joint computing centre of WIFO and IHS
  • Development of the WIFO time series processor, including a hierarchical database, on Univac in FORTRAN.
  • 1992: Port to HP-UX and Oracle (and Pro*For)
  • Frontends in C++ (Windows) and Perl (Web)

and if it has not died …

… then it is still alive today.

Yes, it is still alive.

but then things got complicated

  • The »FIW-Tool«
    • Multidimensional cube
    • Oracle
    • 8 billion rows
    • Slow
    • UI grew organically over the years
  • The NoSQL prototype
    • We are writing our own NoSQL database!
    • Management was not enthusiastic
  • Consultants, Consultants, Consultants, ...

And then came Macrobond


Requirements

The server today

Data model

Not elegant, but practical

Typical query

select facttablename, columnname, term from concept c1, relation r, concept c2, term t
where c1.canonicalname='Berichtsregion'
  and c1.id=r.parent
  and c2.id=r.child and c2.canonicalname='Österreich'
  and t.concept_id=c2.id
      
select * from facttable_gen_preise f
where f.berichtsregion='295';
      
select * from facttable_gen_finanz f
where f.berichtsregion='295';
      
select * from facttable_kon_eh f
where f.berichtsregion='10';
      
select * from facttable_eurostat_comext f
where f.rep='38';
      

Obviously parallelizable across fact tables!
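A minimal sketch of why this parallelizes: the first query yields one (fact table, column, term) row per fact table, and each row turns into a query that is independent of all the others. The rows in @mapping are hypothetical values copied from the queries above, and build_queries is a name made up for this illustration, not part of the actual code base.

```perl
use strict;
use warnings;

# Hypothetical result of the lookup query: for each fact table, the column
# and the term that represent the concept for Austria.
my @mapping = (
    { facttablename => 'facttable_gen_preise',      columnname => 'berichtsregion', term => '295' },
    { facttablename => 'facttable_gen_finanz',      columnname => 'berichtsregion', term => '295' },
    { facttablename => 'facttable_kon_eh',          columnname => 'berichtsregion', term => '10'  },
    { facttablename => 'facttable_eurostat_comext', columnname => 'rep',            term => '38'  },
);

# Build one query per fact table. No query depends on another one,
# so each can be handed to a different worker.
sub build_queries {
    my (@rows) = @_;
    my @queries;
    for my $row (@rows) {
        (my $term = $row->{term}) =~ s/'/''/g;    # double embedded single quotes
        push @queries,
            "select * from $row->{facttablename} f where f.$row->{columnname}='$term'";
    }
    return @queries;
}

print "$_\n" for build_queries(@mapping);
```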

Lies for Children

That was a bit simplified.

In reality it looks more like this:

select facttablename, columnname, term, concept_id, t.hidden, language,
       register, id, canonicalname, description, parent, type, r.sortorder
from term t, concept c, relation r
where facttablename='facttable_gen_tourismus' and columnname='erhebungszeitpunkt'
 and exists (
    select 1 from facttable_gen_tourismus f
    where f.erhebungszeitpunkt=t.term 
      and thema in (E'10')
      and saison in (E'3')
      and berichtsregion in (E'295.06')
 )
 and c.id=concept_id and r.child=concept_id

But the parallelizability remains.

Macrobond and PostgreSQL

Architecture

ØMQ


ØMQ status quo

ØMQ coming soon?


Stored Procedure

create or replace function mb_nextlevel_zmq(variadic integer[]) returns setof mb_treenode as $$

    use strict;
    use warnings;
    use 5.10.0;
    use utf8;

    use ZMQ::LibZMQ3;
    use ZMQ::Constants qw(ZMQ_REQ);
    use WDS::Macrobond::Utils;

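    # The variadic integer[] argument is handled as the text of a PostgreSQL
    # array literal such as {1,2,3}: strip the braces, then split on commas.
    my ($ids) = @_;
    $ids =~ s/\{(.*)\}/$1/;
    my @ids = split(/,/, $ids);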

    # some constants
    my $GS         = "\x{1D}"; # ASCII group separator
    my $RS         = "\x{1E}"; # ASCII record separator
    my $US         = "\x{1F}"; # ASCII unit separator

    my $sth = spi_query("select * from userconfig where logname=user");
    my $row = spi_fetchrow($sth);
    my $url = $row->{mb_dal_url} // "tcp://127.0.0.1:21887";
    spi_cursor_close($sth);

    my $context = zmq_init();
    my $req_sck = zmq_socket($context, ZMQ_REQ);
    zmq_connect($req_sck, $url);
    my $qry_msg = join($US, "mb_nextlevel", @ids);
    zmq_send($req_sck, $qry_msg);
    my $res_msg = zmq_msg_init();
    zmq_msg_recv($res_msg, $req_sck);
    my $result = WDS::Macrobond::Utils::decode_result(zmq_msg_data($res_msg));
    my $aoh = WDS::Macrobond::Utils::aoh($result);
    return $aoh;
$$
language plperlu;

mb_dal - Request loop

my $context = zmq_init();

# This service is strictly synchronous. We answer every request before
# considering the next one. So we use a REP socket.
my $qry_socket = zmq_socket($context, ZMQ_REP);
zmq_bind($qry_socket, "tcp://*:$qry_port");
...
for (;;) {
    zmq_msg_recv($msg, $qry_socket);
    my $t0 = time;
    my $req = zmq_msg_data($msg);
    my @param = split($US, $req);
    $_ = decode_utf8($_) for @param;
    my $cmd = shift(@param);
    ++$req_no;
    print STDERR "$0: $t0: received request $req_no: $cmd @param\n";

    if ($cmd{$cmd}) {
        my $result = $cmd{$cmd}->(@param);
        my $msg = WDS::Macrobond::Utils::encode_result($result);
        zmq_msg_send($msg, $qry_socket);
    } else {
        say STDERR "unrecognized command $cmd";
        my $result = { success => 'ERROR', error => "unrecognized command $cmd" };
        my $msg = WDS::Macrobond::Utils::encode_result($result);
        zmq_msg_send($msg, $qry_socket);
    }
    my $t1 = time;
    print STDERR "$0: $t1: finished request $req_no: $cmd @param (" . ($t1 - $t0) . " s)\n";
    exit 0 if WDS::Macrobond::Utils::has_changed($treestate0);
}
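The request loop relies on a hash %cmd that maps command names to code references, which is not shown on the slide. A minimal sketch of such a dispatch table follows, assuming a placeholder handler; handle_request and the body of the mb_nextlevel handler are invented for this illustration and do not reproduce the real mb_dal code.

```perl
use strict;
use warnings;

my $US = "\x{1F}";    # ASCII unit separator, as in the listings

# Hypothetical dispatch table: command name => handler.
my %cmd = (
    mb_nextlevel => sub {
        my (@ids) = @_;
        # Placeholder: the real handler would query the database.
        return { success => 'OK', ids => [@ids] };
    },
);

# Split a request into command and parameters and dispatch it.
sub handle_request {
    my ($req) = @_;
    my ($cmd, @param) = split($US, $req);
    $cmd //= '';
    if (my $handler = $cmd{$cmd}) {
        return $handler->(@param);
    }
    return { success => 'ERROR', error => "unrecognized command $cmd" };
}

my $reply = handle_request(join($US, "mb_nextlevel", 17, 42));
print "$reply->{success}: @{ $reply->{ids} }\n";
```

Adding a command then only means adding a key to %cmd; the loop itself stays unchanged.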

WDS::Macrobond::Workers

sub BUILD {
    my ($self, $args) = @_;

    my $context = zmq_init();
    my $socket = zmq_socket($context, ZMQ_ROUTER);
    my $url = $self->url;
    unless ($url) {
        my $port = int(rand(0x4000)) + 0xC000;
        $url  = "tcp://127.0.0.1:$port";
        $self->_set_url($url);
    }
    zmq_bind($socket, $self->url);
    $self->_set_utils(WDS::Macrobond::Utils->new);
    $self->_set_socket($socket);
    my $worker_pids = {};
    for (1 .. $self->workers) {
        my $rc = fork();
        if (!defined $rc) {
            die "cannot fork: $!";
        } elsif ($rc == 0) {
            exec($self->worker, $self->url, "wds", $self->schema);
            die "cannot exec " . $self->worker . ": $!";
        } else {
            $worker_pids->{$rc} = 1;
        }
    }
    $self->_worker_pids($worker_pids);
}

WDS::Macrobond::Workers

sub sync_query {
    my ($self, $queries) = @_;

    $self->query_time(0);
    my $t0 = time;
    my $socket = $self->socket;

    # list of ready workers
    my $ready = $self->_ready;

    # outstanding requests by worker id. 
    # we store an index into @$queries plus some stats
    my $outstanding = {};

    # results. Same order as @$queries
    # Each element is a hashref with the elements
    # success (OK, ERROR)
    # header (arrayref of column names)
    # error (error message)
    # resultset (AofA)
    my $finished = [];

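    # each/shift on a bare reference was an experimental feature that has
    # been removed from Perl, so dereference explicitly.
    while (my ($i, $q) = each @$queries) {
        if (@$ready == 0) {
            $self->process_worker_request($finished, $outstanding);
        }

        # send request
        my $worker_id = shift(@$ready);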
        zmq_msg_send($worker_id, $socket, ZMQ_SNDMORE);
        zmq_msg_send("", $socket, ZMQ_SNDMORE);
        my $msg = join($US, "anonymous", "secret", $q);
        zmq_msg_send($msg, $socket);
        $outstanding->{$worker_id} = { qidx => $i, start => time };
    }

    while (keys %$outstanding) {
        $self->process_worker_request($finished, $outstanding);
    }

    my $t1 = time;
    $self->log('INFO', "elapsed = " . ($t1 - $t0) . "s, total = " . $self->query_time . "s");
    return $finished;
}
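The bookkeeping in sync_query can be tried out without any sockets. The following is a sequential simulation, not the real thing: schedule and the callback $run are hypothetical stand-ins, and "collecting" a result simply runs the query in-process. It only illustrates how the ready list, the outstanding map, and the query index keep the results in query order even when workers finish out of order.

```perl
use strict;
use warnings;

# Simulated scheduler. @ready holds idle worker ids, %outstanding maps
# each busy worker to the index of the query it is working on.
sub schedule {
    my ($queries, $workers, $run) = @_;
    my @ready = @$workers;
    my %outstanding;
    my @finished;

    # Stand-in for process_worker_request: take the reply of one busy
    # worker, file it under its query index, mark the worker as ready.
    my $collect = sub {
        my ($worker_id) = sort keys %outstanding;    # deterministic for the demo
        my $i = delete $outstanding{$worker_id};
        $finished[$i] = $run->($queries->[$i]);
        push @ready, $worker_id;
    };

    for my $i (0 .. $#$queries) {
        $collect->() unless @ready;      # wait until some worker is free
        my $worker_id = shift @ready;
        $outstanding{$worker_id} = $i;   # "send" query $i to that worker
    }
    $collect->() while %outstanding;     # drain the remaining replies
    return \@finished;
}

my $results = schedule([qw(a b c d e)], [qw(w1 w2)], sub { uc $_[0] });
print "@$results\n";
```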

WDS::Macrobond::Workers

sub process_worker_request {
    my ($self, $finished, $outstanding) = @_;

    my $ready = $self->_ready;
    my $socket = $self->socket;

    my $msg = zmq_msg_init();
    # from REQ to ROUTER: (at least) 3 parts:
    zmq_msg_recv($msg, $socket);
    my $worker_id = zmq_msg_data($msg);
    zmq_msg_recv($msg, $socket);
    my $empty = zmq_msg_data($msg);
    die unless $empty eq ""; 
    zmq_msg_recv($msg, $socket);
    my $payload = zmq_msg_data($msg);

    my ($success, $header, $resultset) = split($GS, $payload);
    if (exists $outstanding->{$worker_id}) {
        my $i = $outstanding->{$worker_id}{qidx};
        my $start = $outstanding->{$worker_id}{start};

        $finished->[$i] = {
            success => $success,
        };
        if ($success eq 'OK') {
            $finished->[$i]{header} = [ split($US, $header) ];
            $finished->[$i]{resultset} = [];
            for my $g (split($RS, $resultset)) {
                my $row = [];
                for my $u (split($US, $g, -1)) {
                    push @$row, ($u eq "\0" ? undef : $u);
                }
                push @{ $finished->[$i]{resultset} }, $row;
            }
        } elsif ($success eq 'ERROR') {
            $finished->[$i]{error} = $header;
        } else {
            die "cannot happen";
        }

        delete $outstanding->{$worker_id};

        my $now = time;
        my $dt = $now - $start;
        $self->log('INFO', "finished request $i: $start + $dt = $now");
        $self->query_time($self->query_time + $dt);
    }
    push @$ready, $worker_id;
}
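The decoding step can be isolated into a small function. This is a sketch under the assumption that the payload has exactly the layout used in the listing (status, header, and result set joined by GS, rows by RS, fields by US, and "\0" standing for NULL); decode_payload is a hypothetical name, not a function from WDS::Macrobond.

```perl
use strict;
use warnings;

my $GS = "\x{1D}";    # ASCII group separator
my $RS = "\x{1E}";    # ASCII record separator
my $US = "\x{1F}";    # ASCII unit separator

# Turn a worker payload into a hashref with success, header and resultset
# (or success and error).
sub decode_payload {
    my ($payload) = @_;
    my ($success, $header, $resultset) = split($GS, $payload, -1);
    $success //= '';
    return { success => 'ERROR', error => $header } if $success eq 'ERROR';
    die "unexpected status '$success'" unless $success eq 'OK';

    my @rows;
    for my $record (split($RS, $resultset // '', -1)) {
        # limit -1 keeps trailing empty fields; "\0" marks a NULL value
        push @rows, [ map { ($_ eq "\0") ? undef : $_ } split($US, $record, -1) ];
    }
    return {
        success   => 'OK',
        header    => [ split($US, $header // '') ],
        resultset => \@rows,
    };
}

my $r = decode_payload("OK${GS}id${US}name${GS}1${US}a${RS}2${US}\0");
print scalar(@{ $r->{resultset} }), " rows, columns: @{ $r->{header} }\n";
```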

worker

my $req_sck = zmq_socket($context, ZMQ_REQ);
zmq_connect($req_sck, $boss);
my $dbh = DBIx::SimpleConnect->connect($db, { PrintError => 0, RaiseError => 1});
...

# Send an unsolicited "result"
zmq_send($req_sck, "NOP");
for (;;) {
    my $msg = zmq_msg_init();
    zmq_msg_recv($msg, $req_sck);
    my $message = zmq_msg_data($msg);
    my ($user, $auth, $query) = split($US, $message);
    my $error = 0;

    # XXX - check auth token and switch to user here (NYI).

    my $result;
    eval {
        my $sth = $dbh->prepare($query);
        $sth->execute;
        my $header = join($US, @{$sth->{NAME}});
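        # Fetch from the handle that was just executed; passing $sth to
        # $dbh->selectall_arrayref would execute the query a second time.
        my $resultset = $sth->fetchall_arrayref;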

        $resultset = join($RS, 
                          map { join($US, map $_ // "\0", @$_) }
                              @$resultset);

        $result = join($GS, "OK", $header, $resultset);
        1;
    } or do {
        $result = join($GS, "ERROR", $@);
        $error = 1;
    };
    zmq_send($req_sck, $result);

    if ($error) {
        ...
    }
}
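The wire format the worker produces can be shown in isolation. A minimal sketch, assuming the same separators as in the listing; encode_ok is a hypothetical helper name and the sample rows are made up.

```perl
use strict;
use warnings;

my $GS = "\x{1D}";    # separates status, header and result set
my $RS = "\x{1E}";    # separates rows
my $US = "\x{1F}";    # separates fields within a row

# Encode a successful result: column names plus an array of rows.
# NULL values (undef) are sent as a single NUL character.
sub encode_ok {
    my ($header, $rows) = @_;
    my $resultset = join($RS,
        map { join($US, map { defined $_ ? $_ : "\0" } @$_) } @$rows);
    return join($GS, "OK", join($US, @$header), $resultset);
}

my $payload = encode_ok([ 'id', 'name' ], [ [ 1, 'a' ], [ 2, undef ] ]);
printf "%d bytes\n", length $payload;
```

The three ASCII separators practically never occur in statistical data, which keeps the format simple: no quoting or escaping is needed as long as that assumption holds.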

Usage

    $result
        = $workers->sync_query(
                        [
                            "select distinct concept_id, canonicalname
                               from term join concept on concept_id=concept.id
                              where facttablename is not null and columnname is not null and term is null"
                        ]
                    );

Usage

    for my $di (@$dim_instances) {
        my $facttablename = $di->{facttablename};
        my @meta_column_ids;
        my @real_column_names;
        for my $columnname (@{ $di->{columnname} }) {
            my $cmd;
            if (looks_like_number($columnname)) {
                push @meta_column_ids, $columnname;
            } else {
                push @real_column_names, $columnname;
            }
        }
        ...
        if (@real_column_names) {

            # The complicated case: We have to extract matching members from the fact table.

            # we have different column names, so we need a different subquery for each.
            # the path condition is the same for each

            # not sure if rolling them all into one query is ideal

            my $path_cond = "";
            for my $node (@{ $self->{facttables}{$facttablename}{path} }) {
                my $columnname = $node->{columnname};
                unless (looks_like_number($columnname)) {
                    $columnname = encode('UTF-8', $columnname);
                    $path_cond .= " and $columnname in (" 
                          . join(", ", map quote($_->{term}), @{ $node->{members} })
                          . ")";
                }
            }
            for my $columnname (@real_column_names) {
                my $cmd = "select facttablename, columnname, term, concept_id, t.hidden, language, register, id, canonicalname,
                                  description, parent, type, r.sortorder";
                $cmd .= " from term t, concept c, relation r";
                $cmd .= " where facttablename='$facttablename' and";
                $cmd .= " columnname='$columnname' and exists (select 1 from $facttablename f where f.$columnname=t.term $path_cond)";
                $cmd .= " and c.id=concept_id and r.child=concept_id";
                push @cmd, $cmd;
            }
        }
    }

    my $result = $self->{workers}->sync_query(\@cmd);
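The path condition is the part shared by all subqueries of one fact table. The sketch below rebuilds it for a hypothetical path; quote_literal is a simplified stand-in for the quote function used in the listing (whose implementation is not shown), and path_condition is a name invented here. It assumes that doubling single quotes is sufficient, which holds for PostgreSQL only when standard_conforming_strings is on.

```perl
use strict;
use warnings;
use Scalar::Util qw(looks_like_number);

# Simplified stand-in for quote(): wrap in single quotes, double embedded ones.
sub quote_literal {
    my ($s) = @_;
    $s =~ s/'/''/g;
    return "'$s'";
}

# Build the " and <column> in (...)" conditions for all path nodes that
# refer to a real fact table column; numeric column names are meta column
# ids and are skipped.
sub path_condition {
    my (@nodes) = @_;
    my $cond = "";
    for my $node (@nodes) {
        next if looks_like_number($node->{columnname});
        $cond .= " and $node->{columnname} in ("
               . join(", ", map { quote_literal($_->{term}) } @{ $node->{members} })
               . ")";
    }
    return $cond;
}

# Hypothetical path through facttable_gen_tourismus
my @path = (
    { columnname => 'thema',  members => [ { term => '10' } ] },
    { columnname => '42',     members => [ { term => 'x'  } ] },
    { columnname => 'saison', members => [ { term => '3'  } ] },
);
print path_condition(@path), "\n";
```

When a database handle is at hand, DBI's $dbh->quote is usually the safer choice for the literals.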

Result

"Real" query (all data from Statistik Austria for the reporting region Salzburg), including the non-parallelized part

Fresh DAL process, but a "warmed-up" database.

A substantial share of the queries is not parallelized.

Todo

Questions

 
?