
Introduction

Texas Instruments' xWR1642 is an integrated mmWave radar system in a single package.  The device contains all the RF circuitry to transmit and receive 76-81 GHz FMCW (Frequency Modulated Continuous Wave) signals, combined with a TI C674x DSP to handle the RF signal processing and an ARM Cortex-R4F to run the application code.  This high frequency band provides a number of advantages over other radar implementations, such as millimeter-level accuracy and the ability to "see" through objects like plastic or clothing and through elemental interference like rain or fog.  Two variants of this IC are available from TI: an automotive version (AWR) and an industrial version (IWR), with the AWR version adding some automotive certifications and an extra CAN interface.  Although this tutorial was developed using the AWR version of the sensor, it should be fully compatible with the IWR version as well since the two are nearly identical.  
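
As a rough sanity check on those claims, the range resolution of an FMCW radar depends only on the swept bandwidth: d = c / (2B).  A quick Python sketch (the 4 GHz figure is the maximum chirp bandwidth the xWR1642 supports; treat it as an assumption and check TI's datasheet for your configuration):

```python
C = 3.0e8  # speed of light in m/s

def range_resolution(bandwidth_hz):
    """FMCW range resolution in meters: c / (2 * B)."""
    return C / (2 * bandwidth_hz)

# A 4 GHz sweep resolves targets about 3.75 cm apart
print(range_resolution(4e9))  # -> 0.0375
```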

This guide will demonstrate the steps required to start developing with the AWR1642 by building on top of the labs TI provides in their mmWave Toolboxes.  The target application is a navigation system for an autonomous robot although it is just a simulation for now.  TI's Short Range Radar (SRR) lab from the Automotive Toolbox is used as the base firmware since it already has all the radar configuration and processing code.  

Resources

This project was developed using the AWR1642BOOST development kit and Code Composer Studio v9.1.  There are a number of additional software requirements needed by Code Composer that will be covered in the next section.  

Setting up Prerequisites

There is a lot to set up and download before starting a project.  Because this guide uses a project from the Automotive Toolbox, the first step is to download the toolbox and its documentation; the documentation lists all the prerequisites for each example project.  

  1. In Code Composer, select View → Resource Explorer.  
  2. Within the Resource Explorer tab, select Software → mmWave Sensors.
  3. Double-click on Automotive Toolbox and select the Install button on the far right.




    Even if you have an automotive (AWR) kit, the Industrial Toolbox labs will still work. Don't be afraid to download both toolboxes for more examples. However, the AWR1642BOOST-ODS kit only works with the AWR ODS demo because it has a different antenna structure.

  4. When the Toolbox has finished downloading, open the Short Range Radar (SRR) lab.
  5. Double-click on each of the two CCS Projects (one for the DSP, one for the Cortex) and click the Import to IDE button on the top right.  
  6. By default, the Toolbox files will be installed to C:/TI/mmwave_automotive_toolbox_<version>. Navigate there and then open labs/lab0002_short_range_radar/docs/AutoSrr_usersguide.html.  This manual describes the project and all the requirements for building it.  

  7. Download everything from the Software Requirements and Firmware Requirements sections.  You don't need the MATLAB Runtime for this tutorial, but you will if you run TI's GUI.  Pay close attention to the versions listed because the newest version of each software package is not always compatible, especially for the mmWave SDK.  A quick summary of the important packages and versions used for this tutorial is provided below.  Other packages installed with CCS and the mmWave SDK should already be the correct versions.  

    Package                      Version
    ---------------------------  -----------
    mmWave Automotive Toolbox    2.5.0
    mmWave SDK                   2.0.0.04
    SYS/BIOS                     6.53.2.00
    TI DSP Compiler              8.1.3
    TI ARM Compiler*             18.12.1.LTS

    *This version of the ARM compiler is different from the specified version (16.9.6.LTS) but still worked with the examples.  I wasn't able to find a place to download the older compiler.

  8. Verify that your project is using all the correct versions of the software packages by right-clicking the project and selecting Properties → General.  The first tab will show the compiler version.  Click on the Products tab to see the additional packages being used.  
  9. At this point, you should be able to build both projects by right-clicking and selecting Rebuild Project.

Debugging Setup

There are some extra steps required to enable debugging on the AWR1642, which are described in the lab's User Guide, so I won't cover them in detail here.  Refer to section 4 (Execute the Lab) and click the Expand for help with Debug mode button for instructions on enabling programming and debugging through Code Composer Studio. 

Because the process to enable debugging requires a binary to be flashed on the device, take care to avoid resetting the device through CCS.  If you are debugging and try to reset/reprogram the device in CCS, you will overwrite the debug binary that was flashed in the instructions mentioned above.  If this happens, the processor will crash on boot and show a fault in the debug window.  To fix it, re-flash the debug binary to get back into debug mode.


To avoid issues, I use the following method to reset the device while debugging:

  1. In the Debug window, right-click on Group 1 and select Disconnect Target.
  2. Press the reset button on the target board.  This will re-run the debug program.
  3. In the Debug window, right-click on Group 1 and select Connect Target.  
  4. Load programs and debug as usual.  

Parsing the Example Code

There is a lot of code in the two example projects to sift through.  Including the extensive comments/documentation, there are about 10k lines of code between the few primary files in both projects.  This section will provide an overview of some of the most noteworthy sections for the core functionality of the demo, grouped by processor core.  See also the comment block at the top of srrdemo_16xx_mss/mss_main.c for a more complete description of the project. 

Although this example uses the Short Range Radar lab, many of the other labs included in the mmWave Automotive and Industrial Toolboxes follow a similar structure, so much of the following information may apply to them as well.  

MSS (ARM)

The ARM core has three main duties in this demo:  run the CLI for configuration from a host, log radar results to the serial port, and run the robot navigation application that will be added later.  

Command Line Interface

The CLI initialization is defined in SRR_MSS_CLIInit() in the mss_srr_cli.c file.  Most of the commands defined here are used to pass configuration settings to the DSP.  

If you want to use any custom commands for your application, you can add them here.  Each entry requires a command string, a help string, and a handler function.  The following is an example entry for this application which updates the destination for the navigation code.  

cli.c - Example Snippet
/* Custom CLI command */
cliCfg.tableEntry[cnt].cmd            = "navSetDestination";
cliCfg.tableEntry[cnt].helpString     = "<x> <y>";
cliCfg.tableEntry[cnt].cmdHandlerFxn  = navigation_CLISetDestination;
cnt++;
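
On the host side, each CLI command is just a whitespace-delimited line of ASCII text sent over the UART.  A minimal Python sketch of issuing the navSetDestination command added above (the port name and baud rate are assumptions; check the lab user's guide for your board's CLI UART settings):

```python
def cli_command(cmd, *args):
    """Format a command line the firmware's CLI tokenizer can parse."""
    return ' '.join([cmd] + [str(a) for a in args]) + '\n'

# Example with pyserial (port and baud rate are assumptions):
# import serial
# with serial.Serial('COM4', 115200) as port:
#     port.write(cli_command('navSetDestination', 3, 5).encode('ascii'))
```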

Log Results

Writing the radar results to the UART happens in MmwDemo_mboxReadTask() in mss_main.c.  This function checks the mailbox that is shared in memory between the two processors.  If the message contains the object detection results, they are sent directly over the UART.  The format of these packets is described in the lab user's guide so I won't cover it again here.  We will make use of these packets later in the tutorial.  

Run Navigation Algorithm

Because the MmwDemo_mboxReadTask() function from the last step is where the ARM gets the radar data from the DSP, it also happens to be a great place to add any application code that uses the radar data.  The detected object data is located in the message.body element of the structure.  

mss_main.c - Snippet
/* Process the received message: */
switch (message.type)
{
    case MMWDEMO_DSS2MSS_DETOBJ_READY:
        /* Use radar data to update and recalculate mapping data */
        updateMapFromRadar(&message.body.detObj);

        /* Modify/hijack message to append path data */
        appendPathTLV(&message.body.detObj);

        /* ... */
}

DSS (C674x)

The DSP's job, as you may have guessed, is to do math.  dss_data_path.c contains the majority of the radar processing code.  MmwDemo_interFrameProcessing is the primary processing function and contains the second radar FFT, as well as CFAR, tracking, and other procedures.  

Back in the dss_main.c file, MmwDemo_dssDataPathProcessEvents() is the main event handler and calls the data path functions to do the processing.  MmwDemo_dssSendProcessOutputToMSS() is the function where all of the radar results get copied to the shared mailbox.  The resulting message will trigger the MMWDEMO_DSS2MSS_DETOBJ_READY case shown above in the MmwDemo_mboxReadTask() function.  

Because the DSP is set up with the object detection processing needed for this tutorial, there isn't anything else to add or modify.  

Navigation

Now that the obstacle detection is being handled by TI's firmware, it's time to do something useful with the data.  In this tutorial, the goal is to find the shortest path for a hypothetical robot to travel around the obstacles.  Dijkstra's algorithm will be used to implement the search. 

First, a couple of characteristics for the robot need to be defined.  

  • TI's default SRR radar configuration gives a max detection distance of about 80m with a field of view of 120 degrees.  However, that is much more than this application requires and the 20m USRR (Ultra Short Range Radar) setting will be used.  
  • Assume the robot uses a 1m grid for navigation.  The actual size of the robot can be smaller, but 1m allows for extra clearance (and steps of 1 make things easy).  
  • Both orthogonal and diagonal movement is allowed between grid spaces.  
    • Forward and backward movement is allowed, but technically the radar only "sees" forward.

Using this information, we can start building up the navigation code.  It will consist of 3 layers:  a graph, a map, and an interface layer.  If you downloaded the project source files from above, these files will already be included in the project under the navigation directory.  

Typically these sorts of algorithms use malloc() to dynamically allocate the graph nodes, but since this is running on an embedded system where memory is less plentiful and because of the other issues with dynamic memory allocation, a static implementation is used.  


Graph Layer

The graph level is the lowest level of the navigation application.  A graph consists of nodes and the connections between the nodes.  In this case, each node is connected to all of its neighbors because the robot is allowed to move to any adjacent grid space.  For example, a 3x3 graph would look like this:

At this level, the graph has no knowledge of coordinates.  All it knows is that there is an array of nodes and how they are connected to each other.  
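
Before diving into the C implementation, the neighbor relationship can be sketched in a few lines of host-side Python (illustrative only; the row-major indexing mirrors the edges that createGraph() builds):

```python
def neighbors(index, rows, cols):
    """Return the indices of a node's grid neighbors (up to 8),
    using the same row-major numbering as the C graph layer."""
    r, c = divmod(index, cols)  # row-major index -> (row, col)
    result = []
    for dr in (-1, 0, 1):
        for dc in (-1, 0, 1):
            if (dr, dc) == (0, 0):
                continue
            nr, nc = r + dr, c + dc
            if 0 <= nr < rows and 0 <= nc < cols:
                result.append(nr * cols + nc)
    return sorted(result)

# The center node of a 3x3 graph connects to all 8 other nodes
print(neighbors(4, 3, 3))  # -> [0, 1, 2, 3, 5, 6, 7, 8]
```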

graph.h
#ifndef GRAPH_H_
#define GRAPH_H_

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <limits.h>

#define MAX_GRAPH_SIZE      250
#define MAX_EDGES_PER_NODE  8
#define UNINITIALZED        UINT_MAX

typedef enum node_pos {MID_NODE, LEFT_NODE, RIGHT_NODE, BOTTOM_NODE} NodePos;

typedef struct Graph {
  unsigned int rows;
  unsigned int cols;
  unsigned int size;
  unsigned int edges[MAX_GRAPH_SIZE][MAX_EDGES_PER_NODE];
} graph_t;

void createGraph(graph_t *graph, unsigned int rows, unsigned int cols);
void createEdge(graph_t *graph, int src, int dest);
NodePos checkNode(int rows, int cols, unsigned int index);

#endif //GRAPH_H_


graph.c
#include "graph.h"

void createGraph(graph_t *graph, unsigned int rows, unsigned int cols)
{
  unsigned int i, j = 0;
  unsigned int n;
  unsigned int c;
  NodePos current_node_pos;

  // Save dimensions
  graph->rows = rows;
  graph->cols = cols;
  n = graph->rows * graph->cols;
  graph->size = n;

  // Initialize edges for each node
  for(i=0; i < MAX_GRAPH_SIZE; i++)
  {
    for(j = 0; j < MAX_EDGES_PER_NODE; j++)
    {
      graph->edges[i][j] = UNINITIALZED;
    }
  }

  // Column offset used below
  c = graph->cols;

  // Create edges between nodes
  for(i=0; i < n; i++)
  {
    current_node_pos = checkNode(graph->rows, graph->cols, i);
    switch(current_node_pos)
    {
      case MID_NODE:
        // Contains 8 edges
        createEdge(graph, i, i-1);
        createEdge(graph, i, i+1);
        createEdge(graph, i, i+c);
        createEdge(graph, i, (i+c)-1);
        createEdge(graph, i, (i+c)+1);
        createEdge(graph, i, i-c);
        createEdge(graph, i, (i-c)-1);
        createEdge(graph, i, (i-c)+1);
        break;
      case LEFT_NODE:
        // Contains 5 edges
        createEdge(graph, i, i+1);
        createEdge(graph, i, i+c);
        createEdge(graph, i, (i+c)+1);
        createEdge(graph, i, i-c);
        createEdge(graph, i, (i-c)+1);
        break;
      case RIGHT_NODE:
        // Contains 5 edges
        createEdge(graph, i, i-1);
        createEdge(graph, i, i+c);
        createEdge(graph, i, (i+c)-1);
        createEdge(graph, i, i-c);
        createEdge(graph, i, (i-c)-1);
        break;
    }
  }
}

void createEdge(graph_t *graph, int src, int dest)
{
  unsigned int idx = 0;

  // Ignore requested edge if it is out of bounds
  if(dest < 0 || dest >= graph->size)
  {
    return;
  }

  // Find the first unused entry, checking the bound before reading the array
  while(idx < MAX_EDGES_PER_NODE && graph->edges[src][idx] != UNINITIALZED)
  {
    idx++;
  }

  // All edge slots for this node are already used
  if(idx == MAX_EDGES_PER_NODE)
  {
    return;
  }

  graph->edges[src][idx] = dest;
}

NodePos checkNode(int rows, int cols, unsigned int index)
{

  if(index % cols == 0)
  {
    return LEFT_NODE;
  }
  else if(index % cols == cols-1)
  {
    return RIGHT_NODE;
  }
  else
  {
    return MID_NODE;
  }

}


Map Layer

The map layer builds on top of the graph layer and maps real world coordinates like (-1, 3) onto the underlying graph nodes.  Whereas the graph nodes contain no information other than their connections, the nodes in the map layer contain an x,y coordinate and the obstruction status at that location.  The outer map structure also contains the current destination and the last shortest path calculated for it.  

So then the 3x3 example from above at the map layer would look like the figure below, where a red node is obstructed and a green node is clear.  

If a robot wanted to travel from (0, 0) to (0, 2) and sees that (0,1) has been obstructed, Dijkstra's algorithm would run and result in (0, 0) → (1, 1) → (0, 2).  
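
That behavior can be checked with a short host-side Python sketch (not part of the firmware; the 8-connected movement and squared-distance edge weights mirror the C code that follows):

```python
import heapq

def shortest_path(blocked, start, goal, size=3):
    """Dijkstra on a size x size grid of (x, y) cells with 8-connected
    moves; edge weight is squared Euclidean distance, as in distance()."""
    dist = {start: 0}
    prev = {}
    heap = [(0, start)]
    while heap:
        d, (x, y) = heapq.heappop(heap)
        if (x, y) == goal:
            break
        if d > dist[(x, y)]:
            continue  # stale heap entry
        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                nx, ny = x + dx, y + dy
                if (dx, dy) == (0, 0) or not (0 <= nx < size and 0 <= ny < size):
                    continue
                if (nx, ny) in blocked:
                    continue
                nd = d + dx * dx + dy * dy
                if nd < dist.get((nx, ny), float('inf')):
                    dist[(nx, ny)] = nd
                    prev[(nx, ny)] = (x, y)
                    heapq.heappush(heap, (nd, (nx, ny)))
    # Rebuild the path by walking prev backwards from the goal
    path, node = [goal], goal
    while node != start:
        node = prev[node]
        path.append(node)
    return path[::-1]

print(shortest_path({(0, 1)}, (0, 0), (0, 2)))  # -> [(0, 0), (1, 1), (0, 2)]
```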

map.h
#ifndef MAP_H_
#define MAP_H_

#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include <limits.h>

#include "graph.h"

// Map builds on the underlying graph by mapping real coordinates to the nodes
#define MAX_MAP_SIZE  MAX_GRAPH_SIZE
#define INF           UINT_MAX
#define UNDEFINED     UINT_MAX

typedef struct MapNode {
  // Map node contains real coordinates
  int n;
  int x;
  int y;
  bool traversable;             // Permanent condition ex: walls
  bool obstructed;              // Temporary condition ex: person walking by
  unsigned int *graph_node;     // Link to the associated graph node list
} map_node_t;

typedef struct PathNode {
  // Points to a specific node of the map
  map_node_t *node;
  // Points to the next node in the path
  struct PathNode *next;
} path_t;

typedef struct Map {
  // Coordinate system
  int left_bound;
  int right_bound;
  int upper_bound;
  int lower_bound;

  unsigned int rows;
  unsigned int cols;
  unsigned int size;

  // nodes mirrors the underlying graph and can be used to look up the map
  // nodes from the indices resulting from a graph search
  map_node_t nodes[MAX_MAP_SIZE];
  map_node_t *origin;
  map_node_t *destination;

  // Array of pointers to keep track of the optimal path
  map_node_t *path[MAX_MAP_SIZE];
  unsigned int path_length;

  // Underlying graph structure
  graph_t graph;
} map_t;

// Map functions
void createMap(map_t *map, int left, int right, int down, int up);
void addMapNode(map_t *map, int x, int y);
map_node_t* getNodeAt(map_t *map, int x, int y);
void setOrigin(map_t *map, int x, int y);
void setDestination(map_t *map, int x, int y);
void setTraversableStatus(map_t *map, int x, int y, bool traversable);
void setObstructionStatus(map_t *map, int x, int y, bool obstructed);
void clearObstructions(map_t *map);
int8_t findOptimalPath(map_t *map);

// Utility functions
int8_t dijkstra(map_t *map, map_node_t *src, map_node_t *dest, unsigned int *prev);
unsigned int distance(map_node_t *p1, map_node_t *p2);
unsigned int find_min(bool *unvisited_nodes, unsigned int *distances);
unsigned int find_next_unvisited(bool *unvisited_nodes, unsigned int *graph_nodes, unsigned int start);
bool isPathObstructed(map_t *map);

#endif //MAP_H_


map.c
#include "map.h"

/* Map functions */

void createMap(map_t *map, int left, int right, int down, int up)
{
  unsigned int i;
  int x, y;

  map->left_bound = left;
  map->right_bound = right;
  map->lower_bound = down;
  map->upper_bound = up;

  // Keep track of size
  map->rows = (map->upper_bound - map->lower_bound) + 1;
  map->cols = (map->right_bound - map->left_bound) + 1;
  map->size = map->rows * map->cols;

  // Initialize navigation parameters
  map->origin = NULL;
  map->destination = NULL;
  map->path_length = 0;
  for(i = 0; i < MAX_MAP_SIZE; i++)
  {
      map->path[i] = NULL;
  }

  // Create a graph with matching dimensions
  createGraph(&map->graph, map->rows, map->cols);

  // Assign x, y values to  nodes
  for(y = down; y <= up; y++)
  {
    for(x = left; x <= right; x++)
    {
      // Create a new unobstructed map node at (x,y)
      addMapNode(map, x, y);
    }
  }
}

void addMapNode(map_t *map, int x, int y)
{
  unsigned int n = (x - map->left_bound) + (y - map->lower_bound) * map->cols;

  map->nodes[n].n = n;
  map->nodes[n].x = x;
  map->nodes[n].y = y;
  map->nodes[n].obstructed = false;
  map->nodes[n].traversable = true;
  map->nodes[n].graph_node = map->graph.edges[n];
}

map_node_t* getNodeAt(map_t *map, int x, int y)
{
  unsigned int n = (x - map->left_bound) + (y - map->lower_bound) * map->cols;

  // n is unsigned, so coordinates outside the bounds (including negative
  // offsets) wrap to large values; note >= rather than > to reject n == size
  if(n >= map->size)
  {
    return NULL;
  }

  return &map->nodes[n];
}

void setOrigin(map_t *map, int x, int y)
{
  map_node_t *node = getNodeAt(map, x, y);
  map->origin = node;
}

void setDestination(map_t *map, int x, int y)
{
  map_node_t *node = getNodeAt(map, x, y);
  map->destination = node;
}

void setTraversableStatus(map_t *map, int x, int y, bool traversable)
{
  map_node_t *node = getNodeAt(map, x, y);
  if(node != NULL)
  {
    node->traversable = traversable;
  }
}

void setObstructionStatus(map_t *map, int x, int y, bool obstructed)
{
  map_node_t *node = getNodeAt(map, x, y);
  if(node != NULL)
  {
    node->obstructed = obstructed;
  }
}

void clearObstructions(map_t *map)
{
  unsigned int i;

  // Clear all obstructions from the map
  for(i = 0; i < map->size; i++)
  {
    map->nodes[i].obstructed = false;
  }
}

int8_t findOptimalPath(map_t *map)
{
  unsigned int i;
  unsigned int u;
  unsigned int hops[MAX_MAP_SIZE];

  // Check for NULL before dereferencing destination below
  if(map->destination == NULL || map->origin == NULL)
  {
      return -1;
  }

  u = map->destination->n;

  // Delete the last path
  for(i = 0; i < map->path_length; i++)
  {
      map->path[i] = NULL;
  }
  map->path_length = 0;

  dijkstra(map, map->origin, map->destination, hops);

  if(hops[u] != UNDEFINED || u == map->origin->n)
  {
      // Determine the path length so we can reverse iterate
      while(hops[u] != UNDEFINED)
      {
        map->path_length++;
        u = hops[u];
      }

      // Rebuild the path by reverse iteration
      u = map->destination->n;
      for(i = map->path_length; i > 0; i--)
      {
          map->path[i] = &map->nodes[u];
          u = hops[u];
      }
  }

  // Add the origin to the path for completeness
  map->path[0] = map->origin;
  map->path_length++;

  return 0;
}

/* Utility functions */

int8_t dijkstra(map_t *map, map_node_t *src, map_node_t *dest, unsigned int *prev)
{
  // Dijkstra's Algorithm
  graph_t *graph = &map->graph;

  unsigned int i;
  unsigned int alt;
  unsigned int current_index, neighbor_index;
  map_node_t *current_node, *current_neighbor;

  unsigned int remaining_nodes;

  unsigned int graph_idx;
  unsigned int *graph_edges;

  // Create set of unvisited nodes
  bool unvisited[MAX_MAP_SIZE] = {0};
  unsigned int num_unvisited = 0;

  unsigned int dist[MAX_MAP_SIZE];

  /* Initialize temporary distance values & prev node list */
  for(i = 0; i < graph->size; i++)
  {
    // Distance to all non-source nodes is initialized to infinity
    dist[i] = INF;
    // Previous nodes are undefined because none have been visited
    prev[i] = UNDEFINED;
    // Only allow a node to be visited if it's unobstructed
    if(map->nodes[i].obstructed || !map->nodes[i].traversable)
    {
      continue;
    }
    unvisited[i] = true;
    num_unvisited++;
  }
  // Distance to the initial node is 0
  dist[src->n] = 0;

  remaining_nodes = num_unvisited;
  while(remaining_nodes > 0)
  {
    // Set node index with the smallest distance as current
    current_index = find_min(unvisited, dist);
    if(current_index == UNDEFINED)
    {
      // No reachable unvisited node remains; dest is unreachable
      return -1;
    }

    // Expects a pointer
    current_node = &map->nodes[current_index];

    // Mark current node visited and remove from unvisited set
    unvisited[current_index] = false;
    remaining_nodes--;

    if(current_node == dest)
    {
      // Dest has been reached and algorithm can stop
      return 0;
    }

    // Get the array of connected nodes from the graph
    graph_edges = map->graph.edges[current_index];

    // Get the first connected, unvisited node index
    graph_idx = find_next_unvisited(unvisited, graph_edges, -1);

    while(graph_idx != UNDEFINED && graph_idx < MAX_EDGES_PER_NODE)
    {
      neighbor_index = graph_edges[graph_idx];
      current_neighbor = &map->nodes[neighbor_index];

      // Calculate distances through current node and all unvisited neighbors
      alt = dist[current_index] + distance(current_node, current_neighbor);

      // If new distance is smaller, assign it
      if(alt < dist[neighbor_index])
      {
        dist[neighbor_index] = alt;
        prev[neighbor_index] = current_index;
      }

      // Stop if the next slot is past the end of the edge array; check the
      // bound first so the array read below can't run out of range
      if(graph_idx + 1 >= MAX_EDGES_PER_NODE)
      {
        break;
      }
      else if(graph_edges[graph_idx + 1] == UNDEFINED)
      {
        break;
      }

      // Follow the next edge in the graph
      graph_idx = find_next_unvisited(unvisited, graph_edges, graph_idx);
    }
  }

  // If the function reaches this point, something probably went wrong
  return -1;
}

unsigned int distance(map_node_t *p1, map_node_t *p2)
{
  // Returns distance squared; only a relative value is needed
  int d_x = p2->x - p1->x;
  int d_y = p2->y - p1->y;
  return (unsigned int) (d_x*d_x + d_y*d_y);
}

unsigned int find_min(bool *unvisited_nodes, unsigned int *distances)
{
  unsigned int current_idx;
  unsigned int current_dist;
  unsigned int min = INF;
  unsigned int index = UNDEFINED;

  for(current_idx = 0; current_idx < MAX_MAP_SIZE; current_idx++)
  {
      if(unvisited_nodes[current_idx])
      {
          current_dist = distances[current_idx];
          if(current_dist < min)
          {
              min = current_dist;
              index = current_idx;
          }
      }
  }

  return index;
}

unsigned int find_next_unvisited(bool *unvisited_nodes, unsigned int *graph_edges, unsigned int start)
{
  unsigned int edge_idx = start + 1;
  unsigned int current_graph_idx;

  // Check the bound before reading the edge array to avoid running past the end
  while(edge_idx < MAX_EDGES_PER_NODE)
  {
    current_graph_idx = graph_edges[edge_idx];
    if(current_graph_idx != UNINITIALZED && unvisited_nodes[current_graph_idx])
    {
      return edge_idx;
    }
    edge_idx++;
  }

  return UNDEFINED;
}

bool isPathObstructed(map_t *map)
{
  unsigned int i;

  for(i = 0; i < map->path_length; i++)
  {
    if(map->path[i]->obstructed)
    {
      return true;
    }
  }

  return false;
}


Interface Layer

The interface layer is where the TI detection code interacts with the map layer.  The functions here are used to update the obstacle status of the map nodes, add the calculated path to the UART data packet, and update the map's destination using the CLI.  

The navigationInit() function is where the map and graph structures get initialized.  The maximum detection distance of the radar is set to about 20m, but this tutorial's hypothetical robot doesn't need to see that far, so the coordinate range for navigation is set to -9 to 9 on the x-axis and -1 to 10 on the y-axis.  The origin should always be set to (0, -1) because the robot starts moving from the space directly behind the origin of the radar.
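
The coordinate-to-index mapping used by the map layer can be checked with a quick Python sketch (the bounds match the createMap() arguments used in navigationInit()):

```python
# Bounds passed to createMap() in navigationInit()
LEFT, RIGHT, LOWER, UPPER = -9, 9, -1, 10
COLS = RIGHT - LEFT + 1

def node_index(x, y):
    """World coordinate -> row-major node index, as in addMapNode()."""
    return (x - LEFT) + (y - LOWER) * COLS

print(node_index(0, -1))   # the robot's origin
print(node_index(-9, -1))  # bottom-left corner -> 0
```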

updateMapFromRadar() loads the obstacle data from the radar into the map.  The function takes in a pointer to the detection results that reside in the mailbox message, but the actual data is still in the DSP's memory.  Because the pointer to this data was created by the DSP, the MSS must translate the address before it can access any of the data.  

You must use SOC_translateAddress(<some pointer>, SOC_TranslateAddr_Dir_FROM_OTHER_CPU, NULL) on any pointers that came from the DSP in order to access the corresponding data. This is easy to overlook, but you will get garbage results or cause other problems if you don't translate the pointer before accessing its contents.

appendPathTLV() is used to inject the path calculation results into the frame created by the DSP.  This allows the extension of TI's existing serial frame structure with little to no disruption.  As with the address translation in the last step, there is another "gotcha" here.

The totalPacketLen element of the detection results header must be a multiple of 32 bytes. The DSP rounds up to the nearest multiple, but when modifying it in the MSS you must redo this step after making any additions. You may notice data missing in your serial frames if you don't round to the proper length.
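
The round-up arithmetic is the standard integer trick and can be checked in a couple of lines of Python (the segment length of 32 comes from MMWDEMO_OUTPUT_MSG_SEGMENT_LEN in the TI demo):

```python
SEGMENT_LEN = 32  # MMWDEMO_OUTPUT_MSG_SEGMENT_LEN in the TI demo

def round_up_packet_len(total_len, segment=SEGMENT_LEN):
    """Round total_len up to the next multiple of segment, using the
    same integer arithmetic as the C code in appendPathTLV()."""
    return segment * ((total_len + segment - 1) // segment)

print(round_up_packet_len(100))  # -> 128
```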


navigate.h
#ifndef NAVIGATE_H_
#define NAVIGATE_H_

#include <stdint.h>

#include <ti/drivers/soc/soc.h>
#include "../common/mmw_messages.h"

#include "map.h"

/* Pick some number not used by TI for the path TLV ID */
#define MMWDEMO_OUTPUT_MSG_BEST_PATH    42

/* Clone of the structure used by the DSP */
typedef struct detected_object
{
    int16_t   speed;        /*!< @brief Doppler index */
    uint16_t   peakVal;     /*!< @brief Peak value */
    int16_t  x;             /*!< @brief x - coordinate in meters. Q format provides the bitwidth. */
    int16_t  y;             /*!< @brief y - coordinate in meters. Q format provides the bitwidth. */
} detected_object_t;

/* Structure for transmitting path data over UART */
typedef struct path_point
{
    int8_t x;
    int8_t y;
} path_point_t;

/* Function prototypes */
void navigationInit(void);
void updateMapFromRadar(MmwDemo_detInfoMsg *detInfo);
void appendPathTLV(MmwDemo_detInfoMsg *detInfo);

/* CLI command functions */
int32_t navigation_CLIBuildMap(int32_t argc, char *argv[]);
int32_t navigation_CLISetDestination(int32_t argc, char *argv[]);

#endif /* NAVIGATE_H_*/


navigate.c
#include "navigate.h"

static map_t radar_map;
static path_point_t points[MAX_MAP_SIZE];

/* Initialize structures and calculate initial path */
void navigationInit(void)
{
  createMap(&radar_map, -9, 9, -1, 10);
  setOrigin(&radar_map, 0, -1);
  setDestination(&radar_map, -1, 4);
  findOptimalPath(&radar_map);
}

/* Update map with obstacle data from radar */
void updateMapFromRadar(MmwDemo_detInfoMsg *detInfo)
{
  uint16_t i;
  map_t *map = &radar_map;

  uint8_t *bytes_ptr;
  MmwDemo_msgTlv current_tlv;
  MmwDemo_output_message_dataObjDescr *desc = NULL;
  detected_object_t *detected_objs = NULL;
  detected_object_t current_obj;

  // Initialize so a frame without a detected-points TLV is handled safely
  uint32_t num_objs = 0;
  int16_t x, y;

  /* Get detected point data from the TLVs */
  for(i = 0; i < detInfo->header.numTLVs; i++)
  {
      current_tlv = detInfo->tlv[i];

      if(current_tlv.type == MMWDEMO_OUTPUT_MSG_DETECTED_POINTS)
      {
          /* Address has to be translated from the other core */
          bytes_ptr = (uint8_t*) SOC_translateAddress(current_tlv.address, SOC_TranslateAddr_Dir_FROM_OTHER_CPU, NULL);

          /* Extract only the detected object structure */
          desc = (MmwDemo_output_message_dataObjDescr *) bytes_ptr;
          detected_objs = (detected_object_t *) (bytes_ptr + sizeof(MmwDemo_output_message_dataObjDescr));

          num_objs = desc->numDetetedObj;

          // Don't loop any longer than needed
          break;
      }
  }

  // Sanity check: bail out if no detected-points TLV was found
  // or the reported count is implausible
  if(detected_objs == NULL || num_objs > 50)
  {
      return;
  }

  // Clear the detected obstructions from the last frame
  clearObstructions(map);

  // For each object detected by the radar, mark its coordinates on the map as obstructed
  // Distances are in meters (might need to be scaled for Q format)
  // Assume that each map grid space is 1 square meter
  for(i = 0; i < num_objs; i++)
  {
    current_obj = detected_objs[i];

    /* Scale for Q format of x, y coords */
    x = current_obj.x >> desc->xyzQFormat;
    y = current_obj.y >> desc->xyzQFormat;

    /* Coordinates are simply integers for now
     * Would be helpful to handle fractional values in the future
     * Block the whole space if something is in it*/
    if(x > map->right_bound || x < map->left_bound)
    {
        continue;
    }

    if(y > map->upper_bound || y < map->lower_bound)
    {
        continue;
    }

    // Don't allow the origin to be obstructed because we're already there and the resulting path would always be null
    if(x == map->origin->x && y == map->origin->y)
    {
        continue;
    }

    setObstructionStatus(map, x, y, true);
  }

  // If the original path has been obstructed, calculate a new one
  if(isPathObstructed(map) || map->path_length <= 1)
  {
    findOptimalPath(map);
  }

}

/* Hijack the TLV structure from the DSS to transmit the path */
void appendPathTLV(MmwDemo_detInfoMsg *detInfo)
{
    unsigned int i;

    uint32_t new_tlvIdx = detInfo->header.numTLVs;
    uint32_t new_tlvLength = radar_map.path_length * sizeof(path_point_t);
    uint32_t totalPacketLen;

    /* Fill out the point array used to transmit the path */
    for(i = 0; i < radar_map.path_length; i++)
    {
        points[i].x = radar_map.path[i]->x;
        points[i].y = radar_map.path[i]->y;
    }

    /* Add the new TLV to the message */
    detInfo->tlv[new_tlvIdx].length = new_tlvLength;
    detInfo->tlv[new_tlvIdx].type = MMWDEMO_OUTPUT_MSG_BEST_PATH;
    detInfo->tlv[new_tlvIdx].address = (uint32_t) &points;

    // Increment the number of TLVs
    detInfo->header.numTLVs++;

    // Increment total packet length
    totalPacketLen = detInfo->header.totalPacketLen + new_tlvLength;

    /* Round total length up to a multiple of 32 */
    detInfo->header.totalPacketLen = MMWDEMO_OUTPUT_MSG_SEGMENT_LEN * ((totalPacketLen + (MMWDEMO_OUTPUT_MSG_SEGMENT_LEN-1))/MMWDEMO_OUTPUT_MSG_SEGMENT_LEN);
}

/* Command line functions for run-time path management */
int32_t navigation_CLIBuildMap(int32_t argc, char *argv[])
{
    int xmin, xmax, ymin, ymax;

    if(argc != 5)
    {
        return -1;
    }

    xmin = (int) atoi(argv[1]);
    xmax = (int) atoi(argv[2]);
    ymin = (int) atoi(argv[3]);
    ymax = (int) atoi(argv[4]);

    createMap(&radar_map, xmin, xmax, ymin, ymax);
    return 0;
}

int32_t navigation_CLISetDestination(int32_t argc, char *argv[])
{
    int x, y;

    if(argc != 3)
    {
        return -1;
    }

    x = (int) atoi(argv[1]);
    y = (int) atoi(argv[2]);

    setDestination(&radar_map, x, y);
    findOptimalPath(&radar_map);
    return 0;
}


Visualization

In the absence of a physical robot, a plot will be used to visualize the path the robot will follow.  TI implements their GUI using MATLAB and provides a compiled version of the scripts, which isn't easy to modify if you don't have a MATLAB license.  Therefore, this tutorial will use the free and easily accessible Python to achieve a similar result.  An example of the visualization results is shown below.  

Frame Structure

At the lowest level of the Python program, we mirror the structures used by the embedded device to translate the binary data into a more human-readable form.  frames.py defines the Frame class, which represents one whole packet from the radar.  The input to the class constructor is the complete byte string received from the serial port.  The string is then parsed into the header parameters and each of the included TLV (Type, Length, Value) data vectors, using struct.unpack to convert the serialized C structures into Python tuples of their elements.  

Serial frames from the AWR1642 are in Little Endian format and need to be converted accordingly.
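
As a small illustration of how struct.unpack recovers a C structure from raw bytes, here is a TLV header (two little-endian uint32 fields: type, then length, matching the TLVHeader class below) being round-tripped. The values 42 and 16 are just example data:

```python
import struct

# A TLV header is two little-endian uint32 values: type, then length.
raw = struct.pack('<2I', 42, 16)   # e.g. a BEST_PATH TLV with a 16-byte payload
tlv_type, tlv_length = struct.unpack('<2I', raw)
print(tlv_type, tlv_length)  # 42 16
```

The leading `<` in the format string selects little-endian byte order with standard sizes and no padding, which is what the AWR1642 transmits.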

frames.py
import struct


class FrameError(Exception):
    def __init__(self, value):
        super().__init__(value)
        self.value = value


class TLVData:
    def __init__(self, serial_tlv):
        pass

    @staticmethod
    def preparsing(serial_tlv):
        # Only used if there is extra data in a frame outside of the regular objects
        return None, serial_tlv

    def __str__(self):
        result = ''
        for key in self.__dict__.keys():
            result += '{}: {}\n'.format(key, self.__dict__[key])
        return result


class DataObjDescriptor:
    def __init__(self, serial_tlv):
        elements = struct.unpack('<2h', serial_tlv)
        self.numDetectedObj = elements[0]
        self.xyzQFormat = elements[1]


class DetectedPoints(TLVData):
    NAME = 'DETECTED_POINTS'
    SIZE_BYTES = 8

    def __init__(self, serial_tlv):
        super(DetectedPoints, self).__init__(serial_tlv)
        # Unpack elements from the structure
        elements = struct.unpack('<hHhh', serial_tlv)
        self.speedIdx = elements[0]
        self.peakVal = elements[1]
        self.x = elements[2]
        self.y = elements[3]

    @staticmethod
    def preparsing(serial_tlv):
        # Strip off the 4 byte description header
        descriptor = DataObjDescriptor(serial_tlv[0:4])
        stripped_tlv = serial_tlv[4:]
        return descriptor, stripped_tlv


class ClusteringResults(TLVData):
    NAME = 'CLUSTERING_RESULTS'
    SIZE_BYTES = 8

    def __init__(self, serial_tlv):
        super(ClusteringResults, self).__init__(serial_tlv)
        # Unpack elements from the structure
        elements = struct.unpack('<4h', serial_tlv)
        self.xCenter = elements[0]
        self.yCenter = elements[1]
        self.xSize = elements[2]
        self.ySize = elements[3]

    @staticmethod
    def preparsing(serial_tlv):
        # Strip off the 4 byte description header
        descriptor = DataObjDescriptor(serial_tlv[0:4])
        stripped_tlv = serial_tlv[4:]
        return descriptor, stripped_tlv


class TrackedObjects(TLVData):
    NAME = 'TRACKED_OBJECTS'
    SIZE_BYTES = 12

    def __init__(self, serial_tlv):
        super(TrackedObjects, self).__init__(serial_tlv)
        # Unpack elements from the structure
        elements = struct.unpack('<6h', serial_tlv)
        self.x = elements[0]
        self.y = elements[1]
        self.xd = elements[2]
        self.yd = elements[3]
        self.xsize = elements[4]
        self.ysize = elements[5]

    @staticmethod
    def preparsing(serial_tlv):
        # Strip off the 4 byte description header
        descriptor = DataObjDescriptor(serial_tlv[0:4])
        stripped_tlv = serial_tlv[4:]
        return descriptor, stripped_tlv


class ParkingAssist(TLVData):
    NAME = 'PARKING_ASSIST'
    SIZE_BYTES = 2

    def __init__(self, serial_tlv):
        super(ParkingAssist, self).__init__(serial_tlv)
        # elements = struct.unpack('h', serial_tlv)

    @staticmethod
    def preparsing(serial_tlv):
        # Strip off the 4 byte description header
        descriptor = DataObjDescriptor(serial_tlv[0:4])
        stripped_tlv = serial_tlv[4:]
        return descriptor, stripped_tlv


class BestPath(TLVData):
    NAME = 'BEST_PATH'
    SIZE_BYTES = 2

    def __init__(self, serial_tlv):
        super(BestPath, self).__init__(serial_tlv)
        elements = struct.unpack('<2b', serial_tlv)
        self.x = elements[0]
        self.y = elements[1]


# Map type ID to the corresponding class
TLV_TYPES = {
    # Obstacle detection
    1: DetectedPoints,
    2: ClusteringResults,
    3: TrackedObjects,
    4: ParkingAssist,
    # Custom
    42: BestPath,
}


class TLVHeader:
    SIZE_BYTES = 8

    def __init__(self, serial_tlv_header):
        elements = struct.unpack('<2I', serial_tlv_header)
        self.type = elements[0]
        self.length = elements[1]

    def __str__(self):
        result = 'TLV Header:\n'
        for key in self.__dict__.keys():
            result += '{}: {}\n'.format(key, self.__dict__[key])
        return result


class TLV:
    def __init__(self, serial_tlv):
        self.header = TLVHeader(serial_tlv[0:TLVHeader.SIZE_BYTES])

        # Lookup constructor for the specific type of object
        self.obj_class = TLV_TYPES[self.header.type]
        self.name = self.obj_class.NAME
        # Note this size is PER OBJECT
        self.obj_size = self.obj_class.SIZE_BYTES

        # Strip off excess headers before parsing objects
        headerless_tlv = serial_tlv[TLVHeader.SIZE_BYTES:]
        self.descriptor, processed_tlv = self.obj_class.preparsing(headerless_tlv)
        try:
            self.objects = self.parse_objects(processed_tlv)
        except struct.error as e:
            # Save whole frame from failing if one TLV is bad
            print('Exception while parsing TLV objects: ', e)
            self.objects = []

    def __str__(self):
        result = str(self.header) + 'Name: {}\n'.format(self.name)
        for each in self.objects:
            result += str(each)
        return result

    def parse_objects(self, serial_tlv):
        objects = []
        num_objects = int(self.header.length / self.obj_size)
        for i in range(0, num_objects):
            new = self.obj_class(serial_tlv[0:self.obj_size])
            objects.append(new)
            serial_tlv = serial_tlv[self.obj_size:]

        return objects


class FrameHeader:
    def __init__(self, serial_header):
        self.full_header = serial_header

    def __str__(self):
        result = 'Frame Header:\n'
        for key in self.__dict__.keys():
            result += '{}: {}\n'.format(key, self.__dict__[key])

        return result

    def verify_checksum(self):
        pass


class ShortRangeRadarFrameHeader(FrameHeader):
    SIZE_BYTES = 40
    PACKET_LENGTH_END = 16

    def __init__(self, serial_header):
        super().__init__(serial_header)
        self.sync = serial_header[0:8]
        self.version = serial_header[8:12]
        self.packetLength = struct.unpack('<I', serial_header[12:16])[0]
        self.platform = serial_header[16:20]

        values = struct.unpack('<5I', serial_header[20:40])
        self.frameNumber = values[0]
        self.timeCpuCycles = values[1]
        self.numDetectedObj = values[2]
        self.numTLVs = values[3]
        self.subFrameNumber = values[4]


class Frame:
    FRAME_START = b'\x02\x01\x04\x03\x06\x05\x08\x07'

    def __init__(self, serial_frame, frame_type):
        # Parse serial data into header and TLVs
        # Note that frames are LITTLE ENDIAN

        # Length should be > header_size
        frame_length = len(serial_frame)
        if frame_length < frame_type.SIZE_BYTES:
            raise FrameError('Frame is smaller than required header size. '
                             'Expected length {}. Measured length {}.'.format(frame_type.SIZE_BYTES, frame_length))

        # Initialize the header
        self.header = frame_type(serial_frame[0:frame_type.SIZE_BYTES])

        # Second sanity check
        if frame_length < self.header.packetLength:
            raise FrameError('Frame is too small. Expected {} bytes, '
                             'received {} bytes.'.format(self.header.packetLength, frame_length))

        # Convert remaining data into TLVs
        full_tlv_data = serial_frame[frame_type.SIZE_BYTES:]
        tlv_data = full_tlv_data
        self.tlvs = []
        for i in range(self.header.numTLVs):
            # Check header to get length of each TLV
            length = TLVHeader.SIZE_BYTES + TLVHeader(tlv_data[0:TLVHeader.SIZE_BYTES]).length
            # Create a 'length' bytes TLV instance
            new_tlv = TLV(tlv_data[0:length])
            self.tlvs.append(new_tlv)

            # Slice off the consumed TLV data
            tlv_data = tlv_data[length:]

    def __str__(self):
        # Print header followed by TLVs
        result = "START FRAME\n"
        result += str(self.header)
        result += 'TLVs: {\n'
        for each in self.tlvs:
            result += str(each)
        result += '}\n'
        result += 'END FRAME\n'
        return result


Serial Interface

The next level is the serial interface itself, implemented with PySerial.  The interface opens the control and data serial ports of the radar board and reads the data into queues.  Each COM port gets its own Tx and Rx queues and its own thread so that they can run independently of the rest of the program. 

Using the definitions in frames.py, the interface scans the incoming bytes for the start-of-frame "magic word."  When a new frame is detected, the interface reads enough of the header to recover the total packet length, then continues reading until the full packet has arrived and places the resulting frame into the receive queue for the main application to process.  
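
The synchronization step can be sketched in isolation. This minimal example (the helper name is illustrative, not from the demo code) discards bytes ahead of the magic word and pulls the packet length from bytes 12-16 of the header, per the ShortRangeRadarFrameHeader layout:

```python
import struct

MAGIC = b'\x02\x01\x04\x03\x06\x05\x08\x07'

def sync_to_frame(buffer):
    """Discard bytes before the magic word and read the total packet
    length from bytes 12-16 of the header (SRR frame layout)."""
    start = buffer.find(MAGIC)
    if start < 0:
        return None, b''          # no magic word yet
    frame = buffer[start:]
    if len(frame) < 16:
        return None, frame        # need more bytes to reach the length field
    packet_length = struct.unpack('<I', frame[12:16])[0]
    return packet_length, frame

# Junk bytes ahead of the magic word are discarded:
noisy = b'\xff\xee' + MAGIC + b'\x01\x00\x00\x00' + struct.pack('<I', 96)
length, frame = sync_to_frame(noisy)
print(length)  # 96
```

In the real interface the same logic runs incrementally, one byte at a time, because the serial port delivers data in unpredictable chunks.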

serial_interface.py
import serial
import time
import struct
from multiprocessing import Queue
from threading import Thread

from frames import Frame


class SerialInterface:
    def __init__(self, user_port, data_port, frame_type):
        self.control_uart = serial.Serial(user_port, 115200)
        self.data_uart = serial.Serial(data_port, 921600)
        self.frame_type = frame_type

        self.uarts_enable = False

        # Use Queues to transfer data between processes
        self.control_tx_queue = Queue()
        self.control_rx_queue = Queue()
        self.data_tx_queue = Queue()
        self.data_rx_queue = Queue()

        # Spawn processes to handle serial data transfer in the background
        self.control_process = Thread(target=self.process_control_uart)
        self.data_process = Thread(target=self.process_data_uart)

    def process_control_uart(self):
        while self.uarts_enable:
            self._read_serial_lines(self.control_uart, self.control_rx_queue)
            self._write_serial(self.control_uart, self.control_tx_queue)

            # Sleep briefly to allow other threads to run and data to arrive
            time.sleep(0.001)

    def process_data_uart(self):
        while self.uarts_enable:
            self._read_serial_frames(self.data_uart, self.data_rx_queue, self.frame_type)
            self._write_serial(self.data_uart, self.data_tx_queue)

            # Sleep briefly to allow other threads to run and data to arrive
            time.sleep(0.001)

    @staticmethod
    def _read_serial_lines(ser_inst, rx_q):
        # Read a byte at a time until a \n is reached
        byte_data = b''
        while ser_inst.in_waiting:
            byte = ser_inst.read(1)
            byte_data += byte
            if byte == b'\n':
                break

        if byte_data:
            data = byte_data.decode()
            rx_q.put(data)

    @staticmethod
    def _read_serial_frames(ser_inst, rx_q, frame_type):
        data = b''
        bytes_read = 0
        sync = False
        while ser_inst.in_waiting:
            byte = ser_inst.read(1)
            data += byte

            if not sync and Frame.FRAME_START in data:
                # Start of frame found, discard extra data
                sync = True
                sync_data = data[data.find(Frame.FRAME_START):]
                bytes_read += len(Frame.FRAME_START)

                # Read bytes until the packet length is received
                length_data = ser_inst.read(frame_type.PACKET_LENGTH_END - len(Frame.FRAME_START))
                bytes_read += len(length_data)
                packet_length = struct.unpack('<I', length_data[-4:])[0]
                if packet_length > 5000:
                    print("Packet length is {} which seems too large.  Skipping...".format(packet_length))
                    break

                # Read remaining bytes specified by the header
                remaining_data = ser_inst.read(packet_length - bytes_read)

                full_frame = sync_data + length_data + remaining_data

                rx_q.put(full_frame)

                # Reset so a second frame already in the buffer is
                # detected instead of being silently discarded
                data = b''
                bytes_read = 0
                sync = False

    @staticmethod
    def _write_serial(ser_inst, tx_q):
        if not tx_q.empty():
            data = tx_q.get()
            data = data.encode()

            ser_inst.write(data)

    @staticmethod
    def send_item(tx_q, data):
        # Send one item (typically a line ending in '\n') to the tx buffer
        tx_q.put(data)

    @staticmethod
    def recv_item(rx_q):
        # Fetch one line or frame from the rx buffer
        return rx_q.get() if not rx_q.empty() else None

    def start(self):
        self.uarts_enable = True

        self.control_process.start()
        self.data_process.start()

    def stop(self):
        self.uarts_enable = False

        self.control_process.join()
        self.data_process.join()

Plotting

Detected points and path results are plotted using Matplotlib.  Regions containing obstacles are marked in red, while regions along the path are marked in green.  Each region is a 1 m square because the map resolution was earlier defined as 1 m.  The boxed regions make it easy to see that map node (0, 0) corresponds to the real-world square spanning 0 to 1 m on each axis. 
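
The mapping from a detected point to its 1 m grid square is just a floor operation; a quick sketch (the helper name is illustrative):

```python
import numpy as np

def grid_cell(x, y):
    # A point anywhere inside [n, n+1) x [m, m+1) belongs to map node (n, m),
    # drawn as a 1 m x 1 m rectangle anchored at its lower-left corner.
    return int(np.floor(x)), int(np.floor(y))

print(grid_cell(0.3, 0.9))   # (0, 0)
print(grid_cell(-1.2, 2.5))  # (-2, 2)
```

Note that np.floor (not int truncation) is what makes negative coordinates land in the correct cell.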

Matplotlib's animation API is used to periodically update and redraw any changes to the plot.  Finally, the data from the radar and robot receives some last formatting before it is plotted.  

nav_plot.py
import numpy as np
import time

import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib import patches
from matplotlib.collections import PatchCollection

from math import tan, radians


class NavPlot:
    def __init__(self, radar_bounds, radar_queue_in=None):
        matplotlib.rcParams['toolbar'] = 'None'

        # Copy all the bounding data
        self.radar_left_bound = radar_bounds[0]
        self.radar_right_bound = radar_bounds[1]
        self.radar_back_bound = radar_bounds[2]
        self.radar_front_bound = radar_bounds[3]

        # Queues to receive data
        self.radar_queue = radar_queue_in

        # Initialize plots
        self.fig = plt.figure()
        self.radar_ax = self.fig.add_subplot(111)
        self.radar_plot_init()

        # Initialize the actual plot with empty data
        self.obj_scatter = self.radar_ax.scatter([], [], s=60, marker='o')
        self.radar_path_plot, = self.radar_ax.plot([], [], 'go--')

        # Save results from previous update in case there is no new radar data ready, keeps plot smooth
        self.prev_artists = []

        # Set refresh rate to ~60 Hz
        self.ani = FuncAnimation(self.fig, self.update, interval=16, blit=True)

    def radar_plot_init(self):
        self.radar_ax.set_title('Radar Plot')
        self.radar_ax.set_aspect('equal')
        self.radar_ax.set_xlabel('X (m)')
        self.radar_ax.set_ylabel('Y (m)')
        self.radar_ax.set_xlim(self.radar_left_bound, self.radar_right_bound)
        self.radar_ax.set_ylim(self.radar_back_bound, self.radar_front_bound)

        # Invert axes to match current positioning
        # self.radar_ax.invert_yaxis()
        # self.radar_ax.invert_xaxis()
        # self.radar_ax.xaxis.set_label_position('top')
        # self.radar_ax.xaxis.tick_top()

        # Configure grid
        minor_xticks = np.arange(self.radar_left_bound, self.radar_right_bound, 0.5)
        minor_yticks = np.arange(self.radar_back_bound, self.radar_front_bound, 0.5)
        self.radar_ax.set_xticks(minor_xticks, minor=True)
        self.radar_ax.set_yticks(minor_yticks, minor=True)
        self.radar_ax.grid(True, which='both')

        # Draw an arc to make the graph look radar-like
        x_size = abs(self.radar_left_bound) + abs(self.radar_right_bound)
        y_size = 2 * abs(self.radar_front_bound)
        arc = patches.Arc((0, 0), x_size, y_size, angle=0, theta1=30, theta2=150)

        half_rad = self.radar_front_bound / 2
        line_x = half_rad * tan(radians(60))
        left_line = patches.ConnectionPatch((0.5, 0), (-line_x, half_rad), coordsA='data', coordsB='data')
        right_line = patches.ConnectionPatch((0.5, 0), (line_x, half_rad), coordsA='data', coordsB='data')

        self.radar_ax.add_patch(arc)
        self.radar_ax.add_patch(left_line)
        self.radar_ax.add_patch(right_line)

    def update_obstacles(self):
        # Keep track of things that have changed for the animation
        modified_artists = []

        # Obstacle detection data; avoid a blocking get() so the animation
        # keeps running when no new frame is ready
        radar_data = {} if self.radar_queue.empty() else self.radar_queue.get()
        if len(radar_data) == 0:
            # Keep data from last frame so plot doesn't get choppy
            modified_artists = self.prev_artists
        else:
            # Process each TLV type
            if 'DETECTED_POINTS' in radar_data.keys():
                detections = radar_data['DETECTED_POINTS']
                self.obj_scatter.set_offsets(detections[:, 0:2])
                modified_artists.append(self.obj_scatter)

                # Create shaded regions to mark blocked areas
                boxes = []
                for pt in detections:
                    xmin = np.floor(pt[0])
                    ymin = np.floor(pt[1])

                    box = patches.Rectangle((xmin, ymin), width=1.0, height=1.0, fill=True)
                    boxes.append(box)

                detect_pc = PatchCollection(boxes, facecolor='r', alpha=0.2)
                self.radar_ax.add_collection(detect_pc)
                modified_artists.append(detect_pc)

            if 'CLUSTERING_RESULTS' in radar_data.keys():
                clusters = radar_data['CLUSTERING_RESULTS']

                for cluster in clusters:
                    xmin = cluster[0] - (cluster[1] / 2)
                    width = cluster[1]
                    ymin = cluster[2] - (cluster[3] / 2)
                    height = cluster[3]
                    rect = patches.Rectangle((xmin, ymin), width=width, height=height, color='#99ff33', fill=False)

                    # Add the rectangles individually to preserve fill=False (doesn't work with PatchCollection)
                    self.radar_ax.add_patch(rect)
                    modified_artists.append(rect)

            if 'BEST_PATH' in radar_data.keys():
                path = radar_data['BEST_PATH']
                self.radar_path_plot.set_data(path[:, 0], path[:, 1])
                modified_artists.append(self.radar_path_plot)

                # Draw the origin in a different color
                xmin = np.floor(path[0][0])
                ymin = np.floor(path[0][1])
                box = patches.Rectangle((xmin, ymin), width=1.0, height=1.0, fill=False, color='b')
                self.radar_ax.add_patch(box)
                modified_artists.append(box)

                boxes = []
                for pt in path[1:]:
                    xmin = np.floor(pt[0])
                    ymin = np.floor(pt[1])

                    box = patches.Rectangle((xmin, ymin), width=1.0, height=1.0, fill=False)
                    boxes.append(box)

                path_pc = PatchCollection(boxes, facecolors='g', alpha=0.5)
                self.radar_ax.add_collection(path_pc)
                modified_artists.append(path_pc)

        self.prev_artists = modified_artists
        return modified_artists

    def update(self, frame):
        # Keep track of things that have changed for the animation
        modified_artists = []

        # Add any changes from either the obstacle detection or robot movement
        modified_artists.extend(self.update_obstacles())

        return modified_artists

    def show(self):
        plt.tight_layout()
        plt.show()


Main Application

The main application ties everything together.  Its job is to take data from the serial interface, convert it into a Frame instance, and send the Frame's data to the plot.  The main application creates a separate thread to handle the reading and processing of radar frames; that thread also updates the navigation destination to a new random location every 15 seconds.  It likewise creates the queues used to pass data between all the threads.  Finally, it creates the plot, which runs in the main thread.  The demo runs until the plot window is closed.  
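
The thread-and-queue wiring described above boils down to a standard producer/consumer pattern; a minimal standalone sketch (names are illustrative, not from the demo code):

```python
import time
from threading import Thread
from queue import Queue

def process_frames(q):
    # Stands in for the frame-processing thread: parse radar frames and
    # push plot-ready results into the shared queue.
    for i in range(3):
        q.put({'frame_number': i})
        time.sleep(0.01)

plot_queue = Queue()
worker = Thread(target=process_frames, args=(plot_queue,))
worker.start()
worker.join()

# The main thread (the plot, in the real demo) drains the queue:
while not plot_queue.empty():
    print(plot_queue.get())
```

The demo itself uses multiprocessing.Queue even though its workers are threads; for thread-only use like this, queue.Queue is sufficient and a little lighter.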

mmwave_nav.py
import time
import struct
import numpy as np
from threading import Thread
from multiprocessing import Queue

from serial_interface import SerialInterface
# Use the correct frame format for the firmware
from frames import Frame, ShortRangeRadarFrameHeader, FrameError
from nav_plot import NavPlot
from cli_utils import *

# The SRR firmware is configured for ~20m range with 120 degree viewing angle, about half range should be fine
radar_bounds = (-10, 10, -1, 10)


def generate_new_destination():
    dist = 0
    new_x = 0
    new_y = 0
    while dist < 2:
        new_x = np.random.randint(radar_bounds[0], radar_bounds[1], dtype=np.int8)
        new_y = np.random.randint(0, radar_bounds[3], dtype=np.int8)

        dist = np.sqrt(np.power(new_x, 2) + np.power(new_y, 2))

    return new_x, new_y


def randomize_destination(last_time, period, serial_inst):
    now = time.time()
    if now - last_time > period:
        x, y = generate_new_destination()
        send_destination(serial_inst, x, y)
        return now
    else:
        return last_time


def process_frame(serial_inst, plot_queue):
    # Get a frame from the data queue and parse
    last_dest_update = 0
    while serial_inst.uarts_enable:
        # Update to a new random destination every 15 seconds
        last_dest_update = randomize_destination(last_dest_update, 15, serial_inst)

        serial_frame = serial_inst.recv_item(serial_inst.data_rx_queue)
        if serial_frame:
            try:
                frame = Frame(serial_frame, frame_type=serial_inst.frame_type)
                results = dict()

                # Frames that don't contain the parking assist data only have 1-2 points
                # Plotting them makes the graph run choppily so just ignore them
                if 'PARKING_ASSIST' in [tlv.name for tlv in frame.tlvs]:
                    # There can be at most one of each type of TLV in the frame
                    for tlv in frame.tlvs:
                        objs = tlv.objects

                        if tlv.name == 'DETECTED_POINTS':
                            tuples = [(float(obj.x), float(obj.y)) for obj in objs]
                            coords = np.array(tuples) / 2 ** tlv.descriptor.xyzQFormat
                            results['DETECTED_POINTS'] = coords

                        elif tlv.name == 'CLUSTERING_RESULTS':
                            tuples = [(float(obj.xCenter), float(obj.xSize),
                                       float(obj.yCenter), float(obj.ySize)) for obj in objs]
                            data = np.array(tuples) / 2 ** tlv.descriptor.xyzQFormat
                            data = np.around(data, 3)
                            results['CLUSTERING_RESULTS'] = data

                        elif tlv.name == 'BEST_PATH':
                            tuples = [(obj.x, obj.y) for obj in objs]
                            # For plotting, offset path by 0.5 so it plots the points in the center of each 1m block
                            path = np.array(tuples) + 0.5
                            if path.size == 0:
                                results['BEST_PATH'] = np.array([[0, 0]])
                            else:
                                results['BEST_PATH'] = path

                    plot_queue.put(results)
            except (KeyError, struct.error, IndexError, FrameError, OverflowError) as e:
                # Some data got in the wrong place, just skip the frame
                print('Exception occurred: ', e)
                print("Skipping frame due to error...")

        # Sleep to allow other threads to run
        time.sleep(0.001)


def run_demo(control_port, data_port, reconfig=False):
    # Open serial interface to the device
    # Specify which frame/header structure to search for
    interface = SerialInterface(control_port, data_port, frame_type=ShortRangeRadarFrameHeader)
    interface.start()

    # Write configs to device and start the sensor
    if reconfig:
        print("Sending configuration command...")
        interface.send_item(interface.control_tx_queue, 'advFrameCfg\n')
        time.sleep(3)

        print("Starting sensor...")
        interface.send_item(interface.control_tx_queue, 'sensorStart\n')

    # Create queues that will be used to transfer data between processes
    radar2plot_queue = Queue()

    # Create a thread to parse the frames
    # Sends data to both the plot and the robot
    processing_thread = Thread(target=process_frame, args=(interface, radar2plot_queue,))
    processing_thread.start()

    # Plot instance
    # Receives data from the processing and robot threads
    my_plot = NavPlot(radar_bounds, radar2plot_queue)
    my_plot.show()

    # When the plot is closed, stop all the threads and safely end the program
    print("Shutting down...")

    interface.send_item(interface.control_tx_queue, 'sensorStop\n')
    interface.stop()
    processing_thread.join()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Run mmWave navigation visualization demo.')
    parser.add_argument('control_port', help='COM port for configuration, control')
    parser.add_argument('data_port', help='COM port for radar data transfer')
    parser.add_argument('--reconfig', help='Send configuration settings to radar', action='store_true')
    args = parser.parse_args()

    run_demo(args.control_port, args.data_port, args.reconfig)



Running the Demo

  1. Download the Project Source Files from the Resources section above.
  2. Complete the Pre-Requisite and Debugging steps from above.
  3. In Code Composer Studio, right-click on the srrdemo_16xx_mss project and select Import → Import... → General → File System.
  4. Import everything from the downloaded source files in step 1.  
  5. Overwrite the existing TI source files when asked.  
  6. Select both the projects, right-click then select Rebuild Project to clean and build.  
  7. Program the board using the debug instructions from the lab user's manual or by using the resulting .bin file from the last step and Uniflash.  Once programmed, the board will wait to be configured.
  8. Install the required Python packages, preferably inside a virtual environment.  

    pip install
    cd <path to downloaded source files>
    pip install -r requirements.txt
  9. To run the visualizer, connect the board to a PC and specify the serial ports it creates.  The --reconfig flag is only required if the AWR1642 has not already been started.  If you only stop and re-run the Python script, you don't need to reconfigure the board.  

    Run Demo
    cd <path to downloaded source files>
    python mmwave_nav.py <Control Port> <Data Port> --reconfig

Conclusion

This tutorial has walked through the steps to get started developing with Texas Instruments' XWR1642 mmWave sensor kits by building a robot navigation application on top of TI's examples.  Although it is a fairly simple path-planning implementation, it should provide a starting point for those looking to incorporate mmWave radar into their own projects.  The next course of action would be integrating this into an actual robot and improving the granularity of the path results beyond the naive 1 m square approach.  

If you have any questions or would like to provide feedback, please visit Digi-Key's TechForum.  
