Logo Search packages:      
Sourcecode: scamper version File versions  Download package

scamper_addresslist.c

/*
 * scamper_addresslist
 *
 * $Id: scamper_addresslist.c,v 1.115.2.1 2007/12/03 09:09:59 mjl Exp $
 *
 * this code deals with storing trace objects with their associated lists.
 *
 * Copyright (C) 2004-2007 The University of Waikato
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 2.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * 
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>

#if defined(__APPLE__)
#include <stdint.h>
#endif

#include <assert.h>

#if defined(DMALLOC)
#include <dmalloc.h>
#endif

#include "scamper.h"
#include "scamper_addr.h"
#include "scamper_list.h"
#include "scamper_tlv.h"
#include "scamper_trace.h"
#include "scamper_debug.h"
#include "scamper_file.h"
#include "scamper_outfiles.h"
#include "scamper_target.h"
#include "scamper_task.h"
#include "scamper_addresslist.h"
#include "scamper_fds.h"
#include "scamper_linepoll.h"
#include "scamper_privsep.h"
#include "scamper_addr.h"
#include "scamper_do_trace.h"
#include "scamper_ping.h"
#include "scamper_do_ping.h"
#include "scamper_cyclemon.h"
#include "utils.h"
#include "mjl_list.h"
#include "mjl_splaytree.h"

typedef void *(*alloctask_t)(void *data, scamper_list_t *, scamper_cycle_t *);
typedef void *(*freedata_t)(void *data);

/*
 * command
 *
 * this structure is used to hold information for each command found in
 * a source's command list.
 */
typedef struct command
{
  /* the type of command */
  uint8_t             type;

#define command_probe_addr      command_un.probe.addr
#define command_probe_cyclemon  command_un.probe.cyclemon
#define command_probe_data      command_un.probe.data
#define command_probe_alloctask command_un.probe.alloctask
#define command_probe_freedata  command_un.probe.freedata

  union
  {
    /* hold the parameters for a probe command */
    struct command_probe_s
    {
      /* target address */
      scamper_addr_t     *addr;

      /* cycle */
      scamper_cyclemon_t *cyclemon;

      /* data parameter - a preinitialised structure with all parameters set */
      void               *data;

      /* function to allocate a scamper task using the data parameter */
      alloctask_t         alloctask;

      /* function to free the data pointed to above */
      freedata_t          freedata;
    } probe;

#define command_cycle_cycle    command_un.cycle.cycle

    /* hold the parameters for a cycle command */
    struct command_cycle_s
    {
      /* the new cycle */
      scamper_cycle_t    *cycle;
    } cycle;
  } command_un;

} command_t;

#define COMMAND_PROBE 0x00
#define COMMAND_CYCLE 0x01

/*
 * on_hold
 *
 * structure to keep details on the command on hold.
 */
typedef struct on_hold
{
  /* pointer to the task that has the lock on the address */
  scamper_task_t   *task;

  /* pointer to the source list that wants the address to be probed */
  scamper_source_t *source;

  /* pointer to the command that is waiting on the task to complete */
  command_t        *command;

  /* pointer to the node held in the task's on-hold list */
  void             *task_cookie;

  /* pointer to the node held in the source's on-hold list */
  dlist_node_t     *node;
} on_hold_t;

/*
 * alf
 *
 * structure to keep state when reading an file of commands.
 */
typedef struct alf
{
  /* back pointer to the source struct that receives each command */
  scamper_source_t   *source;

  /* the file descriptor that scamper is monitoring */
  scamper_fd_t       *fd;

  /* structure to break up a chunk of an ascii file into lines */
  scamper_linepoll_t *lp;
} alf_t;

/*
 * source_file
 *
 * structure to keep state on a source_file.  this structure is used when
 * a source is defined to cycle over a file.  this sturcture is not used when
 * a source is adhoc.
 */
typedef struct source_file
{
  /* the name of the file */
  char            *filename;

  /* a count of the number of cycles left to take through the list */
  int              cycles;

  /* if set, the list should be reloaded at the next cycle */
  int              reload;

  /* if set, the list should be refreshed each cycle if it has changed */
  int              autoreload;

  /* the modification time of the file when it was last read */
  time_t           mtime;
} source_file_t;

/*
 * scamper_source
 *
 * struct that keeps a record of the various address sources that scamper
 * is dealing with at present.
 *
 */
struct scamper_source
{
  /* reference count to decide when the source is no longer required */
  int                 refcnt;

  /* mix priority of this source */
  int                 priority;

  /* list of commands queued */
  slist_t            *commands;

  /* list of commands that are on hold until some external event completes */
  dlist_t            *on_hold;

  /*
   * the next two variables define the current state of the source based on
   * the queue it is in.  if set, the source is either in the active or
   * blocked state.  the first parameter defines which of the two lists it
   * is in, while the second parameter defines the list node the source has
   * been assigned.
   */
  void               *l;
  void               *ln;

  /* default command to use where none is otherwise specified */
  char               *command;
  size_t              command_len;

  /* the details of the list */
  scamper_list_t     *list;

  /* pointer to a structure that monitors when to write a cycle_stop record */
  scamper_cycle_t    *cycle;
  scamper_cyclemon_t *cyclemon;

  /* where the data should be written, if the source has a preference */
  scamper_outfile_t  *sof;

  /* how many cycle points are defined */
  int                 cycle_points;

  /*
   * if set, this source is adhoc; that is, it does not have a defined
   * command source.  an adhoc source may have commands that originate in
   * files, or are specified on the control socket.  if a source is not
   * adhoc, it may only obtain its commands from a single file, specified
   * below.
   */
  int                 adhoc;

  /*
   * the name of the input file that the source gets its commands from.
   * if set, the source does not take commands from other files, or on an
   * adhoc basis.
   */
  source_file_t      *file;

  /* maintain state when a file is being parsed for commands */
  alf_t              *alf;
};

/*
 * source_observe
 *
 */
typedef struct source_observe
{
  scamper_source_event_func_t  func;
  void                        *param;
  dlist_node_t                *node;
} source_observe_t;

/*
 * global variables for managing sources:
 *
 * a source is stored in one of two lists depending on its state.  it is
 * either stored in the active list, a round-robin circular list, or in
 * the blocked list.
 *
 * the source, if any, currently being used (that is, has not used up its
 * priority quantum) is pointed to by source_cur.  the number of tasks that
 * have been read from the current source in this rotation is held in
 * source_cnt.
 *
 * there are two special sources.  the default source is mostly used for
 * handling tasks passed in on the command line.  the preempt source is used
 * for handling high-priority tasks.
 *
 * the sources are stored in a tree that is searchable by name.  the tree
 * does not include special sources.
 */
static clist_t          *active      = NULL;
static dlist_t          *blocked     = NULL;
static scamper_source_t *source_cur  = NULL;
static int               source_cnt  = 0;
static scamper_source_t *source_def  = NULL;
static splaytree_t      *source_tree = NULL;

/*
 * scamper provides the ability for external monitoring of source events.
 * these observers are held in a list.
 */
static dlist_t          *observers = NULL;

/*
 * keep state to ensure that stdin is only ever supplied as a target
 * address source once.
 */
static int               stdin_used = 0;

/* global temporary buf for assembling commands */
static char             *command_buf = NULL;
static size_t            command_len = 0;

static int source_event_post_cb(void *item, void *param)
{
  scamper_source_event_t *event = (scamper_source_event_t *)param;
  source_observe_t *observe = (source_observe_t *)item;
  observe->func(event, observe->param);
  return 0;
}

static void source_event_post(scamper_source_t *source, int type,
                        scamper_source_event_t *ev)
{
  scamper_source_event_t sse;
  struct timeval tv;

  /* check if there is actually anyone observing */
  if(observers == NULL)
    {
      return;
    }

  /* if null event, then create one from scratch */
  if(ev == NULL)
    {
      memset(&sse, 0, sizeof(sse));
      ev = &sse;
    }

  gettimeofday_wrap(&tv);
  ev->source = source;
  ev->event = type;
  ev->sec = tv.tv_sec;
  dlist_foreach(observers, source_event_post_cb, ev);

  return;
}

/*
 * source_cycle_finish
 *
 * when the last cycle is written to disk, we can start on the next cycle.
 */
static void source_cycle_finish(scamper_cycle_t *cycle,
                        scamper_source_t *source,
                        scamper_outfile_t *outfile)
{
  scamper_file_t *sf;
  struct timeval tv;

  /* timestamp when the cycle ends */
  gettimeofday_wrap(&tv);
  cycle->stop_time = (uint32_t)tv.tv_sec;

  /* write the cycle stop record out */
  if(outfile != NULL)
    {
      sf = scamper_outfile_getfile(outfile);
      scamper_file_write_cycle_stop(sf, cycle);
    }

  /* last task for this cycle, reduce count of cycle points */
  if(source != NULL)
    {
      source->cycle_points--;
      if(source->alf != NULL && source->cycle_points < 2)
      {
        scamper_fd_read_unpause(source->alf->fd);
      }
    }

  return;
}

static void command_free(command_t *command)
{
  switch(command->type)
    {
    case COMMAND_PROBE:
      scamper_addr_free(command->command_probe_addr);
      scamper_cyclemon_unuse(command->command_probe_cyclemon);
      break;

    case COMMAND_CYCLE:
      break;
    }

  free(command);
  return;
}

/*
 * command_probe
 *
 * given the commands list and a target address, create a probe command
 * and put it on the head / tail of the list depending on the value of
 * the pushhead parameter; 0 means tail, 1 means head.
 */
static int command_probe(slist_t *commands, scamper_addr_t *addr,
                   scamper_cyclemon_t *cyclemon, void *data,
                   alloctask_t alloctask, freedata_t freedata)
{
  command_t *command = NULL;

  if((command = malloc_zero(sizeof(command_t))) == NULL)
    {
      goto err;
    }

  command->type                    = COMMAND_PROBE;
  command->command_probe_addr      = scamper_addr_use(addr);
  command->command_probe_cyclemon  = scamper_cyclemon_use(cyclemon);
  command->command_probe_data      = data;
  command->command_probe_alloctask = alloctask;
  command->command_probe_freedata  = freedata;

  if(slist_tail_push(commands, command) == NULL)
    {
      goto err;
    }

  return 0;

 err:
  if(command != NULL) command_free(command);
  return -1;
}

/*
 * command_cycle
 *
 * given the commands list, append a cycle command to it.
 */
static int command_cycle(scamper_source_t *source, scamper_cycle_t *cycle)
{
  command_t *command = NULL;

  if((command = malloc_zero(sizeof(command_t))) == NULL)
    {
      goto err;
    }

  command->type = COMMAND_CYCLE;
  command->command_cycle_cycle = cycle;

  if(slist_tail_push(source->commands, command) == NULL)
    {
      goto err;
    }

  source->cycle_points++;

  return 0;

 err:
  if(command != NULL) command_free(command);
  return -1;
}

/*
 * source_cycle
 *
 * allocate and initialise a cycle start object for the source.
 * write the cycle start to disk.
 */
static int source_cycle(scamper_source_t *source, uint32_t cycle_id)
{
  scamper_cyclemon_t *cyclemon = NULL;
  scamper_cycle_t *cycle = NULL;

  /* allocate the new cycle object */
  if((cycle = scamper_cycle_alloc(source->list)) == NULL)
    {
      goto err;
    }

  /* assign the cycle id */
  cycle->id = cycle_id;

  /* allocate structure to monitor references to the new cycle */
  if((cyclemon = scamper_cyclemon_alloc(cycle,
                              source_cycle_finish,
                              source,
                              source->sof)) == NULL)
    {
      goto err;
    }

  /* append the cycle record to the source's commands list */
  if(command_cycle(source, cycle) != 0)
    {
      goto err;
    }

  /*
   * if there is a previous cycle object associated with the source, then
   * free that.  also free the cyclemon.
   */
  if(source->cycle != NULL)
    {
      scamper_cycle_free(source->cycle);
    }
  if(source->cyclemon != NULL)
    {
      scamper_cyclemon_unuse(source->cyclemon);
    }

  /* store the cycle and we're done */
  source->cycle = cycle;
  source->cyclemon = cyclemon;
  return 0;

 err:
  if(cyclemon != NULL) scamper_cyclemon_free(cyclemon);
  if(cycle != NULL) scamper_cycle_free(cycle);
  return -1;
}

/*
 * source_file_open
 *
 * open the filename specified.  set it to non-blocking mode.
 * return the fd to the caller.
 */
static int source_file_open(char *filename)
{
  int fd = -1;

  if(strcmp(filename, "-") == 0)
    {
      if(stdin_used == 0)
      {
        if((fd = fileno(stdin)) == -1)
          {
            goto err;
          }
        stdin_used = 1;
      }
      else
      {
        goto err;
      }
    }
  else
    {
#if defined(WITHOUT_PRIVSEP)
      fd = open(filename, O_RDONLY);
#else
      fd = scamper_privsep_open_file(filename, O_RDONLY, 0);
#endif
      /* make sure the fd is valid, otherwise bail */
      if(fd == -1)
      {
        goto err;
      }
    }

  /* we don't want reads on this file descriptor to block */
  if(fcntl_set(fd, O_NONBLOCK) == -1)
    {
      goto err;
    }

  return fd;

 err:
  if(fd != -1) close(fd);
  return -1;
}

/*
 * source_cmp
 *
 * provide a sorting function for storing sources in a splay tree
 */
static int source_cmp(const void *a, const void *b)
{
  return strcasecmp(((const scamper_source_t *)b)->list->name,
                ((const scamper_source_t *)a)->list->name);
}

/*
 * source_next
 *
 * advance to the next source to read addresses from, and reset the
 * current count of how many addresses have been returned off the list
 * for this source-cycle
 */
static scamper_source_t *source_next(void)
{
  void *node;

  if((node = clist_node_next(source_cur->ln)) != source_cur->ln)
    {
      source_cur = clist_node_item(node);
    }

  source_cnt = 0;

  return source_cur;
}

/*
 * source_active_detach
 *
 * detach the source out of the active list.  move to the next source
 * if it is the current source that is being read from.
 */
static void source_active_detach(scamper_source_t *source)
{
  void *node;

  assert(source->l == active);

  if((node = clist_node_next(source->ln)) != source->ln)
    {
      source_cur = clist_node_item(node);
    }
  else
    {
      source_cur = NULL;
    }

  source_cnt = 0;

  clist_node_pop(active, source->ln);
  source->l = NULL;
  source->ln = NULL;

  return;
}

/*
 * source_blocked_detach
 *
 * detach the source out of the blocked list.
 */
static void source_blocked_detach(scamper_source_t *source)
{
  assert(source->l == blocked);

  dlist_node_pop(blocked, source->ln);
  source->ln = NULL;
  source->l = NULL;
  return;
}

/*
 * source_active_attach
 *
 * some condition has changed, which may mean the source can go back onto
 * the active list for use by the probing process.
 *
 * a caller MUST NOT assume that the source will necessarily end up on the
 * active list after calling this function.  for example, source_active_attach
 * may be called when new tasks are added to the command list.  however, the
 * source may have a zero priority, which means probing this source is
 * currently paused.
 */
static int source_active_attach(scamper_source_t *source)
{
  if(source->l == active)
    {
      return 0;
    }

  if(source->l == blocked)
    {
      /* if the source has a zero priority, it must remain blocked */
      if(source->priority == 0)
      {
        return 0;
      }
      source_blocked_detach(source);
    }

  if((source->ln = clist_tail_push(active, source)) == NULL)
    {
      return -1;
    }
  source->l = active;

  if(source_cur == NULL)
    {
      source_cur = source;
      source_cnt = 0;
    }

  return 0;
}

/*
 * source_blocked_attach
 *
 * put the specified source onto the blocked list.
 */
static int source_blocked_attach(scamper_source_t *source)
{
  if(source->l == blocked)
    {
      return 0;
    }

  if(source->l != NULL)
    {
      source_active_detach(source);
    }

  if((source->ln = dlist_tail_push(blocked, source)) == NULL)
    {
      return -1;
    }
  source->l = blocked;

  return 0;
}

/*
 * addresslist_unhold
 *
 * callback function that is used to take an on_hold structure and put
 * the command back on the source's command list.  finally, it puts the
 * source back on the active list (if it is not already).
 */
static void addresslist_unhold(void *cookie)
{
  on_hold_t        *on_hold = (on_hold_t *)cookie;
  scamper_source_t *source  = on_hold->source;
  command_t        *command = on_hold->command;

  /* remove the source's copy of the on-hold structure */
  dlist_node_pop(source->on_hold, on_hold->node);
  free(on_hold);

  /* push the command onto the front of the list */
  slist_head_push(source->commands, command);

  /* put the source on the active list */
  source_active_attach(source);

  return;
}

/*
 * addresslist_onhold
 *
 * the command (pointed to by command) cannot run right now because another
 * task (pointed to by task) is busy probing the same target.  put it on hold,
 * and note which source (pointed to by source) the command originates from.
 */
static int addresslist_onhold(scamper_source_t *source, scamper_task_t *task,
                        command_t *command)
{
  on_hold_t *on_hold;

  /* allocate the on-hold structure, and set up the basic parameters */
  if((on_hold = malloc_zero(sizeof(on_hold_t))) == NULL)
    {
      return -1;
    }
  on_hold->task = task;
  on_hold->source = source;
  on_hold->command = command;

  /* put the structure on the the source's onhold list */
  if((on_hold->node = dlist_tail_push(source->on_hold, on_hold)) == NULL)
    {
      goto cleanup;
    }

  /*
   * register the on-hold structure with the task we're blocked on, so that
   * when the task completes it can wake up the source (by using the
   * addresslist_unhold callback function).
   */
  if((on_hold->task_cookie =
      scamper_task_onhold(task, on_hold, addresslist_unhold)) == NULL)
    {
      goto cleanup;
    }

  return 0;

 cleanup:
  if(on_hold->node != NULL) dlist_node_pop(source->on_hold, on_hold->node);
  free(on_hold);
  return -1;
}

/*
 * source_command
 *
 */
static int source_command(scamper_source_t *source, char *command)
{
  char            *opts;
  alloctask_t      alloctask;
  freedata_t       freedata;
  scamper_trace_t *trace;
  scamper_ping_t  *ping;
  scamper_addr_t  *addr;
  void            *data;

  opts = string_nextword(command);

  if(strcasecmp(command, "trace") == 0)
    {
      if((trace = scamper_do_trace_alloc(opts)) == NULL)
      {
        return -1;
      }

      alloctask = (alloctask_t)scamper_do_trace_alloctask;
      freedata  = (freedata_t)scamper_trace_free;
      data      = trace;
      addr      = trace->dst;
    }
  else if(strcasecmp(command, "ping") == 0)
    {
      if((ping = scamper_do_ping_alloc(opts)) == NULL)
      {
        return -1;
      }

      alloctask = (alloctask_t)scamper_do_ping_alloctask;
      freedata  = (freedata_t)scamper_ping_free;
      data      = ping;
      addr      = ping->dst;
    }
  else goto err;

  if(source == NULL) source = source_def;

  if(command_probe(source->commands, addr, source->cyclemon, data,
               alloctask, freedata) != 0)
    {
      goto err;
    }

  source_active_attach(source);
  return 0;

 err:
  return -1;
}

static int command_assemble(char *cmd, size_t cmd_len,
                      char *addr, size_t addr_len)
{
  size_t reqd_len;
  void *tmp;

  /* allocate buffer large enough, if required */
  if(command_len < (reqd_len = cmd_len + 1 + addr_len + 1))
    {
      if((tmp = malloc(reqd_len)) == NULL)
      {
        return -1;
      }

      if(command_buf != NULL)
      {
        free(command_buf);
      }

      command_buf = tmp;
      command_len = reqd_len;
    }

  /* assemble the command string */
  memcpy(command_buf, cmd, cmd_len);
  command_buf[cmd_len] = ' ';
  memcpy(&command_buf[cmd_len+1], addr, addr_len+1);

  return 0;
}

/*
 * alf_free
 *
 * free up all resources related to an address-list-file.
 * if feedlastline is set, then any partial line is parsed.
 * if closefd is set, then the file descriptor is closed.
 */
static void alf_free(alf_t *alf, int feedlastline)
{
  int fd = -1;

  if(alf->lp != NULL)
    {
      scamper_linepoll_free(alf->lp, feedlastline);
    }

  if(alf->fd != NULL)
    {
      fd = scamper_fd_fd_get(alf->fd);
      scamper_fd_free(alf->fd);
    }

  if(fd != -1)
    {
      close(fd);
    }

  free(alf);
  return;
}

/*
 * alf_read_line
 *
 * this callback receives a single line per call, which should contain an
 * address in string form.  it combines that address with the source's
 * default command and then passes the string to source_command for further
 * processing.  the line eventually ends up in the commands queue.
 */
static int alf_read_line(void *param, uint8_t *buf, size_t len)
{
  alf_t *alf = (alf_t *)param;
  scamper_source_t *source = alf->source;
  char *str = (char *)buf;

  /* make sure the string contains only printable characters */
  if(string_isprint(str, len) == 0)
    {
      return -1;
    }

  /* null terminate at these characters */
  string_nullterm(str, " \r\t#");

  /* make sure the line isn't blank or a comment line */
  if(str[0] == '\0' || str[0] == '#')
    {
      return 0;
    }

  if(command_assemble(source->command, source->command_len, str, len) != 0)
    {
      return -1;
    }

  /* resolve the address */
  if(source_command(source, command_buf) == -1)
    {
      return -1;
    }

  return 0;
}

/*
 * alf_read
 *
 * this callback is called when the fd has something ready to read.
 */
static void alf_read(const int fd, void *param)
{
  alf_t *alf = (alf_t *)param;
  scamper_source_t *source = alf->source;
  uint8_t buf[1024];
  ssize_t rc;
  time_t mtime;
  int reload = 0;
  int newfd;

  if((rc = read(fd, buf, sizeof(buf))) > 0)
    {
      /* got data to read. parse the buffer for addresses, one per line. */
      scamper_linepoll_handle(alf->lp, buf, (size_t)rc);

      /*
       * if probe queue for this source is sufficiently large, then
       * don't read any more for the time being
       */
      if(slist_count(source->commands) >= scamper_pps_get())
      {
        scamper_fd_read_pause(alf->fd);
      }
    }
  else if(rc == 0 && (source->adhoc == 1 || source->file->cycles == 1))
    {
      /*
       * when we get EOF on an adhoc source, or we complete the last cycle
       * over an input file, the input file is closed
       */
      alf_free(alf, 1);
      source->alf = NULL;
    }
  else if(rc == 0 && source->adhoc == 0)
    {
      /* a cycle value of zero means cycle indefinitely */
      if(source->file->cycles != 0)
      {
        source->file->cycles--;
      }

      /* decide if we should reload the file at this point */
      if(source->file->reload == 1)
      {
        /* stat the file so we have an mtime value for later */
        if(stat_mtime(source->file->filename, &mtime) == 0)
          {
            reload = 1;
          }
      }
      else if(source->file->autoreload == 1)
      {
        /*
         * reload is conditional on being able to stat the file, and the
         * mtime being different to whatever our record of the mtime is
         */
        if(stat_mtime(source->file->filename, &mtime) == 0 &&
           source->file->mtime != mtime)
          {
            reload = 1;
          }
      }

      /* we have to reload the file (if we can open it) */
      if(reload == 1 && (newfd=source_file_open(source->file->filename)) != -1)
      {
        /* use the new file descriptor */
        if(scamper_fd_fd_set(source->alf->fd, newfd) == -1)
          {
            goto err;
          }

        /* close the existing file */
        close(fd);

        /* update file details; ensure reload is reset to zero */
        source->file->mtime = mtime;
        source->file->reload = 0;
      }
      else
      {
        /* rewind the current file position */
        if(lseek(fd, 0, SEEK_SET) == -1)
          {
            goto err;
          }
      }

      /* check to see if we should pause, or allow reading to continue */
      if(source->cycle_points < 1)
      {
        scamper_fd_read_unpause(source->alf->fd);
      }
      else
      {
        scamper_fd_read_pause(source->alf->fd);
      }

      /* create a new cycle record, etc */
      if(source_cycle(source, source->cycle->id + 1) == -1)
      {
        goto err;
      }
    }
  else if(rc == -1 && errno != EAGAIN)
    {
      printerror(errno, strerror, __func__, "read failed");
      goto err;
    }

  return;

 err:
  alf_free(alf, 0);
  source->alf = NULL;
  return;
}

static alf_t *alf_alloc(int fd, scamper_source_t *source)
{
  alf_t *alf = NULL;

  if((alf = malloc_zero(sizeof(alf_t))) == NULL)
    {
      goto err;
    }

  if((alf->fd = scamper_fd_private(fd, alf_read, alf, NULL, NULL)) == NULL)
    {
      goto err;
    }

  if((alf->lp = scamper_linepoll_alloc(alf_read_line, alf)) == NULL)
    {
      goto err;
    }

  alf->source = source;

  return alf;

 err:
  if(alf != NULL)
    {
      if(alf->fd != NULL) scamper_fd_free(alf->fd);
      if(alf->lp != NULL) scamper_linepoll_free(alf->lp, 0);
      free(alf);
    }
  return NULL;
}

/*
 * source_free
 *
 * clean up the source
 */
static void source_free(scamper_source_t *source)
{
  command_t *command;
  on_hold_t *onhold;

  if(source->cyclemon != NULL)
    {
      scamper_cyclemon_source_detach(source->cyclemon);
      scamper_cyclemon_unuse(source->cyclemon);
      source->cyclemon = NULL;
    }

  /* detach the source from whatever list it is in */
  if(source->l == active)
    {
      source_active_detach(source);
    }
  else if(source->l == blocked)
    {
      source_blocked_detach(source);
    }

  /*
   * empty the source of any addresses that are currently on hold.
   * this occurs before we flush the commands list as it is possible that
   * an on-hold address might end up in the command list for this source.
   */
  if(source->on_hold != NULL)
    {
      while((onhold = dlist_head_pop(source->on_hold)) != NULL)
      {
        scamper_task_dehold(onhold->task, onhold->task_cookie);
        free(onhold);
      }
      dlist_free(source->on_hold);
    }

  /* now empty the list of commands */
  if(source->commands != NULL)
    {
      while((command = slist_head_pop(source->commands)) != NULL)
      {
        if(command->type == COMMAND_PROBE)
          {
            command->command_probe_freedata(command->command_probe_data);
          }
        command_free(command);
      }
      slist_free(source->commands);
    }

  /* release this structure's hold on the scamper_outfile */
  if(source->sof != NULL) scamper_outfile_free(source->sof);

  /* remove the source from the source tree */
  splaytree_remove_item(source_tree, source);

  /* free up details of the source file */
  if(source->file != NULL)
    {
      if(source->file->filename != NULL) free(source->file->filename);
      free(source->file);
    }

  if(source->alf != NULL) alf_free(source->alf, 0);
  if(source->command != NULL) free(source->command);
  if(source->list != NULL) scamper_list_free(source->list);
  if(source->cycle != NULL) scamper_cycle_free(source->cycle);

  free(source);
  return;
}

scamper_source_t *scamper_source_use(scamper_source_t *source)
{
  source->refcnt++;
  return source;
}

void scamper_source_unuse(scamper_source_t *source)
{
  /*
   * decrement the reference count.  check to see if the reference count
   * shows there are active tasks probing.  if there are active tasks, then
   * return now.
   */
  if(--source->refcnt > 1)
    {
      return;
    }

  /*
   * if the source is an adhoc source, then don't remove it; other commands
   * may be supplied in the future
   */
  if(source->adhoc == 1)
    {
      return;
    }

  /* if there are more cycles to come, then don't remove the source */
  if(source->file->cycles != 1)
    {
      return;
    }

  /*
   * do not remove a source that has targets queued, or has the possibility
   * of having targets queued
   */
  if(slist_count(source->commands) > 0 || source->alf != NULL ||
     dlist_count(source->on_hold) > 0)
    {
      return;
    }

  source_event_post(source, SCAMPER_SOURCE_EVENT_FINISH, NULL);
  source_free(source);

  return;
}

void *scamper_addresslist_observe(scamper_source_event_func_t func, void *p)
{
  source_observe_t *observe = NULL;

  if(observers == NULL && (observers = dlist_alloc()) == NULL)
    {
      goto err;
    }

  if((observe = malloc_zero(sizeof(source_observe_t))) == NULL)
    {
      goto err;
    }

  observe->func = func;
  observe->param = p;

  if((observe->node = dlist_tail_push(observers, observe)) == NULL)
    {
      goto err;
    }

  return observe;

 err:
  if(observe != NULL)
    {
      if(observe->node != NULL) dlist_node_pop(observers, observe->node);
      free(observe);
    }
  return NULL;
}

void scamper_addresslist_unobserve(void *handle)
{
  source_observe_t *observe = (source_observe_t *)handle;

  if(observers == NULL)
    {
      return;
    }

  /* free the node */
  dlist_node_pop(observers, observe->node);
  free(observe);

  /* if there are no other observations, free the list as well */
  if(dlist_count(observers) == 0)
    {
      dlist_free(observers);
      observers = NULL;
    }

  return;
}

uint32_t scamper_source_getlistid(const scamper_source_t *source)
{
  return source->list->id;
}

uint32_t scamper_source_getcycleid(const scamper_source_t *source)
{
  return source->cycle->id;
}

const char *scamper_source_getname(const scamper_source_t *source)
{
  return source->list->name;
}

const char *scamper_source_getdescr(const scamper_source_t *source)
{
  return source->list->descr;
}

int scamper_source_getpriority(const scamper_source_t *source)
{
  return source->priority;
}

int scamper_source_getadhoc(const scamper_source_t *source)
{
  return source->adhoc;
}

scamper_outfile_t *scamper_source_getoutfile(const scamper_source_t *s)
{
  /* if there is no defined outfile, then the default outfile will be used */
  if(s->sof == NULL)
    {
      /* passing NULL means the default outfile */
      return scamper_outfiles_get(NULL);
    }
  return s->sof;
}

const char *scamper_source_getfilename(const scamper_source_t *source)
{
  if(source->file == NULL) return NULL;
  return source->file->filename;
}

int scamper_source_getcycles(const scamper_source_t *source)
{
  if(source->file == NULL) return 1;
  return source->file->cycles;
}

int scamper_source_getautoreload(const scamper_source_t *source)
{
  if(source->file == NULL) return 0;
  return source->file->autoreload;
}

int scamper_source_update(scamper_source_t *source,
                    const int *autoreload, const int *cycles,
                    const int *priority)
{
  scamper_source_event_t sse;
  int old_priority;

  /* first, ensure there is actually some parameter to update */
  if(autoreload == NULL && cycles == NULL && priority == NULL)
    {
      return 0;
    }

  /*
   * second, sanity check the parameters;
   *
   * if there is no file, then can't update autoreload.
   * if there is no file, then can't update cycle count.
   * negative priority forbidden
   */
  if((autoreload != NULL && source->file == NULL) ||
     (cycles != NULL && source->file == NULL)  ||
     (priority != NULL && *priority < 0))
    {
      return -1;
    }

  /* build up an event structure to post */
  memset(&sse, 0, sizeof(sse));

  if(autoreload != NULL)
    {
      sse.sse_update_flags |= 0x01;
      sse.sse_update_autoreload = *autoreload;
      source->file->autoreload = *autoreload;
    }

  if(cycles != NULL)
    {
      sse.sse_update_flags |= 0x02;
      sse.sse_update_cycles = *cycles;
      source->file->cycles = *cycles;
    }

  if(priority != NULL)
    {
      sse.sse_update_flags |= 0x04;
      sse.sse_update_priority = *priority;

      /* swap the priorities around */
      old_priority = source->priority;
      source->priority = *priority;

      /* if priority is set to zero */
      if(*priority == 0 && old_priority > 0)
      {
        source_blocked_attach(source);
      }
      /* else if the priority is raised from zero */
      else if(*priority > 0 && old_priority == 0)
      {
        source_active_attach(source);
      }
    }

  source_event_post(source, SCAMPER_SOURCE_EVENT_UPDATE, &sse);

  return 0;
}

/*
 * scamper_source_do
 */
int scamper_source_do(scamper_source_t *source, char *command)
{
  return source_command(source, command);
}

int scamper_source_do_array(scamper_source_t *source, char *command,
                      char **iparray, int ipcount)
{
  size_t cmd_len;
  int i;

  /* need a command to go with the array */
  if(command == NULL && (command = source->command) == NULL)
    {
      return -1;
    }
  cmd_len = strlen(command);

  for(i=0; i<ipcount; i++)
    {
      if(command_assemble(command,cmd_len,iparray[i],strlen(iparray[i])) != 0)
      {
        return -1;
      }

      if(source_command(source, command_buf) != 0)
      {
        return -1;
      }
    }

  return 0;
}

/*
 * scamper_source_do_file
 *
 * queue the contents of the specified file for reading on an ad-hoc
 * list.
 */
int scamper_source_do_file(scamper_source_t *source, char *filename)
{
  int fd = -1;

  if(source == NULL) source = source_def;

  /* only adhoc sources may have files queued, and only one file at a time */
  if(source->adhoc != 1 || source->alf != NULL)
    {
      goto err;
    }

  if((fd = source_file_open(filename)) == -1)
    {
      goto err;
    }

  if((source->alf = alf_alloc(fd, source)) == NULL)
    {
      goto err;
    }

  return 0;

 err:
  if(fd != -1) close(fd);
  return -1;
}

/*
 * scamper_addresslist_addsource
 *
 * link an address list into rotation.
 */
scamper_source_t *scamper_addresslist_addsource(scamper_source_params_t *ssp)
{
  scamper_source_t *source = NULL;
  int fd = -1;
  int is_default = 0;
  const char *def;
  int cycleid;

  if((def = scamper_option_listname()) == NULL)
    {
      def = "default";
    }

  if(strcasecmp(ssp->name, def) == 0)
    {
      is_default = 1;
    }

  /*
   * don't let a source named default get into the tree if there already
   * is a default source
   */
  if(is_default && source_def != NULL)
    {
      goto err;
    }

  /* if a source is not 'ad-hoc', then it has to specify a file */
  if(ssp->adhoc == 0 && ssp->filename == NULL)
    {
      goto err;
    }

  if((source = malloc_zero(sizeof(scamper_source_t))) == NULL)
    {
      goto err;
    }

  source->refcnt = 1;
  source->priority = ssp->priority;
  source->adhoc = ssp->adhoc;

  /* set the default command to use with the source */
  if((source->command = strdup(ssp->command != NULL ?
                         ssp->command : scamper_command_get())) == NULL)
    {
      goto err;
    }
  source->command_len = strlen(source->command);

  /* if we have been passed an output file to target, then use it */
  if(ssp->sof != NULL)
    {
      source->sof = scamper_outfile_use(ssp->sof);
    }

  /* the targets list is an ordered list of addresses to probe */
  if((source->commands = slist_alloc()) == NULL)
    {
      goto err;
    }

  /* the on-hold list is a collection of on_hold records for this source */
  if((source->on_hold = dlist_alloc()) == NULL)
    {
      goto err;
    }

  /*
   * this structure defines the source where a series of traces came
   * from.  it is shared by the source and by the traces that reference
   * it.
   */
  if((source->list = scamper_list_alloc(ssp->list_id, ssp->name, ssp->descr,
                              scamper_monitorname_get())) == NULL)
    {
      goto err;
    }

  /* initialise the cycle record */
  if(is_default == 1)
    {
      if((cycleid = scamper_option_cycleid()) >= 0 &&
       source_cycle(source, cycleid) == -1)
      {
        goto err;
      }
    }
  else
    {
      if(source_cycle(source, ssp->cycle_id) == -1)
      {
        goto err;
      }
    }

  /* if there is a file to initialise the source with, then open it now */
  if(ssp->filename != NULL)
    {
      if((fd = source_file_open(ssp->filename)) == -1)
      {
        goto err;
      }

      if((source->alf = alf_alloc(fd, source)) == NULL)
      {
        goto err;
      }
    }

  /*
   * if the source is a managed list, then we need to keep state on the
   * destination list file.
   */
  if(source->adhoc == 0)
    {
      if((source->file = malloc_zero(sizeof(source_file_t))) == NULL)
      {
        goto err;
      }

      if(fstat_mtime(fd, &source->file->mtime) == -1)
      {
        goto err;
      }

      if((source->file->filename = strdup(ssp->filename)) == NULL)
      {
        goto err;
      }

      source->file->autoreload = ssp->autoreload;
      source->file->cycles = ssp->cycles;
    }

  /*
   * put the source into the splaytree, where it can be found quickly
   * when needed.
   */
  if(splaytree_insert(source_tree, source) == NULL)
    {
      goto err;
    }

  /*
   * put the source in the blocked list, as there currently is no
   * addresses ready to be probed at this time.
   */
  if(is_default == 0 && source_blocked_attach(source) == -1)
    {
      goto err;
    }

  source_event_post(source, SCAMPER_SOURCE_EVENT_ADD, NULL);

  return source;

 err:
  if(fd != -1) close(fd);
  source_free(source);
  return NULL;
}

/*
 * scamper_addresslist_getsource
 *
 * look for the address list specified by the name passed
 */
scamper_source_t *scamper_addresslist_getsource(char *name)
{
  scamper_source_t findme;
  scamper_list_t list;
  list.name = name;
  findme.list = &list;
  return (scamper_source_t *)splaytree_find(source_tree, &findme);
}

/*
 * scamper_addressslist_delsource
 *
 * remove the source from the list of available address sources if
 * possible
 */
int scamper_addresslist_delsource(scamper_source_t *source)
{
  const char *def;

  if((def = scamper_option_listname()) == NULL)
    {
      def = "default";
    }

  /* the default source cannot be removed */
  if(strcasecmp(source->list->name, def) == 0)
    {
      return -1;
    }

  /* if there are external references to the source, then don't free it */
  if(source->refcnt > 1)
    {
      return -1;
    }

  source_event_post(source, SCAMPER_SOURCE_EVENT_DELETE, NULL);

  source_free(source);
  return 0;
}

/*
 * scamper_addresslist_cyclesource
 *
 * if the source is an adhoc source [and it isn't the default source]
 * then append a cycle command to the source.
 */
int scamper_addresslist_cyclesource(scamper_source_t *source)
{
  /* check that the source is not managed / is not the default list */
  if(source->adhoc == 0 || source == source_def)
    {
      return -1;
    }

  /* append the cycle command */
  if(source_cycle(source, source->cycle->id + 1) == -1)
    {
      return -1;
    }

  source_active_attach(source);

  return 0;
}

/*
 * scamper_addresslist_empty
 *
 * remove all addresses and sources in the address list
 * XXX: fix up to use source reference counting properly
 */
void scamper_addresslist_empty()
{
  scamper_source_t *source;

  /*
   * for each source, go through and empty the lists, close the files, and
   * leave the list of sources available to read from empty.
   */
  while((source = dlist_tail_pop(blocked)) != NULL)
    {
      source->l = NULL; source->ln = NULL;
      source_free(source);
    }

  while((source = clist_tail_pop(active)) != NULL)
    {
      source->l = NULL; source->ln = NULL;
      source_free(source);
    }

  return;
}

void scamper_addresslist_foreach(void *param,
                         int (*func)(void *p, scamper_source_t *src))
{
  splaytree_inorder(source_tree, (splaytree_inorder_t)func, param);
  return;
}

/*
 * command_probe_handle
 *
 *
 */
static int command_probe_handle(scamper_source_t *source, command_t *command,
                        scamper_task_t **task_out)
{
  scamper_target_t *target;
  scamper_cycle_t *cycle;
  scamper_task_t *task;

  /*
   * if this command is blocked, then we can't allocate the caller a task.
   * put the command on hold, and return NULL in task_out.
   */
  if((target = scamper_target_find(command->command_probe_addr)) != NULL)
    {
      addresslist_onhold(source, target->task, command);
      *task_out = NULL;
      return 0;
    }

  /* get a pointer to the cycle for *this* task */
  cycle = scamper_cyclemon_cycle(command->command_probe_cyclemon);

  /* allocate the task structure to keep everything together */
  if((task = command->command_probe_alloctask(command->command_probe_data,
                                    source->list, cycle)) == NULL)
    {
      command_free(command);
      return -1;
    }

  /* pass the cyclemon structure to the task */
  task->cyclemon = scamper_cyclemon_use(command->command_probe_cyclemon);

  /* pass reference to the file to write the task to */
  task->source = scamper_source_use(source);

  /* don't need the command any more */
  command_free(command);

  /* return to the caller the task we allocated */
  *task_out = task;

  return 0;
}

static int command_cycle_handle(scamper_source_t *source, command_t *command)
{
  scamper_source_event_t sse;
  scamper_cycle_t *cycle = command->command_un.cycle.cycle;
  scamper_file_t *file;
  struct timeval tv;
  char hostname[MAXHOSTNAMELEN];

  /* get the hostname of the system for the cycle point */
  if(gethostname(hostname, sizeof(hostname)) == 0)
    {
      cycle->hostname = strdup(hostname);
    }

  /* get a timestamp for the cycle start point */
  gettimeofday_wrap(&tv);
  cycle->start_time = (uint32_t)tv.tv_sec;

  /* write a cycle start point to disk if there is a file to do so */
  if(source->sof != NULL &&
     (file = scamper_outfile_getfile(source->sof)) != NULL)
    {
      scamper_file_write_cycle_start(file, cycle);
    }

  /* post an event saying the cycle point just rolled around */
  sse.sse_cycle_cycle_id = cycle->id;
  source_event_post(source, SCAMPER_SOURCE_EVENT_CYCLE, &sse);

  command_free(command);
  return 0;
}

/*
 * scamper_addresslist_get
 *
 * return the next address to trace.
 *
 * note that if this function returns null, it does not necessarily infer
 * that there are no more addresses to trace; if there are addresses
 * currently on-hold that are blocked on other traces in the active window
 * and no other traces to enqueue then this function will return null.
 *
 * XXX: replace head_pop with head_node, and then head_pop when the item
 * is successfully returned.
 */
scamper_task_t *scamper_addresslist_get(void)
{
  scamper_source_t *source;
  scamper_task_t *task;
  command_t *command;

  /*
   * if the priority of the source was changed in between calls to this
   * function, then make sure the source's priority hasn't been lowered to
   * below how many tasks it has had allocated in this cycle
   */
  if(source_cur != NULL && source_cnt >= source_cur->priority)
    {
      source_next();
    }

  if((source = source_cur) == NULL)
    {
      return NULL;
    }

  /*
   * if the number of available tasks is less than the packets-per-second
   * rate, and we're not too far ahead, then queue the source to read more
   * tasks.
   */
  if(source->alf != NULL && source->cycle_points < 2 &&
     slist_count(source->commands) < scamper_pps_get())
    {
      scamper_fd_read_unpause(source->alf->fd);
    }

  while((source = source_cur) != NULL)
    {
      assert(source->priority != 0);

      /* fetch commands off until we have a task to return */
      while((command = slist_head_pop(source->commands)) != NULL)
      {
        if(command->type == COMMAND_PROBE)
          {
            if(command_probe_handle(source, command, &task) == -1)
            {
              return NULL;
            }
            else if(task == NULL)
            {
              continue;
            }

            source_cnt++;
            return task;
          }
        else if(command->type == COMMAND_CYCLE)
          {
            command_cycle_handle(source, command);
          }
      }

      /* the previous source could not supply a command */
      assert(slist_count(source->commands) == 0);

      /*
       * if there is still the possibility of getting an address
       * out of the source then put the list in the blocked list.
       */
      if(source->alf != NULL || dlist_count(source->on_hold) > 0) 
      {
        source_blocked_attach(source);
      }
      /*
       * if the source is the default list, then we merely detach it
       * from the active list and leave it 'dangling' in that it is not
       * in either of the active or blocked lists.
       */
      else if(source == source_def)
      {
        source_active_detach(source);
      }
      /*
       * put the source onto the blocked list for now.
       */
      else
      {
        source_blocked_attach(source);
      }
    }

  /* there shouldn't be any active address list sources available ... */
  assert(clist_count(active) == 0);

  return NULL;
}

/*
 * scamper_addresslist_isempty
 *
 * return to the caller if the address list is empty or not.
 */
int scamper_addresslist_isempty()
{
  /*
   * if there are either active or blocked address list sources, the list
   * can't be empty
   */
  if(clist_count(active) > 0 || dlist_count(blocked) > 0)
    {
      return 0;
    }

  /*
   * if the default source is waiting for more addresses then the list
   * can't be empty
   */
  if(source_def->alf != NULL)
    {
      return 0;
    }

  return 1;
}

/*
 * scamper_addresslist_isready
 *
 * return to the caller if the address list appears to be able to supply
 * an address for probing.
 */
int scamper_addresslist_isready()
{
  if(source_cur != NULL)
    {
      return 1;
    }

  return 0;
}

int scamper_addresslist_init()
{
  scamper_source_params_t ssp;
  const char *list_name;
  int   list_id;

  if((active = clist_alloc()) == NULL)
    {
      return -1;
    }

  if((blocked = dlist_alloc()) == NULL)
    {
      return -1;
    }

  if((source_tree = splaytree_alloc(source_cmp)) == NULL)
    {
      return -1;
    }

  if((list_id = scamper_option_listid()) < 0)
    {
      list_id = 0;
    }

  if((list_name = scamper_option_listname()) == NULL)
    {
      list_name = "default";
    }

  /* create the default address list source and put it into rotation */
  memset(&ssp, 0, sizeof(ssp));
  ssp.list_id  = list_id;
  ssp.name     = (char *)list_name;
  ssp.descr    = "default";
  ssp.priority = 1;
  ssp.adhoc    = 1;

  if((source_def = scamper_addresslist_addsource(&ssp)) == NULL)
    {
      return -1;
    }

  return 0;  
}

void scamper_addresslist_cleanup()
{
  struct timeval tv;

  if(source_def != NULL)
    {
      if(source_def->cycle != NULL)
      {
        /* timestamp when the cycle ends */
        gettimeofday_wrap(&tv);
        source_def->cycle->stop_time = (uint32_t)tv.tv_sec;
        scamper_file_write_cycle_stop(scamper_outfile_getfile(
                               scamper_outfiles_get(NULL)),
                              source_def->cycle);
      }

      source_free(source_def);
      source_def = NULL;
    }

  if(source_tree != NULL)
    {
      splaytree_free(source_tree, NULL);
      source_tree = NULL;
    }

  if(observers != NULL)
    {
      dlist_free(observers);
      observers = NULL;
    }

  if(blocked != NULL)
    {
      dlist_free(blocked);
      blocked = NULL;
    }

  if(active != NULL)
    {
      clist_free(active);
      active = NULL;
    }

  if(command_buf != NULL)
    {
      free(command_buf);
      command_buf = NULL;
    }

  return;
}

Generated by  Doxygen 1.6.0   Back to index