Artifact Content
Not logged in

Artifact e92c2afc53c2fba99ac304216856a50ba9ebbab4:


// {{{ headers
/*
 * Copyright (c) 2003-2005 Thomer M. Gil
 *                    Massachusetts Institute of Technology
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include "kademlia.h"
#include "p2psim/network.h"
#include "p2psim/threadmanager.h"
#include <deque>
#include <iostream>
using namespace std;

Kademlia::use_replacement_cache_t Kademlia::use_replacement_cache = FULL;
unsigned Kademlia::debugcounter = 1;
unsigned Kademlia::_nkademlias = 0;
Kademlia::k_nodeinfo_pool *Kademlia::pool = 0;

bool Kademlia::docheckrep = false;

unsigned Kademlia::k = 0;
unsigned Kademlia::k_tell = 0;
unsigned Kademlia::alpha = 0;
unsigned Kademlia::stabilize_timer = 0;
unsigned Kademlia::refresh_rate = 0;
unsigned Kademlia::erase_count = 0;
Time Kademlia::max_lookup_time = 0;
bool Kademlia::learn_stabilize_only = false;
bool Kademlia::force_stabilization = false;
bool Kademlia::death_notification = false;

long long unsigned Kademlia::_rpc_bytes = 0;
long unsigned Kademlia::_good_rpcs = 0;
long unsigned Kademlia::_bad_rpcs = 0;
long unsigned Kademlia::_ok_by_reaper = 0;
long unsigned Kademlia::_timeouts_by_reaper = 0;

long unsigned Kademlia::_good_lookups = 0;
long unsigned Kademlia::_good_attempts = 0;
long unsigned Kademlia::_bad_attempts = 0;
long unsigned Kademlia::_lookup_dead_node = 0;
long unsigned Kademlia::_ok_failures = 0;
long unsigned Kademlia::_bad_failures = 0;

Time Kademlia::_good_total_latency = 0;
Time Kademlia::_good_lookup_latency = 0;
Time Kademlia::_good_ping_latency = 0;
long unsigned Kademlia::_good_timeouts = 0;

long unsigned Kademlia::_good_hops = 0;
Time Kademlia::_good_hop_latency = 0;

Time Kademlia::_bad_lookup_latency = 0;
long unsigned Kademlia::_bad_timeouts = 0;
long unsigned Kademlia::_bad_hops = 0;
Time Kademlia::_bad_hop_latency = 0;
Time Kademlia::_default_timeout = 0;

unsigned Kademlia::_to_multiplier = 0;
unsigned Kademlia::_to_cheat = 0;


Kademlia::NodeID *Kademlia::_all_kademlias = 0;
HashMap<Kademlia::NodeID, Kademlia*> *Kademlia::_nodeid2kademlia = 0;

#define NODES_ITER(x) for(set<k_nodeinfo, Kademlia::closer>::const_iterator i = (x)->begin(); i != (x)->end(); ++i)


// }}}
// {{{ Kademlia::Kademlia
Kademlia::Kademlia(IPAddress i, Args a)
  : P2Protocol(i), _id(ConsistentHash::ip2chid(ip()))
{
  // KDEBUG(1) << "id: " << printID(_id) << ", ip: " << ip() << endl;
  if(getenv("P2PSIM_CHECKREP"))
    docheckrep = strcmp(getenv("P2PSIM_CHECKREP"), "0") ? true : false;

  use_replacement_cache = a.nget<use_replacement_cache_t>("rcache", ENABLED, 10);
  max_lookup_time = a.nget<Time>("maxlookuptime", 4000, 10);
  learn_stabilize_only = a.nget<unsigned>("stablearn_only", 0, 10) == 1 ? true : false;
  force_stabilization = a.nget<unsigned>("force_stab", 0, 10) == 1 ? true : false;
  death_notification = 
    a.nget<unsigned>("death_notify", 1, 10) == 1 ? true : false;

  if(!k) {
    k = a.nget<unsigned>("k", 20, 10);
  }

  if(!k_tell) {
    k_tell = a.nget<unsigned>("k_tell", k, 10);
  }

  if(!alpha) {
    alpha = a.nget<unsigned>("alpha", 3, 10);
  }

  if(!stabilize_timer) {
    refresh_rate = stabilize_timer = a.nget<unsigned>("stabilize_timer", 10000, 10);
  }

  if(!erase_count) {
    erase_count = a.nget<unsigned>("erase_count", 5, 10);
  }

  if(!_default_timeout) {
    _default_timeout = a.nget<Time>("default_timeout", 1000, 10);
  }

  if (!_to_multiplier) {
    _to_multiplier = a.nget<unsigned>("timeout_multiplier",3,10);
  }
  _to_cheat = a.nget<unsigned>("timeout_cheat",1,10);

  if(!Kademlia::pool)
    Kademlia::pool = New k_nodeinfo_pool();
  _me = k_nodeinfo(_id, ip());
  _joined = false;

  init_k_bucket_tree();
  _nkademlias++;
}

Kademlia::NodeID Kademlia::closer::n = 0;
Kademlia::NodeID Kademlia::closerRTT::n = 0;
// }}}
// {{{ Kademlia::~Kademlia
Kademlia::~Kademlia()
{
  // KDEBUG(1) << "Kademlia::~Kademlia" << endl;
  for(HashMap<NodeID, k_nodeinfo*>::iterator i = flyweight.begin(); i != flyweight.end(); ++i)
    Kademlia::pool->push(i.value());
  flyweight.clear();

  delete _root;

  if(ip() == 1) {
    printf("\
#  1: k\n\
#  2: k_tell\n\
#  3: alpha\n\
#  4: stabilize_timer\n\
#  5: refresh_timer\n\
#  6: learn\n\
#  7: rcache\n\
#\n\
#  8: total number of bytes for RPCs\n\
#  9: number of RPCs sent in lookup (good)\n\
# 10: number of RPCs sent in lookup (bad)\n\
# 11: number of good      RPCs reaped by reaper\n\
# 12: number of timed out RPCs reaped by reaper\n\
#\n\
# 13: number of good lookups\n\
# 14: number of attempts for good lookups\n\
# 15: number of attempts for bad lookups\n\
# 16: number of good lookups, but node was dead\n\
# 17: number of bad lookups, node was dead\n\
# 18: number of bad lookups, node was alive\n\
#\n\
# 19: avg total lookup latency (good)\n\
# 20: avg pure lookup latency (good)\n\
# 21: avg pure ping latency (good)\n\
# 22: number of timeouts suffered during lookup (good)\n\
#\n\
# 23: avg number of hops (good)\n\
# 24: avg latency per hop (good)\n\
#\n\
# 25: avg pure lookup latency (bad)\n\
# 26: number of timeouts suffered during lookup (bad)\n\
# 27: avg number of hops (bad)\n\
# 28: avg latency per hop (bad)\n\
#\n\
%u %u %u %u %u %u %u      %llu %lu %lu %lu %lu     %lu %lu %lu %lu %lu %lu    %.2f %.2f %.2f %lu    %.2f %.2f   %.2f %lu %.2f %.2f\n",
        Kademlia::k,
        Kademlia::k_tell,
        Kademlia::alpha,
        Kademlia::stabilize_timer,
        Kademlia::refresh_rate,
        Kademlia::learn_stabilize_only ? 1 : 0,
        Kademlia::use_replacement_cache,

        _rpc_bytes,
        _good_rpcs,
        _bad_rpcs,
        _ok_by_reaper,
        _timeouts_by_reaper,

        _good_lookups,
        _good_attempts,
        _bad_attempts,
        _lookup_dead_node,
        _ok_failures,
        _bad_failures,

        (double) _good_total_latency / _good_lookups,
        (double) _good_lookup_latency / _good_lookups,
        (double) _good_ping_latency / _good_lookups,
        _good_timeouts,

        (double) _good_hops / _good_lookups,
        (double) _good_hop_latency / _good_hops,

        (double) _bad_lookup_latency / _bad_attempts,
        _bad_timeouts,
        (double) _bad_hops / _bad_attempts,
        (double) _bad_hop_latency / _bad_attempts);


    print_stats();
  }

  if(_all_kademlias) {
    delete _all_kademlias;
    _all_kademlias = 0;
  }

  if(_nodeid2kademlia) {
    delete _nodeid2kademlia;
    _nodeid2kademlia = 0;
  }


  if(--_nkademlias == 0) {
    delete pool;
    pool = 0;
  }
}

void
Kademlia::init_k_bucket_tree()
{
  _root = New k_bucket(0, this);
  _root->leaf = false;

// build the entire k-bucket tree
  k_bucket *b = _root;
  unsigned bit;
  for(unsigned i=0; ; i++) {
    bit = Kademlia::getbit(_id, i);

    // leafside
    b->child[bit^1] = New k_bucket(b, this);
    b->child[bit^1]->nodes = New k_nodes(b->child[bit^1]);
    b->child[bit^1]->replacement_cache = New k_nodes(b->child[bit^1]);
    b->child[bit^1]->leaf = true;

    // nodeside
    b->child[bit] = New k_bucket(b, this);
    b = b->child[bit];
    b->leaf = false;

    if(i == Kademlia::idsize-2)
      break;
  }
  b->nodes = New k_nodes(b);
  b->replacement_cache = New k_nodes(b);
  b->leaf = true;
}

// jy
Time
Kademlia::timeout(IPAddress dst)
{
  if (_to_cheat)
    return _to_multiplier * 2 * Network::Instance()->gettopology()->latency(_me.ip,dst);
  else
    return Kademlia::_default_timeout;
}

// }}}
// {{{ Kademlia::initstate
void
Kademlia::initstate()
{
  const set<Node*> *l = Network::Instance()->getallnodes();
  // KDEBUG(1) << "Kademlia::initstate l->size = " << l->size() << endl;

  if(!_all_kademlias) {
    // KDEBUG(1) << "allocating all" << endl;
    _all_kademlias = New NodeID[l->size()];
    _nodeid2kademlia = New HashMap<NodeID, Kademlia*>;
    unsigned j = 0;
    for(set<Node*>::const_iterator i = l->begin(); i != l->end(); ++i) {
      Kademlia *k = (Kademlia *) *i;
      _all_kademlias[j++] = k->id();
      _nodeid2kademlia->insert(k->id(), k);
    }
  }

  Kademlia::NodeID upper = 0, lower = 0;
  for(unsigned depth=0; depth<Kademlia::idsize; depth++) {
    upper = lower = 0;
    for(unsigned i=0; i<Kademlia::idsize; i++) {
      unsigned bit = Kademlia::getbit(_id, i);
      if(i < depth) {
        lower |= (((Kademlia::NodeID) bit) << Kademlia::idsize-i-1);
        upper |= (((Kademlia::NodeID) bit) << Kademlia::idsize-i-1);
      } else if(i == depth) {
        lower |= (((Kademlia::NodeID) bit^((Kademlia::NodeID)1)) << Kademlia::idsize-i-1);
        upper |= (((Kademlia::NodeID) bit^((Kademlia::NodeID)1)) << Kademlia::idsize-i-1);
      } else if(i > depth)
        upper |= (((Kademlia::NodeID) 1) << Kademlia::idsize-i-1);
    }

    // // KDEBUG(2) << " initstate lower = " << Kademlia::printbits(lower) << endl;
    // // KDEBUG(2) << " initstate upper = " << Kademlia::printbits(upper) << endl;

    // create a set of canditates
    vector<NodeID> candidates;
    for(unsigned i = 0; i < l->size(); i++) {
      if(_all_kademlias[i] >= lower && _all_kademlias[i] <= upper)
        candidates.push_back(_all_kademlias[i]);
    }

    // if candidate set size is less than k, then insert them all.
    if(candidates.size() <= Kademlia::k) {
      for(vector<NodeID>::const_iterator i = candidates.begin(); i != candidates.end(); ++i) {
        Kademlia *k = (*_nodeid2kademlia)[*i];
        // KDEBUG(2) << "initstate: inserting candidate = " << Kademlia::printID(k->id()) << endl;
        // assert(k);
        insert(k->id(), k->ip(), 0, 0, true);
      }
      continue;
    }

    // if candidate set is too large, pick k at random
    unsigned count = 0;
    while(count <= Kademlia::k && candidates.size()) {
      unsigned index = random() % candidates.size();
      Kademlia *k = (*_nodeid2kademlia)[candidates[index]];
      // assert(k);

      if(flyweight[k->id()])
        continue;

      insert(k->id(), k->ip(), 0, 0, true);
      count++;

      // make a copy of candidates and remove the selected index
      vector<NodeID> tmp;
      for(unsigned i=0; i<candidates.size(); i++) {
        if(i == index)
          continue;
        tmp.push_back(candidates[i]);
      }
      candidates.clear();
      candidates = tmp;
    }
  }
  _joined = true;
}

// }}}
// {{{ Kademlia::join
void
Kademlia::join(Args *args)
{
  if(_joined)
    return;

  IPAddress wkn = args->nget<IPAddress>("wellknown");

  // pick a new ID
  //KDEBUG(1) << "Kademlia::join ip " << ip() << " id " << printID(_id) << " now " << now() << endl;
  assert(!_root);
  _nodeid2kademlia->remove(_id);
  _id = ConsistentHash::ip2chid(ip());
  _me = k_nodeinfo(_id, ip());
  assert(!_nodeid2kademlia->find_pair(_id));
  _nodeid2kademlia->insert(_id, this);
  assert(!_root);
  init_k_bucket_tree();

  // I am the well-known node
  if(wkn == ip()) {
    if(stabilize_timer)
      delaycb(stabilize_timer, &Kademlia::reschedule_stabilizer, (void *) 0);
    _joined = true;
    return;
  }

join_restart:
  // lookup my own key with well known node.
  lookup_args la(_id, ip(), _id);
  la.stattype = Kademlia::STAT_JOIN;
  lookup_result lr;
  bool b = false;
  Time before = 0;
  do {
    record_stat(STAT_JOIN, 1, 0);
    before = now();
    b = doRPC(wkn, &Kademlia::do_lookup, &la, &lr, timeout(wkn));
    record_stat(STAT_JOIN, lr.results.size(), 0);
  } while(!b);

  if(!alive())
    return;

  // put well known node in k-buckets
  // KDEBUG(2) << "join: lr.rid = " << printID(lr.rid) << endl;
  update_k_bucket(lr.rid, wkn, now() - before);

  // put all nodes that wkn told us in k-buckets
  // NB: it's questionable whether we can do a touch here.  we haven't
  // actually talked to the node.
  NODES_ITER(&lr.results) {
    if(!flyweight[i->id] && i->id != _id) {
      insert(i->id, i->ip, 0, i->timeouts);
      touch(i->id);
    }
  }

  // get our ``successor'' and compute length of prefix we have in common
  vector<k_nodeinfo*> successors;
  _root->find_node(_id, &successors);
  closer::n = _id;
  sort(successors.begin(), successors.end(), closer());
  NodeID succ_id = successors[0]->id;

  //KDEBUG(2) << "join: succ ip is " << successors[0]->ip << " id is " << printID(succ_id) << endl;
  unsigned cpl = common_prefix(_id, succ_id);

  // all entries further away than him need to be refreshed.  this is similar to
  // stabilization.  flip a bit and set the rest of the ID to something random.
  for(unsigned i=idsize-cpl+1; i<idsize; i++) {
    NodeID random_key = _id ^ (((Kademlia::NodeID) 1) << i);
    for(int j=i-1; j>=0; j--)
      random_key ^= (((NodeID) random() & 0x1) << j);

    lookup_args la(_id, ip(), random_key);
    la.stattype = Kademlia::STAT_JOIN;
    lookup_result lr;

    // if we now believe our successor died, then start again
    if(!flyweight[succ_id]) {
      clear();
      goto join_restart;
    }

    // do a refresh on that bucket
    do_lookup(&la, &lr);

    if(!alive())
      return;

    // fill our finger table
    NODES_ITER(&lr.results)
      if(!flyweight[i->id] && i->id != _id)
        insert(i->id, i->ip, 0, i->timeouts);
        //  ... but not touch.  we didn't actually talk to the node.
  }

  _joined = true;

  if(stabilize_timer)
    delaycb(stabilize_timer, &Kademlia::reschedule_stabilizer, (void *) 0);
}

// }}}
// {{{ Kademlia::crash
void
Kademlia::crash(Args *args)
{
  // destroy k-buckets
  //KDEBUG(1) << "Kademlia::crash ip " << ip() << endl;
  // assert(alive());
  //_root->collapse();
  //jy
  delete _root;
  _root = NULL;
  for(HashMap<NodeID, k_nodeinfo*>::iterator i = flyweight.begin(); i; i++)
    Kademlia::pool->push(i.value());
  flyweight.clear();
  _joined = false;
}

// }}}
// {{{ Kademlia::clear
void
Kademlia::clear()
{
  // destroy k-buckets
  // KDEBUG(1) << "Kademlia::clear" << endl;
  for(HashMap<NodeID, k_nodeinfo*>::iterator i = flyweight.begin(); i; i++)
    Kademlia::pool->push(i.value());
  flyweight.clear();
  _root->collapse();
}

// }}}
// {{{ Kademlia::lookup
//
// we assume lookups are only for node IDs, but we have to cheat to do this.
void
Kademlia::lookup(Args *args)
{
  if(!_joined || !alive())
    return;

  IPAddress key_ip = args->nget<NodeID>("key");
  if(!Network::Instance()->getnode(key_ip)->alive())
    return;

  // find node with this IP
  Kademlia *k = (Kademlia*) Network::Instance()->getnode(key_ip);
  NodeID key = k->id();
  //KDEBUG(0) << "Kademlia::lookup: ip " << key_ip << " id " << printID(key) << endl;

  lookup_wrapper_args *lwa = New lookup_wrapper_args();
  lwa->ipkey = key_ip;
  lwa->key = key;
  lwa->starttime = now();
  lwa->attempts = 0;

  lookup_wrapper(lwa);
}

// }}}
// {{{ Kademlia::lookup_wrapper
void
Kademlia::lookup_wrapper(lookup_wrapper_args *args)
{
  static unsigned outcounter = 0;

  if(!alive()) {
    delete args;
    return;
  }

  find_value_args fa(_id, ip(), args->key);
  find_value_result fr;

  find_value(&fa, &fr);
  Time after = now();
  args->attempts++;

  // if we found the node, ping it.
  if(args->key == fr.succ.id) {
    ping_args pa(fr.succ.id, fr.succ.ip);
    ping_result pr;
    Time pingbegin = now();
    // assert(_nodeid2kademlia[fr.succ.id]->ip() == fr.succ.ip);
    if(!doRPC(fr.succ.ip, &Kademlia::do_ping, &pa, &pr, timeout(fr.succ.ip)) && alive()) {
      if(collect_stat()) _lookup_dead_node++;
      if(flyweight[fr.succ.id] && !Kademlia::learn_stabilize_only)
        erase(fr.succ.id);
      fr.timeouts++;
    }
    after = now();

    record_lookup_stat(ip(), fr.succ.ip, after - args->starttime, true, true,
		       fr.hops, fr.timeouts, 0);
    if(outcounter++ >= 1000) {
      KDEBUG(0) <<  pingbegin - args->starttime << "ms lookup (" << args->attempts << "a, " << fr.hops << "h, " << fr.rpcs << "r, " << fr.timeouts << "t), " << after - pingbegin << "ms ping, " << after - args->starttime << "ms total." << endl;
      outcounter = 0;
    }
    // cout << pingbegin - args->starttime << endl;

    if(collect_stat()) {
      _good_lookups++;
      _good_attempts += args->attempts;
      _good_total_latency += (after - args->starttime);
      _good_lookup_latency += (pingbegin - args->starttime);
      _good_ping_latency += (after - pingbegin);

      _good_hops += fr.hops;
      _good_timeouts += fr.timeouts;
      _good_rpcs += fr.rpcs;
      _good_hop_latency += fr.latency;
    }
    delete args;
    return;
  }


  //node crashed and some other nodes rejoined into its old master node,
  //deleting the crashed node's entry, this cannot be relied upon to 
  //determine node liveliness
  //bool alive_and_joined = (*_nodeid2kademlia)[args->key]->alive() &&
  //                        (*_nodeid2kademlia)[args->key]->_joined;
  bool alive_and_joined = Network::Instance()->alive(args->ipkey)&&
                          ((Kademlia*)Network::Instance()->getnode(args->ipkey))->_joined;
  IPAddress target_ip = args->ipkey;

  // we're out of time.
  if(now() - args->starttime > Kademlia::max_lookup_time) {
    if(alive_and_joined) {
      if(collect_stat()) _bad_failures++;
      record_lookup_stat(ip(), target_ip, after - args->starttime, false, false, fr.hops, fr.timeouts, 0);
    } else {
      if(alive_and_joined) _ok_failures++;
      record_lookup_stat(ip(), target_ip, after - args->starttime, true, false, fr.hops, fr.timeouts, 0);
    }
    if(collect_stat()) {
      _bad_attempts += args->attempts;
      _bad_lookup_latency += (after - args->starttime);
      _bad_hops += fr.hops;
      _bad_timeouts += fr.timeouts;
      _bad_rpcs += fr.rpcs;
      _bad_hop_latency += fr.latency;
    }

    delete args;
    return;
  }

  // try again in a bit.
  delaycb(100, &Kademlia::lookup_wrapper, args);
}

// }}}
// {{{ Kademlia::do_ping
void
Kademlia::do_ping(ping_args *args, ping_result *result)
{
  // put the caller in the tree, but never ourselves
  // KDEBUG(1) << "Kademlia::do_ping from " << printID(args->id) << endl;
  if(!Kademlia::learn_stabilize_only)
    update_k_bucket(args->id, args->ip);
}

// }}}
// {{{ Kademlia::util for find_value and do_lookup
#define SEND_RPC(x, ARGS, RESULT, HOPS, WHICH_ALPHA) {                                  \
    find_node_args *fa = New find_node_args(_id, ip(), ARGS->key);                      \
    fa->stattype = ARGS->stattype;                                                      \
    find_node_result *fr = New find_node_result;                                        \
    fr->hops = HOPS;                                                                    \
    fr->which_alpha = WHICH_ALPHA;                                                      \
    record_stat(ARGS->stattype, 1, 0);                                                  \
    unsigned rpc = asyncRPC(x.ip, &Kademlia::find_node, fa, fr, timeout(x.ip)); \
    callinfo *ci = New callinfo(x, fa, fr);                                             \
    ci->before = now();                                                                 \
    rpcset->insert(rpc);                                                                \
    outstanding_rpcs->insert(rpc, ci);                                                  \
}

#define CREATE_REAPER(STAT) {                                                           \
      if(!outstanding_rpcs->size() && !is_dead->size() ) {                              \
        delete rpcset;                                                                  \
        delete outstanding_rpcs;                                                        \
        if(Kademlia::death_notification) {                                              \
          for( HashMap<NodeID, vector<IPAddress> *>::iterator i=who_told_me->begin(); i != who_told_me->end(); ++i ) {                                                 \
            vector<IPAddress> *v = i.value();                                           \
            if( v != NULL ) delete v;                                                   \
          }                                                                             \
        }                                                                               \
        delete who_told_me;                                                             \
        delete is_dead;                                                                 \
        return;                                                                         \
      }                                                                                 \
      reap_info *ri = New reap_info();                                                  \
      ri->k = this;                                                                     \
      ri->rpcset = rpcset;                                                              \
      ri->outstanding_rpcs = outstanding_rpcs;                                          \
      ri->who_told_me = who_told_me;                                                    \
      ri->is_dead = is_dead;                                                            \
      ri->stat = STAT;                                                                  \
      ThreadManager::Instance()->create(Kademlia::reap, (void*) ri);                    \
      return;                                                                           \
}
// }}}
// {{{ Kademlia::find_value
void
Kademlia::find_value(find_value_args *fargs, find_value_result *fresult)
{
  //KDEBUG(0) << "Kademlia::find_value: node ip " << fargs->ip << " id " << printID(fargs->id) << " does find_value for " << printID(fargs->key) << endl;
    // assert(alive());
  HashMap<NodeID, bool> asked;
  HashMap<NodeID, unsigned> hops;
  HashMap<unsigned, unsigned> timeouts;
  update_k_bucket(fargs->id, fargs->ip);
  fresult->rid = _id;
  fresult->hops = 0;

  // find alpha successors for this key
  closer::n = fargs->key;
  closerRTT::n = fargs->key;
  set<k_nodeinfo, closerRTT> successors;
  vector<k_nodeinfo*> tmp;
  _root->find_node(fargs->key, &tmp);
  for(unsigned i=0; i<tmp.size(); i++) {
    successors.insert((*tmp[i]));
    hops.insert(tmp[i]->id, 0);
  }

  // we can't do anything but return ourselves
  if(!successors.size()) {
    fresult->succ = _me;
    return;
  }

  // return if we're done already
  fresult->succ = *successors.begin();
  if(fresult->succ.id == fargs->key)
    return;

  // send out the first alpha RPCs
  HashMap<unsigned, callinfo*> *outstanding_rpcs = New HashMap<unsigned, callinfo*>;
  RPCSet *rpcset = New RPCSet;
  {
    unsigned a = 0;
    for(set<k_nodeinfo, closer>::const_iterator i=successors.begin(); i != successors.end() && a < Kademlia::alpha; ++i, ++a) {
      k_nodeinfo ki = *i;
      SEND_RPC(ki, fargs, fresult, hops[ki.id], a);
      fresult->rpcs++;
      asked.insert(ki.id, true);
      successors.erase(ki);
      timeouts.insert(a, 0);
    }
  }

  // data structures for death notifications:
  HashMap<NodeID, vector<IPAddress> * > *who_told_me = 
    New HashMap<NodeID, vector<IPAddress> * >;
  HashMap<NodeID, bool> *is_dead = New HashMap<NodeID, bool>;

  // now send out a new RPC for every single RPC that comes back
  unsigned useless_replies = 0;
  NodeID last_before_merge = ~fargs->key;
  unsigned last_returned_alpha;
  while(true) {
    bool ok;
    unsigned donerpc = rcvRPC(rpcset, ok);
    callinfo *ci = (*outstanding_rpcs)[donerpc];
    outstanding_rpcs->remove(donerpc);
    bool improved = false;
    last_returned_alpha = ci->fr->which_alpha;

    k_nodeinfo ki;
    if(!alive()) {
      stat_type st = ci->fa->stattype;
      delete ci;
      CREATE_REAPER(st); // returns
    }

    // node was dead
    closer::n = fargs->key;
    closerRTT::n = fargs->key;
    if(!ok) {
      if(flyweight[ci->ki.id] && !Kademlia::learn_stabilize_only)
        erase(ci->ki.id);
      if(Kademlia::death_notification) {
	// inform whoever sent me this person that they are dead.
	is_dead->insert( ci->ki.id, true );
      }
      timeouts.insert(last_returned_alpha, timeouts[last_returned_alpha]+1);
      delete ci;
      if(!successors.size() && outstanding_rpcs->size()==0) {
        CREATE_REAPER(ci->fa->stattype); // returns
      }
      goto next_candidate;
    }

    // node was ok
    assert(ci->fr->results.size() <= Kademlia::k_tell);
    record_stat(ci->fa->stattype, ci->fr->results.size(), 0);
    if(!Kademlia::learn_stabilize_only)
      update_k_bucket(ci->ki.id, ci->ki.ip, now() - ci->before);

    // put all the ones better than what we knew so far in successors, provided
    // we haven't asked them already.
    if(successors.size())
      last_before_merge = successors.rbegin()->id;

    for(unsigned i=0; i<ci->fr->results.size(); i++) {
      k_nodeinfo ki = ci->fr->results[i];
      if( Kademlia::death_notification ) {
	if( (*who_told_me)[ki.id] == NULL ) {
	  who_told_me->insert( ki.id, New vector<IPAddress> );
	}
	((*who_told_me)[ki.id])->push_back( ci->ki.ip );
      }
      if(asked.find_pair(ki.id))
        continue;
      successors.insert(ki);
      if(!hops.find_pair(ki.id))
        hops.insert(ki.id, ci->fr->hops+1);
    }

    // cut out elements beyond index k
    while(successors.size() > Kademlia::k)
      successors.erase(*successors.rbegin());

    // found the key
    ki = *successors.begin();
    if(ki.id == fargs->key) {
      fresult->hops = hops[ki.id];
      fresult->succ = ki;
      fresult->timeouts = timeouts[last_returned_alpha];
      stat_type st = ci->fa->stattype;
      delete ci;
      CREATE_REAPER(st); // returns
    }

    // if our standing hasn't improved,
    improved = (distance(successors.begin()->id, fargs->key) < distance(fresult->succ.id, fargs->key));
    useless_replies = improved ? 0 : useless_replies++;

    // if the last alpha RPCs didn't yield anything better, give up.  I don't
    // think this should ever happen.  It means we went down a dead end alpha
    // times.
    //
    // XXX: Thomer completely pulled this out of his ass.
    if(!successors.size() || useless_replies > Kademlia::alpha) {
      stat_type st = ci->fa->stattype;
      delete ci;
      CREATE_REAPER(st); // returns
    }
    delete ci;

next_candidate:
    // assert((unsigned) outstanding_rpcs->size() == Kademlia::alpha - 1);
    // XXX: wrong.  messes up last_alpha_rpc
    while((outstanding_rpcs->size() < (int) Kademlia::alpha) && successors.size()) {
      k_nodeinfo front = *successors.begin();
      if(Kademlia::distance(front.id, fargs->key) < Kademlia::distance(fresult->succ.id, fargs->key))
        fresult->succ = front;

      SEND_RPC(front, fargs, fresult, hops[front.id], timeouts[last_returned_alpha]);

      fresult->rpcs++;
      asked.insert(front.id, true);
      successors.erase(front);
    }
  }

}

// }}}
// {{{ Kademlia::do_lookup
//
// only called for stabilization, so always learn
void
Kademlia::do_lookup(lookup_args *largs, lookup_result *lresult)
{
  //KDEBUG(1) << "Kademlia::do_lookup: node " << printID(largs->id) << " does lookup for " << printID(largs->key) << ", flyweight.size() = " << endl;
  // assert(alive());
  lresult->rid = _id;

  // find successors of this key
  closer::n = largs->key;
  set<k_nodeinfo, closer> successors;
  vector<k_nodeinfo*> tmp;
  _root->find_node(largs->key, &tmp);
  for(unsigned i=0; i<tmp.size(); i++) {
    successors.insert(tmp[i]);
    lresult->results.insert(tmp[i]);
  }

  // unsigned j = 0;
  // NODES_ITER(&lresult->results) {
  //   KDEBUG(0) << "start: lresults[" << j++ << "] = " << printID(i->id) << endl;
  // }

  // we can't do anything but return ourselves
  if(!successors.size()) {
    // KDEBUG(0) << "no successors, exiting" << endl;
    lresult->results.insert(_me);
    return;
  }

  // keep track of worst-of-best
  NodeID worst = lresult->results.rbegin()->id;
  // KDEBUG(0) << "worst = " << printID(worst) << endl;

  // send out the first alpha RPCs
  HashMap<unsigned, callinfo*> *outstanding_rpcs = New HashMap<unsigned, callinfo*>;
  RPCSet *rpcset = New RPCSet;
  {
    unsigned a = 0;
    for(set<k_nodeinfo, closer>::const_iterator i=successors.begin(); i != successors.end() && a < Kademlia::alpha; ++i, ++a) {
      k_nodeinfo ki = *i;
      SEND_RPC(ki, largs, lresult, 0, 0);
      // KDEBUG(0) << "SEND_RPC (initial) to " << printID(ki.id) << endl;
      successors.erase(ki);
    }
  }

  // j = 0;
  // NODES_ITER(&successors) {
  //   KDEBUG(0) << "after initial SEND_RPC successors[" << j++ << "] = " << printID(i->id) << endl;
  // }

  // data structures for death notifications:
  HashMap<NodeID, vector<IPAddress> * > *who_told_me = 
    New HashMap<NodeID, vector<IPAddress> * >;
  HashMap<NodeID, bool> *is_dead = New HashMap<NodeID, bool>;

  // send an RPC back for each incoming reply.
  HashMap<NodeID, bool> replied;
  while(true) {
    bool ok;
    unsigned donerpc = rcvRPC(rpcset, ok);
    callinfo *ci = (*outstanding_rpcs)[donerpc];
    outstanding_rpcs->remove(donerpc);
    replied.insert(ci->ki.id, true);

    if(!alive()) {
      delete ci;
      // KDEBUG(0) << "rcvRPC, not alive, bye" << endl;
      CREATE_REAPER(largs->stattype); // returns
    }

    // node was dead
    closer::n = largs->key;
    if(!ok) {
      // KDEBUG(0) << "!ok" << endl;
      if(flyweight[ci->ki.id] && (!Kademlia::learn_stabilize_only ||
                                   largs->stattype == STAT_STABILIZE ||
                                   largs->stattype == STAT_LOOKUP))
        erase(ci->ki.id);
      if(Kademlia::death_notification) {
	// inform whoever sent me this person that they are dead.
	is_dead->insert( ci->ki.id, true );
      }
      delete ci;

      // no more RPCs to send, but more RPCs to wait for
      if(!successors.size() && outstanding_rpcs->size()) {
        // KDEBUG(0) << "!ok, no more successors, but more outstanding rpcs" << endl;
        continue;
      }

      // no more RPCs to send and no outstanding RPCs.
      if(!successors.size() && !outstanding_rpcs->size()) {
        // KDEBUG(0) << "!ok, no more successors and no outstanding_rpcs" << endl;
        // truncate to right size
        while(lresult->results.size() > Kademlia::k)
          lresult->results.erase(*lresult->results.rbegin());
        // unsigned j = 0;
        // NODES_ITER(&lresult->results) {
        //   KDEBUG(0) << "CREATE_REAPER lresults[" << j++ << "] = " << printID(i->id) << endl;
        // }
        CREATE_REAPER(largs->stattype); // returns
      }

      if(successors.size())
        goto next_candidate;

      // we've handled all the cases, right?
      assert(false);
    }

    // node was ok
    // KDEBUG(0) << "good reply" << endl;
    record_stat(largs->stattype, ci->fr->results.size(), 0);
    if(!Kademlia::learn_stabilize_only || largs->stattype == STAT_STABILIZE ||
                                          largs->stattype == STAT_LOOKUP)
      update_k_bucket(ci->ki.id, ci->ki.ip, now() - ci->before);

    // j = 0;
    // NODES_ITER(&lresult->results) {
    //   KDEBUG(0) << "before merge lresults[" << j++ << "] = " << printID(i->id) << endl;
    // }
    // j = 0;
    // NODES_ITER(&successors) {
    //   KDEBUG(0) << "before merge successors[" << j++ << "] = " << printID(i->id) << endl;
    // }

    // put in successors list if:
    //   - it's better than the worst-of-best we have so far AND
    //   - it's not in our results set already
    closer::n = largs->key;
    for(unsigned i=0; i<ci->fr->results.size(); i++) {
      k_nodeinfo ki = ci->fr->results[i];
      if( Kademlia::death_notification ) {
	if( (*who_told_me)[ki.id] == NULL ) {
	  who_told_me->insert( ki.id, New vector<IPAddress> );
	}
	((*who_told_me)[ki.id])->push_back( ci->ki.ip );
      }
      if(distance(ki.id, largs->key) >= distance(worst, largs->key) ||
         lresult->results.find(ki) != lresult->results.end())
        continue;
      successors.insert(ki);
      lresult->results.insert(ki);
    }
    delete ci;

    // j = 0;
    // NODES_ITER(&lresult->results) {
    //   KDEBUG(0) << "after merge lresults[" << j++ << "] = " << printID(i->id) << endl;
    // }
    // j = 0;
    // NODES_ITER(&successors) {
    //   KDEBUG(0) << "after merge successors[" << j++ << "] = " << printID(i->id) << endl;
    // }

    // we're done if we've had a reply from everyone in the results set.
    if(!successors.size()) {
      bool we_are_done = true;
      NODES_ITER(&lresult->results) {
        if(!replied[i->id]) {
          we_are_done = false;
          break;
        }
      }
      if(we_are_done) {
        // KDEBUG(0) << "returning size = " << lresult->results.size() << endl;
        while(lresult->results.size() > Kademlia::k)
          lresult->results.erase(*lresult->results.rbegin());
        CREATE_REAPER(largs->stattype); // returns
      }
    }

    // last alpha RPCs did not change our view of the world.  send parallel RPCs
    // to all remaining ones.
    if(successors.size() <= (Kademlia::k - Kademlia::alpha)) {
      // KDEBUG(0) << "nothing changed. flushing" << endl;
      NODES_ITER(&successors) {
        k_nodeinfo ki = *i;
        SEND_RPC(ki, largs, lresult, 0, 0);
        successors.erase(ki);
      }
      continue;
    }

    // resize back to Kademlia::k
    while(successors.size() > Kademlia::k)
      successors.erase(*successors.rbegin());
    while(lresult->results.size() > Kademlia::k)
      lresult->results.erase(*lresult->results.rbegin());


    // j = 0;
    // NODES_ITER(&successors) {
    //   KDEBUG(0) << "after truncate successors[" << j++ << "] = " << printID(i->id) << endl;
    // }
    // j = 0;
    // NODES_ITER(&lresult->results) {
    //   KDEBUG(0) << "after truncate lresults[" << j++ << "] = " << printID(i->id) << endl;
    // }

    // need to update our worst-of-best?
    if(successors.size() &&
      (Kademlia::distance(lresult->results.rbegin()->id, largs->key) <
       Kademlia::distance(worst, largs->key)))
    {
      worst = lresult->results.rbegin()->id;
    }

next_candidate:
    k_nodeinfo front = *successors.begin();
    SEND_RPC(front, largs, lresult, 0, 0);
    successors.erase(front);
    // j = 0;
    // NODES_ITER(&successors) {
    //   KDEBUG(0) << "after next_candidate successors[" << j++ << "] = " << printID(i->id) << endl;
    // }
  }
}
// }}}
// {{{ Kademlia::find_node
// Kademlia's FIND_NODE.  Returns the best k from its own k-buckets
void
Kademlia::find_node(find_node_args *largs, find_node_result *lresult)
{
  // assert(alive());
  if(!Kademlia::learn_stabilize_only || largs->stattype == STAT_STABILIZE ||
                                        largs->stattype == STAT_LOOKUP)
    update_k_bucket(largs->id, largs->ip);

  lresult->rid = _id;

  // deal with the empty case
  if(!flyweight.size()) {
    // assert(p);
    KDEBUG(2) << "find_node: key " << printID(largs->key) << " tree is empty. returning myself" << endl;
    lresult->results.push_back(_me);
    return;
  }

  vector<k_nodeinfo*> tmpset;
  _root->find_node(largs->key, &tmpset);
  //KDEBUG(2) << "find_node: key "<< printID(largs->key) << " from ip " << largs->ip << " returning: ip " <<tmpset[0]->ip << " id " << printID(tmpset[0]->id) << endl;
  for(unsigned i=0; i<tmpset.size(); i++)
    lresult->results.push_back(tmpset[i]);
}


// }}}
// {{{ Kademlia::reschedule_stabilizer
void
Kademlia::reschedule_stabilizer(void *x)
{
  // KDEBUG(1) << "Kademlia::reschedule_stabilizer" << endl;
  if(!alive()) {
    // KDEBUG(2) << "Kademlia::reschedule_stabilizer returning because I'm dead." << endl;
    return;
  }
  stabilize();

  if(stabilize_timer)
    delaycb(stabilize_timer, &Kademlia::reschedule_stabilizer, (void *) 0);
}

// }}}
// {{{ Kademlia::stabilize
void
Kademlia::stabilize()
{
  // KDEBUG(0) << "Kademlia::stabilize" << endl;
  // assert(alive());

  if(Kademlia::docheckrep) {
    k_check check;
    _root->traverse(&check, this);
  }

  // stabilize
  k_stabilizer stab;
  _root->traverse(&stab, this);

  // tell the observers.  maybe they're happy now.
  notifyObservers();
}

// }}}
// {{{ Kademlia::stabilized
bool
Kademlia::stabilized(vector<NodeID> *lid)
{
  // remove ourselves from lid
  vector<NodeID> copylid;
  for(vector<NodeID>::const_iterator i = lid->begin(); i != lid->end(); ++i)
    if(*i != id())
      copylid.push_back(*i);

  k_stabilized stab(&copylid);
  _root->traverse(&stab, this);
  return stab.stabilized();
}

// }}}
// {{{ Kademlia::getbit
//
// Returns the i-th bit in n.  0 is the most significant bit.
//
inline
unsigned
Kademlia::getbit(NodeID n, unsigned i)
{
  return (n & (((NodeID) 1)<<((sizeof(NodeID)*8)-i-1))) ? 1 : 0;
}

// }}}
// {{{ Kademlia::touch
// pre: id is in the flyweight and id is in the tree
// post: lastts for id is updated, update propagated to k-bucket.
void
Kademlia::touch(NodeID id)
{
  // KDEBUG(1) << "Kademlia::touch " << Kademlia::printID(id) << endl;

  // assert(alive());
  // assert(id);
  // assert(flyweight.find(id, 0));
  _root->insert(id, true);
}

// }}}
// {{{ Kademlia::insert
// pre: id and ip are valid, id is not yet in this flyweight
// post: id->ip mapping in flyweight, and k-bucket
//
void
Kademlia::insert(NodeID id, IPAddress ip, Time RTT, char timeouts, bool init_state)
{
  // KDEBUG(1) << "Kademlia::insert " << Kademlia::printID(id) << ", ip = " << ip << endl;
  static unsigned counter = 0;

  // assert(alive());
  // assert(id && ip);
  // assert(!flyweight.find(id, 0));

  k_nodeinfo *ni = Kademlia::pool->pop(id, ip, RTT, timeouts);
  assert(ni);
  if(init_state)
    ni->lastts = counter++;
  flyweight.insert(id, ni);
  _root->insert(id, false, init_state);
}
// }}}
// {{{ Kademlia::erase
void
Kademlia::erase(NodeID id)
{
  // KDEBUG(1) << "Kademlia::erase " << Kademlia::printID(id) << endl;

  // assert(flyweight.find(id, 0));
  // KDEBUG(2) << "Kademlia::erase deleting id = " << printID(id) << ", ip = " << flyweight[id]->ip << endl;
  k_nodeinfo *ki = flyweight[id];

  // 5, taken from Kademlia paper
  if(++ki->timeouts >= Kademlia::erase_count) {
    _root->erase(id);
    flyweight.remove(id);
    Kademlia::pool->push(ki);
    return;
  }
}
// }}}
// {{{ Kademlia::erase
void
Kademlia::do_erase(erase_args *a, erase_result *r)
{
  for( unsigned i = 0; i < a->ids->size(); i++ ) {
    NodeID n = (*(a->ids))[i];
    if( flyweight.find(n, 0) ) {
      erase(n);
    }
  }
}
// }}}
// {{{ Kademlia::update_k_bucket
inline void
Kademlia::update_k_bucket(NodeID id, IPAddress ip, Time RTT)
{
  // KDEBUG(1) << "Kademlia::update_k_bucket" << endl;

  // update k-bucket
  if(id == _id)
    return;

  if(!flyweight[id]) {
    // KDEBUG(2) << "Kademlia::update_k_bucket says " << printID(id) << " doesn't exist yet" << endl;
    insert(id, ip, RTT);
  } else if(RTT) {
    flyweight[id]->RTT = RTT;
  }
  touch(id);
}
// }}}
// {{{ Kademlia::common_prefix
unsigned
Kademlia::common_prefix(Kademlia::NodeID k1, Kademlia::NodeID k2)
{
  unsigned size = 0;
  for(unsigned i=0; i<idsize; i++)
    if(getbit(k1, i) == getbit(k2, i))
      size++;
    else
      break;
  return size;
}

// }}}
// {{{ Kademlia::record_stat
void
Kademlia::record_stat(stat_type type, uint num_ids, uint num_else )
{
  record_bw_stat(type, num_ids, num_else);
  if(collect_stat())
    _rpc_bytes += 20 + num_ids * 4 + num_else; // paper says 40 bytes per node entry
}
// }}}
// {{{ Kademlia::distance
Kademlia::NodeID
Kademlia::distance(Kademlia::NodeID from, Kademlia::NodeID to)
{
  return from ^ to;
}

// }}}
// {{{ Kademlia::printbits
string
Kademlia::printbits(NodeID id)
{
  char buf[128];

  unsigned j=0;
  for(int i=idsize-1; i>=0; i--)
    sprintf(&(buf[j++]), "%llu", (id >> i) & 0x1);
  sprintf(&(buf[j]), ":%llx", id);

  return string(buf);
}

// }}}
// {{{ Kademlia::printID
string
Kademlia::printID(NodeID id)
{
  char buf[128];
  sprintf(buf, "%llx", id);
  return string(buf);
}

// }}}
// {{{ Kademlia::reap
// reaps outstanding RPCs, but snatches out the info that's contained in it.
// I.e., we update k-buckets.
void
Kademlia::reap(void *r)
{
  reap_info *ri = (reap_info *) r;
  // NodeID _id = ri->k->id();

  // cout << "Kademlia::reap" << endl;
  assert((int) ri->rpcset->size() == ri->outstanding_rpcs->size());

  while(ri->outstanding_rpcs->size()) {
    bool ok;
    unsigned donerpc = ri->k->rcvRPC(ri->rpcset, ok);
    callinfo *ci = ri->outstanding_rpcs->find(donerpc);

    // cout << "Kademlia::reap ok = " << ok << ", ki = " << endl;
    if(ok) {
      _ok_by_reaper++;
      if(ri->k->alive() && (!Kademlia::learn_stabilize_only || ci->fa->stattype == STAT_STABILIZE ||
                                                               ci->fa->stattype == STAT_LOOKUP))
        ri->k->update_k_bucket(ci->ki.id, ci->ki.ip);
      ri->k->record_stat(ri->stat, ci->fr->results.size(), 0);

    } else if(ri->k->flyweight[ci->ki.id]) {
      _timeouts_by_reaper++;
      if(ri->k->alive() && (!Kademlia::learn_stabilize_only || ci->fa->stattype == STAT_STABILIZE ||
                                                               ci->fa->stattype == STAT_LOOKUP))
        ri->k->erase(ci->ki.id);
    }

    // Although it slightly affects success rate, it's only fair
    // to only count nodes as dead if you're alive when you receive
    // their response.
    if( !ok && Kademlia::death_notification && ri->k->alive() ) {
      ri->is_dead->insert( ci->ki.id, true );
    }

    ri->outstanding_rpcs->remove(donerpc);
    delete ci;
  }

  // make vectors of dead-nodes sorted by the neighbor that told you.
  if( Kademlia::death_notification ) {

    HashMap<IPAddress, vector<NodeID> *> bad_info;
    HashMap<unsigned, erase_args*> deathmap;
    RPCSet deathrpcset;

    for( HashMap<NodeID, bool>::iterator i=ri->is_dead->begin(); 
	 i != ri->is_dead->end(); ++i ) {
      NodeID dead_node = i.key();
      vector<IPAddress> *v = (*(ri->who_told_me))[dead_node];
      if( v != NULL ) {
	for( unsigned j = 0; j < v->size(); j++ ) {
	  if( bad_info[(*v)[j]] == NULL ) {
	    bad_info.insert( (*v)[j], New vector<NodeID> );
	  }
	  (bad_info[(*v)[j]])->push_back( dead_node );	
	}
      }
    }
    
    // cleanup who_told_me state
    for( HashMap<NodeID, vector<IPAddress> *>::iterator i=
	   ri->who_told_me->begin(); 
	 i != ri->who_told_me->end(); ++i ) {
      
      vector<IPAddress> *v = i.value();
      if( v != NULL ) delete v;
      
    }
    
    // notify neighbors about all the deaths
    for( HashMap<IPAddress,vector<NodeID> *>::iterator i=bad_info.begin(); 
	 i != bad_info.end(); ++i ) {
      
      IPAddress informant = i.key();
      vector<NodeID> *v = i.value();

      if( informant == ri->k->ip() ) {
	delete v;
	continue;
      }
      
      erase_args *ea = New erase_args(v);
      ri->k->record_stat(STAT_ERASE, v->size(), 0);
      
      unsigned rpc = ri->k->asyncRPC(informant, &Kademlia::do_erase, ea, 
				     (erase_result *) NULL, 
				     ri->k->timeout(informant));
      deathrpcset.insert(rpc);
      deathmap.insert(rpc, ea);

    }

    // clean up any death notification state as well
    while(deathmap.size()) {
      bool ok;
      unsigned donerpc = ri->k->rcvRPC(&deathrpcset, ok);
      erase_args *ea = deathmap.find(donerpc);
      // NOTE: there's a conscious decision here to not record_stat 
      // the response.  The reason being that I don't care what the 
      // response is, and in real life I wouldn't wait for it.  I only
      // have to do so here to free the memory.
      delete ea->ids;
      delete ea;
      deathmap.remove(donerpc);
    }
    
  }

  // ri->k->_riset.remove(ri);
  delete ri;

  // cout << "reaper done" << endl;
  taskexit(0);
}
// }}}

// {{{ k_nodeinfo_cmp
inline int
k_nodeinfo_cmp(const void *k1, const void *k2)
{
  k_nodeinfo *kx1, *kx2;
  kx1 = *((k_nodeinfo**) k1);
  kx2 = *((k_nodeinfo**) k2);
  return kx1->lastts - kx2->lastts;
}
// }}}
// {{{ k_nodes::k_nodes
k_nodes::k_nodes(k_bucket *parent) : _parent(parent)
{
  _map.clear();
  _redo = REBUILD;
  _nodes = 0;
}
// }}}
// {{{ k_nodes::~k_nodes
k_nodes::~k_nodes()
{
  if(_nodes)
    delete _nodes;
  _map.clear();
}
// }}}
// {{{ k_nodes::insert
/*
 * pre: n is in the flyweight.
 * post: if n was contained in the set, its lastts is updated and
 *              its position in the set is updated.
 *       if it wasn't in the set, it is inserted.
 */
void
k_nodes::insert(Kademlia::NodeID n, bool touch = false)
{
//  Kademlia::NodeID _id = _parent->kademlia()->id();
  // KDEBUG(1) << "k_nodes::insert " << Kademlia::printID(n) << endl;
  checkrep();

  // assert(n);
  // assert(_parent->kademlia()->flyweight.find(n, 0));
  k_nodeinfo *ninfo = _parent->kademlia()->flyweight[n];
  // assert(ninfo);
  ninfo->checkrep();

  // if already in set, and we're not going to touch the timestamp, we're done.
  bool contained = contains(n);
  if(contained && !touch) {
    checkrep();
    return;
  }

  if(contained && touch && (ninfo->lastts != now())) {
    ninfo->lastts = now();
    ninfo->timeouts = 0;
    _redo = _redo ? _redo : RESORT;
    checkrep();
    return;
  }

  // insert into set
  _map.insert(ninfo, true);
  // assert(_parent->kademlia()->flyweight.find_pair(ninfo->id)->value == ninfo);
  _redo = REBUILD;

  checkrep();
}
// }}}
// {{{ k_nodes::clear
void
k_nodes::clear()
{
  checkrep();

  _map.clear();
  if(_nodes)
    delete _nodes;
  _nodes = 0;
  _redo = REBUILD;

  checkrep();
}
// }}}
// {{{ k_nodes::erase
/*
 * pre: n is in the flyweight. n is contained in this k_nodes.
 * post: n is erased from this k_nodes
 */
void
k_nodes::erase(Kademlia::NodeID n)
{
  checkrep();

  // assert(n);
  // assert(_parent->kademlia()->flyweight.find(n, 0));
  k_nodeinfo* ninfo = _parent->kademlia()->flyweight[n];
  ninfo->checkrep();
  // assert(_map.find(ninfo, false) || _parent->replacement_cache->contains(ninfo->id));

  _map.remove(ninfo);
  _redo = REBUILD;

  checkrep();
}
// }}}
// {{{ k_nodes::contains
bool
k_nodes::contains(Kademlia::NodeID n)
{
  checkrep();
  k_nodeinfo *ki = _parent->kademlia()->flyweight[n];
  // assert(ki);
  return _map.find(ki, false);
}
// }}}
// {{{ k_nodes::rebuild
void
k_nodes::rebuild()
{
  static unsigned _rebuild = 0;
  // Kademlia::NodeID _id = _parent->kademlia()->id();
  // KDEBUG(2) << "k_nodes::rebuild" << endl;

  if(_nodes)
    delete _nodes;
  _nodes = New k_nodeinfo*[_map.size()];

  // fill with entries from _map.
  unsigned j = 0;
  for(HashMap<k_nodeinfo*, bool>::iterator i = _map.begin(); i; i++)
    _nodes[j++] = i.key();
  _redo = RESORT;

  // once in a while throw entries beyond index k out of the set into the
  // replacement cache.  (don't do this if this *is* the replacement cache;
  // k_nodes also implements replacement cache).
  if(_rebuild++ < 100)
    return;

  bool i_am_replacement_cache = (this == _parent->replacement_cache);
  unsigned oldsize = _map.size();
  for(unsigned i=Kademlia::k; i<oldsize; i++) {
    k_nodeinfo *ki = _nodes[i];
    _map.remove(ki);
    if(i_am_replacement_cache || Kademlia::use_replacement_cache == Kademlia::DISABLED) {
      _parent->kademlia()->flyweight.remove(ki->id);
      Kademlia::pool->push(ki);
    } else
      _parent->replacement_cache->insert(ki->id);
  }
  _rebuild = NOTHING;
  rebuild(); // after cleanup, rebuild _nodes again.
}
// }}}
// {{{ k_nodes::get
inline k_nodeinfo*
k_nodes::get(unsigned i)
{
  // assert(i >= 0 && (int) i < _map.size());
  // Kademlia::NodeID _id = _parent->kademlia()->id();

  if(_redo == NOTHING)
    return _nodes[i];

  if(_redo == REBUILD)
    rebuild();

  if(_redo == RESORT) {
    if(_map.size() >= 2)
      qsort(_nodes, _map.size(), sizeof(k_nodeinfo*), &k_nodeinfo_cmp);
    _redo = NOTHING;
  }

  return _nodes[i];
}
// }}}
// {{{ k_nodes::checkrep
inline void
k_nodes::checkrep()
{
  if(!Kademlia::docheckrep)
    return;

  // Kademlia::NodeID _id = _parent->kademlia()->id();

  assert(_parent);
  assert(_parent->nodes == this || _parent->replacement_cache == this);
  assert(_parent->kademlia());
  assert(_parent->kademlia()->id());

  // own ID should never be in flyweight
  assert(!_parent->kademlia()->flyweight.find(_parent->kademlia()->id(), 0));

  if(!size())
    return;

  // all nodes are in flyweight
  // all nodes are unique
  // if we're replacement_cache, entry doesn't exist in nodes and vice versa
  k_nodes *other = _parent->nodes == this ? _parent->replacement_cache : this;
  HashMap<Kademlia::NodeID, bool> haveseen;
  for(unsigned i=0; i<size(); i++) {
    assert(_parent->kademlia()->flyweight.find(get(i)->id, 0));
    assert(_parent->kademlia()->flyweight.find_pair(get(i)->id)->value == get(i));
    assert(!haveseen.find(get(i)->id, false));
    assert(!other->contains(get(i)->id));
    haveseen.insert(get(i)->id, true);
  }

  if(size() == 1) {
    assert(get(0) == last());
    assert(get(0)->id == last()->id);
    return;
  }

  Time prev = get(0)->lastts;
  for(unsigned i=1; i<size(); i++) {
    assert(prev <= get(i)->lastts);
    prev = get(i)->lastts;
  }
}
// }}}

// {{{ k_nodeinfo::k_nodeinfo
k_nodeinfo::k_nodeinfo(NodeID id, IPAddress ip, Time RTT) : id(id), ip(ip), RTT(RTT)
{
  lastts = 0;
  checkrep();
}
// }}}
// {{{ k_nodeinfo::k_nodeinfo
k_nodeinfo::k_nodeinfo(k_nodeinfo *k) :
  id(k->id), ip(k->ip), lastts(k->lastts), timeouts(timeouts), RTT(RTT)
{
  checkrep();
}
// }}}
// {{{ k_nodeinfo::checkrep
void
k_nodeinfo::checkrep() const
{
  if(!Kademlia::docheckrep)
    return;

  assert(id);
  assert(ip);
}
// }}}

// {{{ k_bucket::k_bucket
k_bucket::k_bucket(k_bucket *parent, Kademlia *k) : parent(parent), _kademlia(k)
{
  child[0] = child[1] = 0;
  nodes = 0;
  replacement_cache = 0;
  leaf = false;
}
// }}}
// {{{ k_bucket::~k_bucket
k_bucket::~k_bucket()
{
  if(leaf) {
    delete nodes;
    delete replacement_cache;
    return;
  }

  delete child[0];
  delete child[1];
}
// }}}
// {{{ k_bucket::traverse
void
k_bucket::traverse(k_traverser *traverser, Kademlia *k, string prefix, unsigned depth, unsigned leftright)
{
//  Kademlia::NodeID _id = kademlia()->id();
  // KDEBUG(1) << "k_bucket::traverser for " << traverser->type() << ", prefix = " << prefix << endl;
  checkrep();

  if(!leaf) {
    if(!k->alive() || !child[0])
      return;
    child[0]->traverse(traverser, k, prefix + "0", depth+1, 0);
    if(!k->alive() || !child[1] || leaf)
      return;
    child[1]->traverse(traverser, k, prefix + "1", depth+1, 1);
    if(!k->alive())
      return;
    checkrep();
    return;
  }


  if(!k->alive())
    return;

  // we're a leaf
  traverser->execute(this, prefix, depth, leftright);

  checkrep();
}
// }}}
// {{{ k_bucket::find_node
void
k_bucket::find_node(Kademlia::NodeID key, vector<k_nodeinfo*> *v,
    unsigned nhits, unsigned depth)
{
  checkrep();

//  // Kademlia::NodeID _id = kademlia()->id();
  Kademlia::closer::n = key;

  // recurse deeper in the right direction if we can
  if(!leaf) {
    unsigned leftmostbit = Kademlia::getbit(key, depth);
    if(v->size() < nhits)
      child[leftmostbit]->find_node(key, v, nhits, depth+1);
    if(v->size() < nhits)
      child[leftmostbit^1]->find_node(key, v, nhits, depth+1);
    checkrep();
    return;
  }

  // collect stuff from the k-bucket itself
  for(unsigned i=0; i<nodes->size(); i++) {
    v->push_back(nodes->get(i));
    if(v->size() >= nhits) {
      checkrep();
      return;
    }
  }

  if(Kademlia::use_replacement_cache != Kademlia::FULL) {
    checkrep();
    return;
  }

  // NB: optimization.
  // collect stuff from replacement cache.
  for(int i = replacement_cache->size()-1; i >= 0; i--) {
    v->push_back(replacement_cache->get(i));
    if(v->size() >= nhits) {
      checkrep();
      return;
    }
  }

  checkrep();
}
// }}}
// {{{ k_bucket::insert
// pre: id is in flyweight, its timestamp (lastts) has been set.
// post: if id was already in k-bucket, its position is updated.
//       if id was not in k-bucket :
//              if k-bucket has space, id is added
//              if k-bucket is full, id is put in replacement cache
void
k_bucket::insert(Kademlia::NodeID id, bool touch, bool init_state, string prefix, unsigned depth)
{
  checkrep();
  // assert(kademlia()->flyweight.find(id, 0));

  // Kademlia::NodeID _id = kademlia()->id();
  // KDEBUG(1) << "k_bucket::insert " << Kademlia::printbits(id) << ", prefix = " << prefix << endl;

  // recurse deeper if we can
  if(!leaf) {
    unsigned leftmostbit = Kademlia::getbit(id, depth);
    // KDEBUG(1) << "k_bucket::insert heading towards " << leftmostbit << endl;
    child[leftmostbit]->insert(id, touch, init_state, prefix + (leftmostbit ? "1" : "0"), depth+1);
    checkrep();
    return;
  }

  // if not in k-bucket yet, or there's more space: insert it.
  if(nodes->contains(id) || !nodes->full()) {
    nodes->insert(id, touch);
    nodes->checkrep();
    return;
  }

  if(Kademlia::use_replacement_cache == Kademlia::DISABLED) {
    checkrep();
    return;
  }

  // we're full.  put in replacement_cache.
  replacement_cache->insert(id, touch);
  replacement_cache->checkrep();

  checkrep();
}
// }}}
// {{{ k_bucket::erase
void
k_bucket::erase(Kademlia::NodeID id, string prefix, unsigned depth)
{
  checkrep();

  // recurse deeper
  if(!leaf) {
    unsigned leftmostbit = Kademlia::getbit(id, depth);
    child[leftmostbit]->erase(id, prefix + (leftmostbit ? "1" : "0"), depth+1);
    checkrep();
    return;
  }

  // if not in k-bucket itself, it must be in replacement_cache.  remove from
  // there.
  if(!nodes->contains(id)) {
    k_nodeinfo *ki = kademlia()->flyweight[id];
    if(replacement_cache->contains(ki->id))
      replacement_cache->erase(ki->id);
    checkrep();
    return;
  }

  // remove from k-bucket
  nodes->erase(id);

  // get latest from replacement_cache to nodes
  if(Kademlia::use_replacement_cache == Kademlia::DISABLED || !replacement_cache->size()) {
    checkrep();
    return;
  }

  k_nodeinfo *ki = replacement_cache->last(); // youngest
  // assert(ki);
  // assert(ki->id);
  // assert(kademlia()->flyweight.find(ki->id, 0));
  nodes->insert(ki->id, false);
  replacement_cache->erase(ki->id);

  checkrep();
}
// }}}
// {{{ k_bucket::collapse
// transform back from node to leaf
void
k_bucket::collapse()
{
  // Kademlia::NodeID _id = kademlia()->id();
  // KDEBUG(2) << "k_bucket::collapse" << endl;
  checkrep();

  if(!leaf) {
    child[0]->collapse();
    child[1]->collapse();
  } else {
    nodes->clear();
    replacement_cache->clear();
  }

  checkrep();
}
// }}}
// {{{ k_bucket::checkrep
void
k_bucket::checkrep()
{
  if(!Kademlia::docheckrep)
    return;

  assert(_kademlia);

  if(!leaf) {
    assert(child[0] && child[1]);
    return;
  }


  // checkreps for leaf
  assert(nodes);
  assert(replacement_cache);
  nodes->checkrep();
  replacement_cache->checkrep();
}
// }}}

// {{{ k_stabilizer::execute
void
k_stabilizer::execute(k_bucket *k, string prefix, unsigned depth, unsigned leftright)
{
  // for // KDEBUG purposes
  Kademlia *mykademlia = k->kademlia();
  Kademlia::NodeID _id = mykademlia->id();

  if(!k->leaf || !mykademlia->alive())
    return;

  // return if any entry in this k-bucket is fresh
  if(!Kademlia::force_stabilization) {
    for(unsigned i = 0; i < k->nodes->size(); i++) {
      // XXX: this is the sign of bad shit going on.  get out.
      // it doesn't really, really matter that we're not completing this round of
      // stabilization.  better than crashing the simulator, anyway.
      k_nodeinfo *ki = k->nodes->get(i);
      if(!mykademlia->flyweight[ki->id])
        return;

      if(now() - ki->lastts < Kademlia::refresh_rate)
        return;
    }
  }

  // Oh.  stuff in this k-bucket is old, or we don't know anything. Lookup a
  // random key in this range.
  Kademlia::NodeID mask = 0L;
  for(unsigned i=0; i<Kademlia::idsize; i++) {
    unsigned bit = 1;
    if(i > depth)
      bit = (unsigned) random() & 0x1;
    mask |= (((Kademlia::NodeID) bit) << (Kademlia::idsize-i));
  }

  Kademlia::NodeID random_key = _id & mask;
  // KDEBUG(2) << "k_stabilizer: prefix = " << prefix << ", mask = " << Kademlia::printbits(mask) << ", random_key = " << Kademlia::printbits(random_key) << endl;

  // lookup the random key and update this k-bucket with what we learn
  Kademlia::lookup_args la(mykademlia->id(), mykademlia->ip(), random_key);
  la.stattype = Kademlia::STAT_STABILIZE;
  Kademlia::lookup_result lr;

  mykademlia->do_lookup(&la, &lr);
  if(!mykademlia->alive())
    return;

  // update our k-buckets
  NODES_ITER(&lr.results) {
    mykademlia->update_k_bucket(i->id, i->ip);
  }
}
// }}}
// {{{ k_stabilized::execute
void
k_stabilized::execute(k_bucket *k, string prefix, unsigned depth, unsigned leftright)
{
  // for // KDEBUG purposes
  Kademlia::NodeID _id = k->kademlia()->id();

  // if we know about nodes in this part of the ID space, great.
  if(!k->nodes->empty())
    return;

  // KDEBUG(2) << "stabilized: " << prefix << " not present, depth = " << depth << ", prefix = " << prefix << ", leftright = " << leftright << endl;

  //
  // Node claims there is no node to satisfy this entry in the finger table.
  // Check whether that is true.
  //
  Kademlia::NodeID upper = 0, lower = 0;
  for(unsigned i=0; i<Kademlia::idsize; i++) {
    if(i < (depth-1)) {
      unsigned bit = Kademlia::getbit(_id, i);
      lower |= (((Kademlia::NodeID) bit) << Kademlia::idsize-i-1);
      upper |= (((Kademlia::NodeID) bit) << Kademlia::idsize-i-1);
    } else if(i == (depth-1) && leftright) {
      lower |= (((Kademlia::NodeID) leftright) << Kademlia::idsize-i-1);
      upper |= (((Kademlia::NodeID) leftright) << Kademlia::idsize-i-1);
    } else if(i > (depth-1))
      upper |= (((Kademlia::NodeID) 1) << Kademlia::idsize-i-1);
  }

  // KDEBUG(2) << "stabilized: lower = " << Kademlia::printID(lower) << endl;
  // KDEBUG(2) << "stabilized: upper = " << Kademlia::printID(upper) << endl;

  // yields the node with smallest id greater than lower
  vector<Kademlia::NodeID>::const_iterator it = upper_bound(_v->begin(), _v->end(), lower);

  // check that this is smaller than upper.  if so, then this node would
  // qualify for this entry in the finger table, so the node that says there
  // is no such is WRONG.
  if(it != _v->end() && *it <= upper) {
    // KDEBUG(2) << "stabilized: prefix " << prefix << " on depth " << depth << " is invalid, but " << Kademlia::printID(*it) << " matches " << endl;
    _stabilized = false;
  }
  // // assert(false);
}
// }}}
// {{{ k_finder::execute
void
k_finder::execute(k_bucket *k, string prefix, unsigned depth, unsigned leftright)
{
  if(!k->leaf)
    return;

  k->checkrep();
  for(unsigned i = 0; i < k->nodes->size(); i++) {
    if(k->nodes->get(i)->id == _n)
      _found++;
  }

  for(int i = k->replacement_cache->size()-1; i >= 0; i--)
    if(k->replacement_cache->get(i)->id == _n)
      _found++;
}
// }}}
// {{{ k_dumper::execute
void
k_dumper::execute(k_bucket *k, string prefix, unsigned depth, unsigned leftright)
{
  string spaces = "";
  for(unsigned i=0; i<depth; i++)
    spaces += "  ";
  cout << spaces << "prefix: " << prefix << ", depth " << depth << endl;
  for(unsigned i = 0; i < k->nodes->size(); i++) {
    k_nodeinfo *ki = k->nodes->get(i);

    cout << spaces << "  " << Kademlia::printbits(ki->id) << ", lastts = " << ki->lastts << endl;
  }

  cout << spaces << "prefix: " << prefix << ", depth " << depth << ", replacement cache: " << endl;
  for(int i = k->replacement_cache->size()-1; i >= 0; i--) {
    cout << spaces << "  " << Kademlia::printID(k->replacement_cache->get(i)->id) << ", lastts = " << k->replacement_cache->get(i)->lastts << endl;
  }
}
// }}}
// {{{ k_check::execute
void
k_check::execute(k_bucket *k, string prefix, unsigned depth, unsigned leftright)
{
  if(!k->leaf)
    return;

  k->checkrep();

  const set<Node*> *l = Network::Instance()->getallnodes();

  // go through all pointers in node
  for(unsigned i = 0; i < k->nodes->size(); i++) {
    // k_nodeinfo *ki = k->nodes->get(i);
    for(set<Node*>::iterator pos = l->begin(); pos != l->end(); ++pos) {
      Kademlia *kad = (Kademlia*) *pos;
      if(kad->id() == k->kademlia()->id())
        continue;
      /*
      for(HashMap<Kademlia::NodeID, k_nodeinfo*>::const_iterator j = kad->flyweight.begin(); j; j++)
          // assert(j.value() != ki);
      */
    }
  }

  for(int i = k->replacement_cache->size()-1; i >= 0; i--) {
    for(set<Node*>::iterator pos = l->begin(); pos != l->end(); ++pos) {
      Kademlia *kad = (Kademlia*) *pos;
      if(kad->id() == k->kademlia()->id())
        continue;
      /*
      for(HashMap<Kademlia::NodeID, k_nodeinfo*>::const_iterator j = kad->flyweight.begin(); j; j++)
          // assert(j.value() != *i);
      */
    }
  }
}
// }}}

#include "p2psim/bighashmap.cc"