Dissecting the rainbow

By tom, March 5, 2018

If you’re entering this year’s PiWars competition you might have encountered some example code, originally from PiBorg, intended to get you going with the ‘Over the Rainbow’ challenge. If you’re not familiar with OpenCV, however, you might find this code a bit baffling. So, in this post I’m going to go line-by-line through the image processing part of the code – the bit that starts with an image from your Pi camera or webcam and outputs a size and location of the largest coloured blob (of a selected colour) in that image.

Note – I’d probably implement this differently, and I’d certainly structure it differently; I also don’t agree with the naming conventions PiBorg use in their code, which run contrary to good Python style. But this is what you’ve been given as an example, so this is what I’m going to show! The code is taken from the PiWars website here.

Original example code

This is what you’ve got:

#!/usr/bin/env python
# coding: Latin

# Load library functions we want
import time
import os
import sys
# import ThunderBorg
import io
import threading
import picamera
import picamera.array
import cv2
import numpy

print('Libraries loaded')

# Global values
global running
# global TB
global camera
global processor
global debug
global colour

running = True
debug = True
colour = 'blue'

# Setup the ThunderBorg
# TB = ThunderBorg.ThunderBorg()
# TB.i2cAddress = 0x15                  # Uncomment and change the value if you have changed the board address
# TB.Init()
##if not TB.foundChip:
##    boards = ThunderBorg.ScanForThunderBorg()
##    if len(boards) == 0:
##        print('No ThunderBorg found, check you are attached :)')
##    else:
##        print('No ThunderBorg at address %02X, but we did find boards:' % (TB.i2cAddress))
##        for board in boards:
##            print('    %02X (%d)' % (board, board))
##        print('If you need to change the I²C address change the setup line so it is correct, e.g.')
##        print('TB.i2cAddress = 0x%02X' % (boards[0]))
##    sys.exit()
##TB.SetCommsFailsafe(False)

# Power settings
##voltageIn = 12.0                        # Total battery voltage to the ThunderBorg
##voltageOut = 12.0 * 0.95                # Maximum motor voltage, we limit it to 95% to allow the RPi to get uninterrupted power

# Camera settings
imageWidth = 320  # Camera image width
imageHeight = 240  # Camera image height
frameRate = 3  # Camera image capture frame rate

# Auto drive settings
autoMaxPower = 1.0  # Maximum output in automatic mode
autoMinPower = 0.2  # Minimum output in automatic mode
autoMinArea = 10  # Smallest target to move towards
autoMaxArea = 10000  # Largest target to move towards
autoFullSpeedArea = 300  # Target size at which we use the maximum allowed output

# Setup the power limits
##if voltageOut > voltageIn:
##    maxPower = 1.0
##else:
##    maxPower = voltageOut / float(voltageIn)
##autoMaxPower *= maxPower

# Image stream processing thread
class StreamProcessor(threading.Thread):
    def __init__(self):
        super(StreamProcessor, self).__init__()
        self.stream = picamera.array.PiRGBArray(camera)
        self.event = threading.Event()
        self.terminated = False
        self.start()
        self.begin = 0

    def run(self):
        # This method runs in a separate thread
        while not self.terminated:
            # Wait for an image to be written to the stream
            if self.event.wait(1):
                try:
                    # Read the image and do some processing on it
                    self.stream.seek(0)
                    self.ProcessImage(self.stream.array, colour)
                finally:
                    # Reset the stream and event
                    self.stream.seek(0)
                    self.stream.truncate()
                    self.event.clear()

    # Image processing function
    def ProcessImage(self, image, colour):
        # View the original image seen by the camera.
        if debug:
            cv2.imshow('original', image)
            cv2.waitKey(0)

        # Blur the image
        image = cv2.medianBlur(image, 5)
        if debug:
            cv2.imshow('blur', image)
            cv2.waitKey(0)

        # Convert the image from 'BGR' to HSV colour space
        image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
        if debug:
            cv2.imshow('cvtColour', image)
            cv2.waitKey(0)

        # We want to extract the 'Hue', or colour, from the image. The 'inRange'
        # method will extract the colour we are interested in (between 0 and 180)
        # In testing, the Hue value for red is between 95 and 125
        # Green is between 50 and 75
        # Blue is between 20 and 35
        # Yellow is... to be found!
        if colour == "red":
            imrange = cv2.inRange(image, numpy.array((95, 127, 64)), numpy.array((125, 255, 255)))
        elif colour == "green":
            imrange = cv2.inRange(image, numpy.array((50, 127, 64)), numpy.array((75, 255, 255)))
        elif colour == 'blue':
            imrange = cv2.inRange(image, numpy.array((20, 64, 64)), numpy.array((35, 255, 255)))

        # I used the following code to find the approximate 'hue' of the ball in
        # front of the camera
        #        for crange in range(0,170,10):
        #            imrange = cv2.inRange(image, numpy.array((crange, 64, 64)), numpy.array((crange+10, 255, 255)))
        #            print(crange)
        #            cv2.imshow('range',imrange)
        #            cv2.waitKey(0)
        
        # View the filtered image found by 'imrange'
        if debug:
            cv2.imshow('imrange', imrange)
            cv2.waitKey(0)

        # Find the contours
        contourimage, contours, hierarchy = cv2.findContours(imrange, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
        if debug:
            cv2.imshow('contour', contourimage)
            cv2.waitKey(0)

        # Go through each contour
        foundArea = -1
        foundX = -1
        foundY = -1
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            cx = x + (w / 2)
            cy = y + (h / 2)
            area = w * h
            if foundArea < area:
                foundArea = area
                foundX = cx
                foundY = cy
        if foundArea > 0:
            ball = [foundX, foundY, foundArea]
        else:
            ball = None
        # Set drives or report ball status
        self.SetSpeedFromBall(ball)

    # Set the motor speed from the ball position
    def SetSpeedFromBall(self, ball):
        global TB
        driveLeft = 0.0
        driveRight = 0.0
        if ball:
            x = ball[0]
            area = ball[2]
            if area < autoMinArea:
                print('Too small / far')
            elif area > autoMaxArea:
                print('Close enough')
            else:
                if area < autoFullSpeedArea:
                    speed = 1.0
                else:
                    speed = 1.0 / (area / autoFullSpeedArea)
                speed *= autoMaxPower - autoMinPower
                speed += autoMinPower
                direction = (x - imageCentreX) / imageCentreX
                if direction < 0.0:
                    # Turn right
                    print('Turn Right')
                    driveLeft = speed
                    driveRight = speed * (1.0 + direction)
                else:
                    # Turn left
                    print('Turn Left')
                    driveLeft = speed * (1.0 - direction)
                    driveRight = speed
                print('%.2f, %.2f' % (driveLeft, driveRight))
        else:
            print('No ball')
#        TB.SetMotor1(driveLeft)
#        TB.SetMotor2(driveRight)

# Image capture thread
class ImageCapture(threading.Thread):
    def __init__(self):
        super(ImageCapture, self).__init__()
        self.start()

    def run(self):
        global camera
        global processor
        print('Start the stream using the video port')
        camera.capture_sequence(self.TriggerStream(), format='bgr', use_video_port=True)
        print('Terminating camera processing...')
        processor.terminated = True
        processor.join()
        print('Processing terminated.')

    # Stream delegation loop
    def TriggerStream(self):
        global running
        while running:
            if processor.event.is_set():
                time.sleep(0.01)
            else:
                yield processor.stream
                processor.event.set()


# Startup sequence
print('Setup camera')
camera = picamera.PiCamera()
camera.resolution = (imageWidth, imageHeight)
camera.framerate = frameRate
imageCentreX = imageWidth / 2.0
imageCentreY = imageHeight / 2.0

print('Setup the stream processing thread')
processor = StreamProcessor()

print('Wait ...')
time.sleep(2)
captureThread = ImageCapture()

try:
    print('Press CTRL+C to quit')
    ##    TB.MotorsOff()
    ##    TB.SetLedShowBattery(True)
    # Loop indefinitely until we are no longer running
    while running:
        # Wait for the interval period
        # You could have the code do other work in here 🙂
        time.sleep(1.0)
        # Disable all drives
##    TB.MotorsOff()
except KeyboardInterrupt:
    # CTRL+C exit, disable all drives
    print('\nUser shutdown')
##    TB.MotorsOff()
except:
    # Unexpected error, shut down!
    e = sys.exc_info()[0]
    print()
    print(e)
    print('\nUnexpected error, shutting down!')
##    TB.MotorsOff()
# Tell each thread to stop, and wait for them to end
running = False
captureThread.join()
processor.terminated = True
processor.join()
del camera
##TB.MotorsOff()
##TB.SetLedShowBattery(False)
##TB.SetLeds(0,0,0)
print('Program terminated.')

A lot of this code is boilerplate – that is to say, code we’re not really all that interested in. Some of it drives the PiBorg motor control board, for example, and isn’t directly relevant to your robot (unless you have that board, I guess!).

The image processing bit

The code we’re really interested in is the ProcessImage method:

def ProcessImage(self, image, colour):

This is a bound function (a method), so the first argument, ‘self’, can be ignored for our purposes. The second and third arguments are what we actually use: ‘image’ is a frame captured from the Pi camera, and ‘colour’ is a colour name – in this case ‘red’, ‘green’ or ‘blue’. Each time a frame is captured from the camera, this function is called to process it.

# View the original image seen by the camera.
if debug:
    cv2.imshow('original', image)
    cv2.waitKey(0)

You’ll see a few blocks like the one above: if ‘debug’ is set, it shows an image – here the original captured frame – in its own window and waits for you to press a key before continuing. (Because this opens a window, it won’t work if you’re running without a display!)
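Incidentally, this show-and-wait pattern repeats throughout ProcessImage. If you were restructuring the code you could pull it into a small helper – a sketch of my own, not part of the original:

def debug_show(name, image):
    # Pop up a named window and block until a key is pressed, debug only
    if debug:
        cv2.imshow(name, image)
        cv2.waitKey(0)

Each debug step then becomes a one-liner like debug_show('blur', image).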

# Blur the image
image = cv2.medianBlur(image, 5)
if debug:
    cv2.imshow('blur', image)
    cv2.waitKey(0)

The above code starts by blurring the input image. Why blur it? We’re looking for continuous regions of a target colour, but the input image might be noisy – whether that’s dust on the sensor (unlikely) or thermal noise (like you’d get when using a mobile phone camera in low light). This very high-frequency (small-size) detail just makes our life harder, so the blur in effect filters out features below a certain size and cleans up the image so we can process it more easily in the next stage. Again, the debug flag pops up a window and waits for input if set, only this time it shows the blurred image (note that we overwrite ‘image’ with the new, blurred version).
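If you want to see what the blur buys you, here’s a standalone experiment (the file name is just a placeholder): sprinkle salt-and-pepper noise over a test image and watch medianBlur remove it while keeping edges intact.

import cv2
import numpy

image = cv2.imread('test.jpg')  # any test image you have lying around

# Corrupt roughly 1% of the pixels with white 'salt' noise
noisy = image.copy()
noisy[numpy.random.random(image.shape[:2]) < 0.01] = 255

cv2.imshow('noisy', noisy)
cv2.imshow('cleaned', cv2.medianBlur(noisy, 5))
cv2.waitKey(0)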

# Convert the image from 'BGR' to HSV colour space
image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
if debug:
    cv2.imshow('cvtColour', image)
    cv2.waitKey(0)

The next step is to convert the image into HSV colour space. The frame arrives with each pixel stored as proportions of blue, green and red (the capture code below requests ‘bgr’ format). That’s normal for a computer, but unhelpful when processing. HSV represents colours as hue (the colour itself), saturation (the amount of colour vs white) and value (in effect the total intensity). Because we’re in the real world, a green object can have multiple shades of green in the image, but hopefully the hue won’t vary all that much (a shadow will change the value, but not the hue part). One quirk to be aware of: the code passes cv2.COLOR_RGB2HSV to a BGR image, so red and blue are effectively swapped – which is why the hue ranges in the comments below look reversed compared with a standard OpenCV hue chart. Again, a window is shown if debug is enabled; it may look strangely coloured, because imshow assumes BGR data while we’re now holding HSV – but nothing about the scene has changed, only its internal representation.
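You can convince yourself of the ‘shadows change value, not hue’ claim with two single-pixel images – a bright and a dark version of the same green (a quick sketch of mine, using the standard BGR-to-HSV flag):

import cv2
import numpy

bright = numpy.uint8([[[0, 200, 0]]])  # one bright green pixel, BGR order
dark = numpy.uint8([[[0, 60, 0]]])     # the same green, much darker

print(cv2.cvtColor(bright, cv2.COLOR_BGR2HSV))  # [[[ 60 255 200]]]
print(cv2.cvtColor(dark, cv2.COLOR_BGR2HSV))    # [[[ 60 255  60]]]

The hue (60) is identical in both; only the value differs.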

# We want to extract the 'Hue', or colour, from the image. The 'inRange'
# method will extract the colour we are interested in (between 0 and 180)
# In testing, the Hue value for red is between 95 and 125
# Green is between 50 and 75
# Blue is between 20 and 35
# Yellow is... to be found!
if colour == "red":
    imrange = cv2.inRange(image, numpy.array((95, 127, 64)), numpy.array((125, 255, 255)))
elif colour == "green":
    imrange = cv2.inRange(image, numpy.array((50, 127, 64)), numpy.array((75, 255, 255)))
elif colour == 'blue':
    imrange = cv2.inRange(image, numpy.array((20, 64, 64)), numpy.array((35, 255, 255)))

This block does range-based thresholding. Bear with me! A threshold operation takes each pixel in the image, applies a test to it, and colours the corresponding pixel in the output image either black (not in range) or white (in range). In this particular case the ‘range’ is expressed as two colours in HSV space, and any pixel whose H, S and V components all fall within the corresponding ranges passes the filter and comes out white in the output; everything else comes out black. Either way, what we actually care about is the shape of the boundary between the two!

So, if we’ve asked for ‘red’, the filter checks that the hue component is between 95 and 125 (the first value in each triple), that the saturation component (the amount of colour) is between 127 and 255 (we can only operate where there’s some colour – anything less saturated is too grey or white to use), and that the value component is between 64 and 255 (very dark colours are all too similar to work with).
The result is that your original image – the one you previously blurred and converted to HSV space – is filtered for ‘things that are quite like red’: pixels passing the test come out white, and everything else black. Again, you’re shown the thresholded image and asked to press a key to continue if debug is enabled.
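(The original listing also includes a commented-out loop that sweeps the hue in steps of 10 so you can find a colour by eye.) A quicker variant, if your ball fills the middle of the frame, is to print the median hue of a small central patch – a sketch of mine that assumes ‘image’ has already been converted to HSV as above:

# Median hue of a 10x10 patch at the image centre: a decent starting
# point for the lower/upper hue bounds passed to inRange
h, w = image.shape[:2]
patch = image[h // 2 - 5:h // 2 + 5, w // 2 - 5:w // 2 + 5]
print('Centre hue is about', numpy.median(patch[:, :, 0]))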

# Find the contours
contourimage, contours, hierarchy = cv2.findContours(imrange, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
if debug:
    cv2.imshow('contour', contourimage)
    cv2.waitKey(0)

The next bit is contouring – a process that finds the outlines of continuous regions.

This call finds the contours using cv2.findContours(), passing in the thresholded image along with flags selecting the algorithm OpenCV should use (you can vary these to handle, for example, shapes within other shapes; here we’re using the simplest version, which returns a flat list of contours rather than a tree).

Again you’ll be shown an image if debug is enabled. Because findContours here returns three values and the first is an image (essentially the input as consumed by the algorithm – some OpenCV 3 versions modify it in place), the ‘contour’ window shows roughly the shapes the algorithm worked from. The value actually used next is the second one, ‘contours’. If you’ve not seen this kind of multiple assignment before: cv2.findContours returns a tuple of three values, and Python lets you unpack them straight into three variables in one statement. This is called sequence (tuple) unpacking, and it’s one of my favourite Python features – it’s the little things sometimes!
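One caveat worth knowing: the number of values findContours returns changed between OpenCV versions – two in 2.x, three in 3.x (as here), and back to two in 4.x. If this line throws a ValueError on your installation, a version-proof variant is to take the second-from-last element:

# contours is always the second-from-last return value, whatever the version
result = cv2.findContours(imrange, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
contours = result[-2]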

# Go through each contour
foundArea = -1
foundX = -1
foundY = -1
for contour in contours:
     x, y, w, h = cv2.boundingRect(contour)
     cx = x + (w / 2)
     cy = y + (h / 2)
     area = w * h
     if foundArea < area:
         foundArea = area
         foundX = cx
         foundY = cy
if foundArea > 0:
    ball = [foundX, foundY, foundArea]
else:
    ball = None

Finally, this last block iterates over the contours and looks for the biggest one. Each contour is a reasonably complex object describing the shape of the outline it traced, but here we only use cv2.boundingRect(contour) to get the x and y coordinates and the width and height of its bounding box (all in pixels). The x and y returned are one corner of the bounding box, so to get the centre we add half the width to x and half the height to y.
We care about the size of the target, so we compute the area of the bounding box by multiplying width and height. We then check whether this contour’s area is bigger than any found so far and, if so, record its area and centre coordinates.
That means that when we leave the loop, the variables foundArea, foundX and foundY contain the area and x,y centre of the largest contour found (assuming there was at least one). This largest contour is packed into ‘ball’, or None if nothing was found. In the original code this is passed to a further function which works out what to actually do, but that’s all the image processing done!
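As an aside, the same ‘biggest bounding box wins’ search can be written more compactly with Python’s max() and a key function – my sketch, equivalent to the loop above:

def largest_blob(contours):
    # Return (cx, cy, area) of the biggest bounding box, or None
    rects = [cv2.boundingRect(c) for c in contours]
    if not rects:
        return None
    x, y, w, h = max(rects, key=lambda r: r[2] * r[3])
    return (x + w / 2.0, y + h / 2.0, w * h)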

Concluding

Hopefully you now have a better idea of what the example code is trying to do. It’s not necessarily the best approach, but the basic operations – blurring, moving to HSV colour space, thresholding and then contouring – appear again and again in OpenCV examples, so if you can get your head around them you’re in good shape to refine the code above or write your own from scratch.
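To make that concrete, here’s the whole pipeline condensed into one standalone function – my own sketch rather than the original code. Note that it uses the conventional cv2.COLOR_BGR2HSV flag, so you’d need to re-measure your hue ranges against a standard OpenCV hue chart (blue, for instance, lands around 100–130 rather than 20–35):

import cv2
import numpy

def find_blob(frame, lower, upper):
    # Blur, convert to HSV, threshold, contour, return the biggest blob
    frame = cv2.medianBlur(frame, 5)
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, numpy.array(lower), numpy.array(upper))
    contours = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)[-2]
    rects = [cv2.boundingRect(c) for c in contours]
    if not rects:
        return None
    x, y, w, h = max(rects, key=lambda r: r[2] * r[3])
    return (x + w / 2.0, y + h / 2.0, w * h)

# e.g. for a blue ball in a BGR frame:
# ball = find_blob(frame, (100, 64, 64), (130, 255, 255))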

If you want a better introduction to this kind of processing, I’d strongly recommend walking through these three linked articles on the PyImageSearch blog for a better, more flexible approach; he explains everything very well and his code is much better than the example code you’ve got here. The blog is worth a read anyway, as it covers a lot of useful detail about running OpenCV on the Pi.

Have fun, and happy rainbow hunting!
