[Electronic work] I made a Suica touch sound detector with Raspberry Pi

Introduction

About five years ago (around 2015), I went to dinner with a deaf acquaintance, who told me, **"Sometimes I can't hear the Suica touch sound (Suica being a transit IC card), and I worry about whether the touch registered."** I had just bought a Raspberry Pi in Akihabara and was wondering whether there was something I could do with it, so I thought, **"Could I make a machine that lights up an LED when it picks up the Suica touch sound (or PASMO, or ICOCA, whichever)?"** and tried building one.

It's a nice memory, but the equipment (wiring) was disassembled long ago, so I'm explaining it here from what I can recall. Incidentally, that was already the situation when I wrote this article, and I then left the draft sitting for another two years before posting it...

Equipment

  1. Raspberry Pi 1 Model B+: What I bought back then was version 1. As I'll describe later, its performance made things tough. These days we're up to the Raspberry Pi 4, but...
  2. Desktop microphone (MS-STM55): A microphone I bought a long time ago. It connects to the RPi and provides the sound input. For practical use, something like a pin microphone may be better.
  3. USB audio converter (BSHSAU01BK): If I remember correctly, when I connected the microphone to the RPi's 3.5 mm jack it was noisy and I couldn't record properly, so I think I decided to connect via USB instead.
  4. USB mobile battery (Anker Astro Mini): For portability, I powered the RPi from a mobile battery. It can only output 1 A, but it worked fine.
  5. 7-segment LED (OSL40562-IR): The part plugged into the board and displaying 0. It can display the digits 0 to 9. I used it instead of a plain LED because I wanted to show how many times the touch sound was heard.
  6. Lead wires and a universal board: Ideal for experiments because parts can be plugged in and pulled out easily.
  7. LAN cable: Used to enter commands from a PC via SSH. If you set the detection program to start as a service, it should work without a PC, so you wouldn't need the LAN cable (although I haven't tried it). Raspberry Pis now come with Wi-Fi too, which is another sign of the times.

In the image above, a USB current/voltage checker is also connected. I think the reading was taken during standby (with the detection program not running); since USB power is about 5 V, it means the setup was running at about 1.4 W.

Wiring

If you look closely at the image of the board above, you'll see resistors; these limit the current through the 7-segment LED. According to the 7-segment LED data sheet:

- The current through each LED must be 20 mA or less [^1]
- The forward voltage drop of the LED (DC Forward Voltage) is about 2.0 V

So I connected each segment to an RPi GPIO through a 220 Ω resistor that I happened to have on hand. For GPIO itself, see for example the following page. ON/OFF can be controlled from commands and programs.

[^1]: The current that can flow through a single Raspberry Pi GPIO pin is also limited to 16 mA, so that condition has to be met as well, not just the part's own rating.

Incidentally, the RPi 1's GPIO is 3.3 V, so with a 220 Ω resistor about (3.3 V - 2.0 V) / 220 Ω ≈ 6 mA should flow through each LED [^2].

[^2]: The total current that can flow through the Raspberry Pi's GPIO pins is limited to 50 mA, but this case should be fine: even with all 7 segments lit it comes to 42 mA.
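
The arithmetic above can be double-checked with a few lines of Python. This is only a sketch using the figures quoted in this section (3.3 V GPIO, roughly 2.0 V forward drop, 220 Ω, and the 16 mA / 20 mA / 50 mA limits); the variable names are made up for illustration.

```python
# Figures taken from the text above; the names are illustrative only.
V_GPIO = 3.3        # GPIO output voltage [V]
V_FORWARD = 2.0     # approximate LED forward voltage drop [V]
R_OHMS = 220.0      # series resistor per segment [ohm]
NUM_SEGMENTS = 7

# Ohm's law across the resistor: I = (V_gpio - V_forward) / R
i_segment_ma = (V_GPIO - V_FORWARD) / R_OHMS * 1000.0
i_total_ma = i_segment_ma * NUM_SEGMENTS

print("per segment: %.1f mA" % i_segment_ma)
print("all segments: %.1f mA" % i_total_ma)

# Compare with the limits quoted in the text.
assert i_segment_ma <= 16.0   # per-pin GPIO limit
assert i_segment_ma <= 20.0   # LED rating from the data sheet
assert i_total_ma <= 50.0     # total GPIO limit
```

Unrounded, this gives about 5.9 mA per segment and about 41 mA in total; rounding each segment up to 6 mA gives the 42 mA in the footnote.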

A memo I wrote at the time to use as a conversation piece

Looking at a single (lit) segment, the circuit should look like this. In reality, picture seven of these segments connected in parallel, like the miniature-bulb experiments from science class. Note, however, that the + side is actually at 3.3 V only for the lit segments; an unlit segment has 0 V on its + side. An unlit segment therefore has 0 V on both the + and - sides, so no current flows through it.

Program

That leaves writing the program, which runs under Python on Raspbian (Python 2, as the print statements and the python2 shebang in the listings show). Broadly speaking, three parts are needed:

- Capturing sound from the microphone
- Detecting the touch sound
- Lighting the LED

Recording from a microphone

A microphone connected via a USB audio converter can be treated as an ALSA device, so I wrote it using the alsaaudio module. I also wanted to read from a wav file for debugging, so I wrote it in a way that supports both.

One thing to watch out for on the Raspberry Pi is that a carelessly written program ends up too slow to be useful. Processing that finishes in an instant on a PC is hard work for the RPi (especially when you want real-time operation, as here). The FFT must process a fixed number of frames (a power of two), but a read from the microphone does not always return a fixed number of frames. That is why the code builds a ring buffer, appends whatever was read, and cuts out fixed-length blocks for the FFT. When I wrote this part without much thought, using list concatenation and slicing, it was too slow to run in real time...
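
As a sketch of just the logic (accumulate variable-length reads, then cut out fixed-length blocks), something like the following works with the standard library alone. It is not the pcmmod.py code, which uses a preallocated NumPy ring buffer, and it makes no claim about speed on a Pi. The function name `fixed_size_blocks` is made up, and the sketch uses Python 3 syntax, unlike the Python 2 listings in this article.

```python
def fixed_size_blocks(reads, block_size):
    """Yield blocks of exactly block_size bytes from variable-length reads."""
    buf = bytearray()
    for chunk in reads:
        # Append whatever the source happened to return this time.
        buf.extend(chunk)
        # Cut out as many full blocks as are available.
        while len(buf) >= block_size:
            yield bytes(buf[:block_size])
            del buf[:block_size]
    # Anything shorter than block_size left at the end is dropped here.


# Reads of 3, 2, 5 and 1 bytes, cut into blocks of 4 bytes.
blocks = list(fixed_size_blocks([b"abc", b"de", b"fghij", b"k"], 4))
print(blocks)
```

The main program in this article likewise only uses blocks that were read in full.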

pcmmod.py


# -*- coding: utf-8 -*-
import numpy
import alsaaudio
import wave
import collections

FrameArrayTuple = collections.namedtuple(
    "FrameArrayTuple",
    ["array", "nframes_read"])

# Based on https://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/
class RingBuffer:
    "A 1D ring buffer using numpy arrays"
    def __init__(self, length):
        pow2 = int(numpy.ceil(numpy.log2(length)))
        self.length = 2 ** pow2
        self.data = numpy.zeros(self.length, dtype='float64')
        self.index_top = 0
        self.index_bottom = 0

    def extend(self, x):
        "adds array x to ring buffer"
        if x.size > 0:
            x_index = (self.index_bottom + numpy.arange(x.size)) & (self.length-1)
            self.data[x_index] = x
            self.index_bottom = x_index[-1] + 1

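    def get(self, n=None):
        "Returns the first-in-first-out data in the ring buffer"
        count = self.count()
        n = count if n is None else min(n, count)
        idx = (self.index_top + numpy.arange(n)) & (self.length-1)
        # Only advance the read position if something was actually read
        # (idx[-1] would raise IndexError on an empty buffer)
        if n > 0:
            self.index_top = idx[-1] + 1
        return self.data[idx]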

    def count(self):
        c = (self.index_bottom - self.index_top + self.length) & (self.length-1)
        return c

class PCMInputStream:
    def __init__(self, maxNumFrame):
        # Not meant to be instantiated directly; call this from a subclass's __init__
        self.maxNumFrame = maxNumFrame
        self.op_array = self.getFramesToArrayOperator()

    def readFrames(self):
        raise NotImplementedError("readFrames() must be implemented")

    def readFrameArray(self):
        frames = self.readFrames()
        return self.op_array(frames)

    def getNumChannels(self):
        raise NotImplementedError("getNumChannels() must be implemented")

    def getFrameRate(self):
        raise NotImplementedError("getFrameRate() must be implemented")

    def getSampleWidthInBytes(self):
        raise NotImplementedError("getSampleWidthInBytes() must be implemented")

    def getFramesToArrayOperator(self):
        sw = self.getSampleWidthInBytes()
        if sw == 1:
            fmt = "uint8"
            shift_amp = 128.0
            max_amp = 128.0
        elif sw == 2:
            fmt = "int16"
            shift_amp = 0.0
            max_amp = 32768.0
        else:
            raise ValueError("getSampleWidthInBytes() must return 1 or 2")

        return (lambda frames: (numpy.frombuffer(frames, dtype=fmt) - shift_amp) / max_amp)

    def getFrameArrayIterator(self):
        # Try to always yield maxNumFrame frames at a time.
        nframes_read = 0
        num_channels = self.getNumChannels()
        arr_buffer = RingBuffer(self.maxNumFrame * 10)
        l = -1
        while l != 0:
            if arr_buffer.count() >= self.maxNumFrame * num_channels:
                nframes_read += self.maxNumFrame
                #Divide into channels
                arr_channels = arr_buffer.get(self.maxNumFrame * num_channels).reshape(self.maxNumFrame, num_channels).T
                yield FrameArrayTuple(arr_channels, nframes_read)
            else:
                arr = self.readFrameArray()
                l = arr.shape[0]
                assert l % num_channels == 0
                #Combine the read contents
                arr_buffer.extend(arr)

        # Yield whatever data is left at the end
        # (// keeps the frame count an integer on both Python 2 and 3)
        arr = arr_buffer.get()
        nframes_read += arr.shape[0] // num_channels
        arr_channels = arr.reshape(arr.shape[0] // num_channels, num_channels).T
        yield FrameArrayTuple(arr_channels, nframes_read)

    def close(self):
        pass

class PCMInputStreamFromWave(PCMInputStream):
    def __init__(self, filename, maxNumFrame):
        self.wv = wave.open(filename, "r")
        self.ch = self.wv.getnchannels()
        self.rate = self.wv.getframerate()
        self.sw = self.wv.getsampwidth()
        self.maxNumFrame = maxNumFrame
        PCMInputStream.__init__(self, maxNumFrame)

    def readFrames(self):
        return self.wv.readframes(self.maxNumFrame)

    def getNumChannels(self):
        return self.ch

    def getFrameRate(self):
        return self.rate

    def getSampleWidthInBytes(self):
        return self.sw

    def close(self):
        self.wv.close()

class PCMInputStreamFromMic(PCMInputStream):
    def __init__(self, rate, sampleWidth, maxNumFrame):
        self.ch = 1
        self.rate = rate
        self.sw = sampleWidth
        self.maxNumFrame = maxNumFrame

        #Recording device initialization
        self.pcm = alsaaudio.PCM(alsaaudio.PCM_CAPTURE)
        self.pcm.setchannels(self.ch)
        self.pcm.setrate(self.rate)
        # Set the period to 4x maxNumFrame so that each read returns more data (to speed up processing)
        print self.pcm.setperiodsize(self.maxNumFrame * 4)
        if self.sw == 1:
            self.pcm.setformat(alsaaudio.PCM_FORMAT_U8)
        elif self.sw == 2:
            self.pcm.setformat(alsaaudio.PCM_FORMAT_S16_LE)
        else:
            raise ValueError("sampleWidth must be 1 or 2")

        PCMInputStream.__init__(self, maxNumFrame)

    def readFrames(self):
        length, frames = self.pcm.read()
        return frames

    def getNumChannels(self):
        return self.ch

    def getFrameRate(self):
        return self.rate

    def getSampleWidthInBytes(self):
        return self.sw

Detect touch sound

My memory is that this part was fairly hard too. Basically, the audio is frequency-analyzed with an FFT, and if the given frequency (that of the touch sound) carries more power than the other frequencies, it is judged as "touched". However, if you casually use for loops the way you might on a PC, for example when computing the power at a specific frequency from the frequency bin components, it becomes too slow to run in real time (and if you're careless, even the FFT itself does). I remember trying various things, such as rewriting whatever I could with built-in modules, NumPy, and SciPy.
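
To show the energy-ratio idea in isolation, here is a small sketch. It uses a naive pure-Python DFT, which is for illustration only and is exactly the kind of loop that is too slow on the Pi; the listing below uses scipy.fftpack.fft instead. The sample rate and block length are toy values I picked so that the bins fall on multiples of 50 Hz, and `band_energy_ratio` and `tone` are hypothetical helpers. The threshold and tolerance are the constants from suicadetection.py. Unlike the listing, this sketch counts whole bins only and does no fractional weighting. Python 3 syntax.

```python
import cmath
import math

THRES_ENERGY_RATIO = 0.25  # same threshold as in suicadetection.py
FREQ_TOLERANCE = 50.0      # same +/- band half-width [Hz]


def band_energy_ratio(samples, rate, freq_center):
    """Share of spectral energy within freq_center +/- FREQ_TOLERANCE."""
    n = len(samples)
    energies = []
    # Naive DFT over the positive-frequency bins (O(n^2), illustration only).
    for k in range(n // 2):
        acc = sum(x * cmath.exp(-2j * math.pi * k * t / n)
                  for t, x in enumerate(samples))
        energies.append(abs(acc) ** 2)
    total = sum(energies)
    if total == 0.0:
        return 0.0
    bin_width = float(rate) / n
    in_band = sum(e for k, e in enumerate(energies)
                  if abs(k * bin_width - freq_center) <= FREQ_TOLERANCE)
    return in_band / total


def tone(freq, rate, n):
    """Hypothetical test signal: a pure sine wave."""
    return [math.sin(2 * math.pi * freq * t / rate) for t in range(n)]


# Assumed toy parameters (not the article's 44100 Hz / 512 frames).
RATE, N = 8000, 160
ratio_touch = band_energy_ratio(tone(2700, RATE, N), RATE, 2700)
ratio_other = band_energy_ratio(tone(1000, RATE, N), RATE, 2700)
print(ratio_touch >= THRES_ENERGY_RATIO, ratio_other >= THRES_ENERGY_RATIO)
```

A 2700 Hz tone puts nearly all of its energy inside the 2700 Hz band and passes the threshold, while a 1000 Hz tone puts almost none there and does not.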

suicadetection.py


# -*- coding: utf-8 -*-

import numpy
import scipy.fftpack
import time
import bisect
import collections

DetectionHistoryTuple = collections.namedtuple(
    "DetectionHistoryTuple",
    ["cond_energy", "energy_peak", "freq_center_detected"])

#Constant representing the detection status
DETECTION_ON = "on"
DETECTION_OFF = "off"

class SuicaDetection:
    # Half-width of the frequency band used for the energy calculation [Hz]
    FREQ_TOLERANCE = 50
    # Number of history entries kept
    NUM_HIST_SAVED = 3
    # Energy ratio threshold for deciding that a sound has started
    THRES_ENERGY_RATIO = 0.25
    # Machine epsilon of float64 (used as a floor so that log10 never sees 0)
    EPS_FLOAT64 = numpy.finfo(numpy.float64).eps

    def freq_filter_vector(self, freq_center, freq_tolerance):
        freq_delta = self.freq_axis[1] - self.freq_axis[0]
        # Returns a weight vector for calculating the energy
        # contained in freq_center +/- freq_tolerance.
        energy_weight = numpy.array(
            [(lambda freq_min, freq_max:
                      (1.0 if freq_min <= f and f + freq_delta <= freq_max
                       else (freq_max - f) / freq_delta if freq_min <= f <= freq_max <= f + freq_delta
                       else (f + freq_delta - freq_min) / freq_delta if f <= freq_min <= f + freq_delta <= freq_max
                       else (freq_tolerance * 2 / freq_delta) if f <= freq_min and freq_max <= f + freq_delta
                       else 0.0))
                 (freq_center - freq_tolerance, freq_center + freq_tolerance)
                 for f in self.freq_axis])
        return energy_weight

    def __init__(self, center_freqs, freq_axis):
        self.ts = time.time()
        self.hist = []
        self.time_axis = []
        self.nframes_read = 0
        self.detected_freq = None
        self.init_energy = None
        self.freq_axis = freq_axis
        #Energy weight for the center frequency you want to detect
        self.center_freqs = center_freqs
        self.energy_weight_array = numpy.array([
                self.freq_filter_vector(center_freq, self.FREQ_TOLERANCE)
                for center_freq in center_freqs
            ]).T

    def input_array(self, arr):
        assert len(arr.shape) == 1
        num_frames = arr.shape[0]

        self.detect(arr)

        status = None
        if self.detected_freq:
            # While the sound is ringing: end it once the energy in that band has dropped by more than 5 dB in all of the last three frames
            if all((t.energy_peak is None) or (t.energy_peak - self.init_energy) < -5 for t in self.hist[-3:]):
                self.detected_freq = None
                status = DETECTION_OFF
        else:
            #If there is no sound, it is OK if the energy condition is met twice out of the last three times.
            if len([t for t in self.hist[-3:] if t.cond_energy]) >= 2:
                self.detected_freq = self.hist[-1].freq_center_detected
                self.init_energy = self.hist[-1].energy_peak
                status = DETECTION_ON

        self.nframes_read += num_frames

        return (self.nframes_read, status)
            
    def detect(self, arr):
        #print "start", time.time()
        assert len(arr.shape) == 1
        num_frames = arr.shape[0]

        # FFT
        f = scipy.fftpack.fft(arr)
        e = numpy.square(numpy.absolute(f))
        # Floor the energy so that log10() never receives 0
        e = numpy.maximum(e, self.EPS_FLOAT64)

        # Total energy (up to a constant factor)
        energy_total = e.sum()
        # Energy for each frequency:
        # combine (double) the energy of the +/- frequencies
        energy_axis = 2 * e[0:num_frames//2]
        log_energy_axis = 10 * numpy.log10(energy_axis)

        #Calculate the energy ratio near the specified frequency
        energy_weighted = energy_axis.dot(self.energy_weight_array)
        energy_ratio_max, freq_center_detected = \
            max(zip(list(energy_weighted / energy_total), self.center_freqs),
                key=lambda t: t[0])

        #Energy condition
        #Focus on the strongest frequency
        cond_energy = (energy_ratio_max >= self.THRES_ENERGY_RATIO)

        # Use the maximum power within +/-100 Hz as the reference
        idx_low = bisect.bisect_left(self.freq_axis, freq_center_detected - 100)
        idx_high = bisect.bisect_right(self.freq_axis, freq_center_detected + 100)
        energy_peak = log_energy_axis[idx_low:idx_high+1].max()
        #Add to history
        self.hist.append(DetectionHistoryTuple(cond_energy=cond_energy,
                                               energy_peak=energy_peak,
                                               freq_center_detected=freq_center_detected))
        #Delete old history
        if len(self.hist) > self.NUM_HIST_SAVED:
            self.hist.pop(0)

Make the LED shine

Switch the corresponding GPIOs ON/OFF according to the digit you want to light up. Which GPIOs are used is set by the calling program. I used the RPi.GPIO module for this ON/OFF control.

It seems that GPIO cannot be controlled unless it is executed as root, so be careful there.
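
To see what the table in sevenseg.py below works out to without any hardware attached, here is a small sketch that only computes the values (no RPi.GPIO involved). The table and the inverted output are copied from the listing; the helper names `segment_levels` and `lit_segments` are made up. Python 3 syntax.

```python
# Digits for which each segment (A..G, in this order) is lit;
# same table as in sevenseg.py.
SEVENSEG_ON = [[0, 2, 3, 5, 6, 7, 8, 9],     # A
               [0, 1, 2, 3, 4, 7, 8, 9],     # B
               [0, 1, 3, 4, 5, 6, 7, 8, 9],  # C
               [0, 2, 3, 5, 6, 8, 9],        # D
               [0, 2, 6, 8],                 # E
               [0, 4, 5, 6, 8, 9],           # F
               [2, 3, 4, 5, 6, 8, 9]]        # G


def segment_levels(n):
    """Values passed to GPIO.output() for digit n.

    True (HIGH) is output when n is NOT in the segment's list,
    so a lit segment corresponds to a LOW output in this code.
    """
    return [n not in digits for digits in SEVENSEG_ON]


def lit_segments(n):
    """Names of the segments that are lit for digit n."""
    return "".join(name for name, digits in zip("ABCDEFG", SEVENSEG_ON)
                   if n in digits)


for n in (1, 7, 8):
    print(n, lit_segments(n), segment_levels(n))
```

For the digit 1, for example, only segments B and C are lit, so only those two pins are driven LOW.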

sevenseg.py


# -*- coding: utf-8 -*-
import sys
import RPi.GPIO as GPIO
import time

class GPIO7seg:
    sevenseg_on = [[0, 2, 3, 5, 6, 7, 8, 9],
                   [0, 1, 2, 3, 4, 7, 8, 9],
                   [0, 1, 3, 4, 5, 6, 7, 8, 9],
                   [0, 2, 3, 5, 6, 8, 9],
                   [0, 2, 6, 8],
                   [0, 4, 5, 6, 8, 9],
                   [2, 3, 4, 5, 6, 8, 9]]

    def __init__(self, id_pin_seg):
        self.id_pin_seg = id_pin_seg
        GPIO.setmode(GPIO.BCM)
        for i in id_pin_seg:
            GPIO.setup(i, GPIO.OUT)

    def digit(self, n):
        # Drive each segment pin: LOW if digit n lights that segment, HIGH otherwise
        for i in range(7):
            GPIO.output(self.id_pin_seg[i], n not in self.sevenseg_on[i])

Main part

While calling each module from the main program, the LED lights up according to the judgment result.

- The count goes up for a touch sound that comes within 1 second of the previous one.
- From 1 to 2 seconds after the previous touch sound, the internal touch counter is back at 0, but the displayed count is not reset.
  - If, say, another sound comes 1.5 seconds after the first, the count does not become 2 (it has been reset internally), and 1 continues to be displayed. At first I had the display return to 0 right after 1 second, but that made it too jumpy, so I added this behavior.
- The displayed count returns to 0 two seconds after the previous touch sound.

In other words, 1 is displayed for a single beep and 2 for a double beep. At the ticket gate, one beep and two beeps mean different things, so the idea of the program is to let you tell them apart.
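
The counting rules above can be tried out without a microphone or an LED. The following is a simplified sketch of the logic in the main loop: time is in milliseconds rather than frames, the class and function names are made up, and the DETECTION_OFF handling (which cancels a count that is still pending) is left out. Python 3 syntax.

```python
class TouchCounter:
    """Counting rules of the main loop, with time in milliseconds."""

    def __init__(self):
        self.count = 0        # internal counter
        self.display = 0      # digit shown on the LED
        self.last_on = None   # time of the last detected touch sound
        self.pending = False  # a detection waiting to be counted

    def on_detect(self, t):
        self.last_on = t
        self.pending = True

    def tick(self, t):
        if self.last_on is None:
            return self.display
        elapsed = t - self.last_on
        if elapsed > 2000:
            # After 2 seconds: show the (already reset) counter again
            self.display = self.count
            self.last_on = None
        elif elapsed > 1000:
            # After 1 second: reset the counter internally only
            self.count = 0
        elif self.pending and elapsed > 100:
            # Count the sound once 0.1 second has passed
            self.count += 1
            self.display = self.count
            self.pending = False
        return self.display


def simulate(detect_times_ms, end_ms, step_ms=50):
    """Run the counter and return {time_ms: displayed digit}."""
    counter = TouchCounter()
    shown = {}
    for t in range(0, end_ms + 1, step_ms):
        if t in detect_times_ms:
            counter.on_detect(t)
        shown[t] = counter.tick(t)
    return shown


double_beep = simulate([0, 300], 2500)   # two beeps 0.3 s apart
late_beep = simulate([0, 1500], 2000)    # second beep 1.5 s after the first
print(double_beep[500], double_beep[2400], late_beep[1700])
```

With these inputs the display reads 2 shortly after the double beep and returns to 0 a little over two seconds after the last beep, while a second beep arriving 1.5 seconds after the first leaves the display at 1.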

As for the GPIOs that drive the 7-segment LED, this time GPIO 15 to 21 (BCM numbering, as set in sevenseg.py) correspond to segments A to G of the 7-segment LED (see the data sheet), respectively.

If you run this main program as root (for GPIO control), hopefully the numbers will change in response to the touch sound picked up by the microphone.

suica_main.py


#!/usr/bin/env python2
# -*- coding: utf-8 -*-

import sys
import numpy
import itertools

import pcmmod
import suicadetection
import sevenseg

# ================================

def main():
    #Signal length (number of samples)
    MAX_NUM_FRAMES = 512
    #Frequency you want to detect
    FREQS_CENTER = [2550, 2700, 3000]

    #Window function
    ham_window = numpy.hamming(MAX_NUM_FRAMES)

    #WAVE reading or microphone input
    if len(sys.argv) == 1:
        #Microphone
        rate = 44100
        sw = 2
        sound = pcmmod.PCMInputStreamFromMic(rate, sw, MAX_NUM_FRAMES)
    else:
        # WAVE
        sound = pcmmod.PCMInputStreamFromWave(sys.argv[1], MAX_NUM_FRAMES)
        rate = sound.getFrameRate()

    # Frequency axis (// keeps the slice index an integer on both Python 2 and 3)
    freq_axis = numpy.fft.fftfreq(MAX_NUM_FRAMES, 1.0/rate)[0:MAX_NUM_FRAMES//2]

    sd = suicadetection.SuicaDetection(FREQS_CENTER, freq_axis)

    #counter(7SEG LED)
    counter_ring = 0
    id_pin_seg = [15, 16, 17, 18, 19, 20, 21]
    gpio7 = sevenseg.GPIO7seg(id_pin_seg)
    gpio7.digit(counter_ring)
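    # Time (in frames) of the most recent detection
    counted_last = None
    # True while a detected sound is waiting to be counted
    pending = False

    # Read the waveform
    # Use only the blocks that were read in full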
    for arr, nframes_read in itertools.takewhile(lambda t: t.array.shape[1] == MAX_NUM_FRAMES,
                                                 sound.getFrameArrayIterator()):
        # Used for detection: take the L channel when there are 2 channels
        time_frames, status = sd.input_array(arr[0] * ham_window)
        if status == suicadetection.DETECTION_ON:
            print float(time_frames) / rate, "ON"
            # Do not count until 0.1 second has passed
            counted_last = time_frames
            pending = True
        elif status == suicadetection.DETECTION_OFF:
            print float(time_frames) / rate, "OFF"
            print
            pending = False

        if counted_last is not None:
            if time_frames > counted_last + rate * 2.0:
                #Reset the LED after 2 seconds
                gpio7.digit(counter_ring)
                counted_last = None
            elif time_frames > counted_last + rate * 1.0:
                #No combo judgment after 1 second (reset counter only internally)
                counter_ring = 0
            elif pending and time_frames > counted_last + rate * 0.1:
                counter_ring += 1
                gpio7.digit(counter_ring)
                pending = False

if __name__ == "__main__":
    main()

Summary

I recall making a lot of adjustments to the touch sound detection. If you simply look at power alone, it reacts to everything inside a noisy station.

I also found that, surprisingly, the frequency of the touch sound is not standardized. **Even between a ticket gate and a convenience store cash register, the pitch (frequency) of the touch sound actually differs**, so things get more and more complicated once you think about detecting both...

Incidentally, I had never paid attention to the frequency of the touch sound before, so when writing this program I first looked up the concrete numbers in Audacity. When I told the acquaintance from the beginning of this article that "the touch sound seems to be 2550 Hz or 2700 Hz," the response was an understanding "no wonder I can't hear it."

It has been a long time since barrier-free and universal design became common talk, but this made me realize something: **Are we only looking at the steps that are visible? Are sounds being designed with their frequency band in mind?** That said, if you lower the pitch, the sound may become harder to hear in a crowd. It's difficult.
