Remote Spectrum Monitor User Guide : Programming with SCPI : SCPI Command Programming Examples
 
SCPI Command Programming Examples
This section provides information on spectrum trace data and I/Q data via SCPI commands.
Spectrum Trace Data via SCPI
SCPI commands are sent to port 9001 of the instrument. Below is a simple example to capture spectrum trace data.
SENS:FREQ:START 88 MHz
SENS:FREQ:STOP 108 MHz
 
//Set sweep mode
SWEEP:MODE FFT
 
//Set RBW 30 kHz
BANDWIDTH 30 KHz
 
//Set Reference Level to -30 dBm
DISP:WIND:TRAC:Y:SCAL:RLEV -30
 
//Set to single sweep
INIT:CONT OFF
 
//Get trace amplitude data
TRACE:DATA? 1
 
//Get number of display points to calculate frequency array
DISP:POIN?
Spectrum Trace Data Format
Trace data uses SCPI standard (IEEE 488.2) block data format. The data format is '#AXD', where D is a comma separated list of amplitudes (in ASCII), X is one or more ASCII digits specifying the number of bytes in D, and A is a single ASCII digit specifying the number of digits in X.
Trace data only contains amplitude. The frequency information for each point is
Frequency = [start_frequency + (span/(display_points-1))*N
N = 0, 1, … display_points
Raw Socket Connection
import socket
from time import sleep, time
 
class SocketConnection:
"""Provides a means to connect and send SCPI commands to the DUT using a raw TCP socket."""
def __init__(self, ipAddress):
"""
Initializes an instance of SocketConnection class
@param ipAddress The IP address of the device
"""
 
# split out port number if given
splitIpAddress = ipAddress.split(':')
 
assert len(splitIpAddress) > 0
assert len(splitIpAddress) <= 2
self._ipAddress = splitIpAddress[0]
 
#assign port
if len(splitIpAddress) == 2:
self._portNumber = int(splitIpAddress[1])
else:
self._portNumber = 9001
 
self._socketConnection = None
 
self._timeoutInSec = 120
self._socketReadSize = 4096
self.__nonBulkDataSizeCuttoff = 32768
# Time to let the other end of the connection close
self.__timeoutAfterCloseInSec = 1
self._terminatedBlockResponse = False
self.prefix = ''
self._verbose = False
self._establishConnection()
 
def __del__(self):
"""
This gets called by the garbage collector so it is possible that the connection will
remain open for a while before this gets collected.
"""
self._closeConnection()
 
def getpeername(self):
return self._ipAddress, self._portNumber
 
def settimeout(self, *args, **kwargs):
return self._socketConnection.settimeout(*args, **kwargs)
 
def expectTerminatedBlockResponse(self, newval=None):
if newval is not None:
self._terminatedBlockResponse = newval
return self._terminatedBlockResponse
 
def sendWriteCommand(self, scpiCommand):
"""
Sends a SCPI write command.
@param scpiCommand The SCPI command to send.
"""
scpiCommand = self.prefix + scpiCommand
try:
returnValue = self._socketConnection.sendall(scpiCommand + "\n")
assert returnValue is None, "Error sending command: " + scpiCommand
if self._verbose:
if len(scpiCommand) < self.__nonBulkDataSizeCuttoff:
print(scpiCommand + " sent successfully")
else:
print( "sent long scpi command of length: " + str(len(scpiCommand)))
except socket.error as msg:
assert False, "Failed to send SCPI command: a socket error occurred (Error code: " + str(msg[0]) + ", Error message: " + str(msg[1]) + ")"
return
def sendQueryCommand(self, scpiCommand):
"""
Sends a SCPI query command and return the response.
@param scpiCommand The SCPI query to send.
@return The result of the SCPI command.
"""
scpiCommand = self.prefix + scpiCommand
try:
returnValue = self._socketConnection.sendall(scpiCommand + "\n")
assert returnValue is None, "failed to send command"
if self._verbose:
print(scpiCommand + " sent successfully")
 
# Read 1 byte to check for a block data response header
data = self._socketConnection.recv(1)
assert len(data) > 0, "No data returned for query"
if len(data) > 0 and data[0] == '#':
# Block data response
data = self._getBlockDataResponse()
elif len(data) > 0 and data[0] == '\n':
# Check for a response string that only contains a newline. Remove the newline and return empty data.
data = data[:-1]
elif len(data) > 0:
# ASCII response: receive until the entire response is read
while True:
data += self._socketConnection.recv(self._socketReadSize)
 
assert len(data) < self.__nonBulkDataSizeCuttoff, \
"No newline character found in response to " + scpiCommand + " SCPI command."
 
# Check for a new line at the end of the response
if data[-1] == '\n':
break;
 
# Remove the trailing \n from the response
data = data[:-1]
if self._verbose:
print('Data received: "%s"' % data)
except socket.error as msg:
assert False, "Failed to send SCPI command: a socket error occurred \n" + msg.__str__()
return data
 
def _establishConnection(self):
"""Establishes a connection. The call will fail if a connection is already open."""
assert self._socketConnection is None, "connection should not already be open"
try:
self._socketConnection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socketConnection.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socketConnection.settimeout(self._timeoutInSec)
self._socketConnection.connect((self._ipAddress, self._portNumber))
self._socketConnection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except socket.error as msg:
assert False, "Failed to establish DUT connection (Error code: " + str(msg[0]) + ", Error message: " + str(msg[1]) + ")"
 
def _closeConnection(self):
"""
Closes the socket connection and asserts that it closed. This informs the other end of the
socket that it should close but it may take some time depending on implementation,
network conditions, etc.
"""
if self._socketConnection is not None:
self._socketConnection.shutdown(socket.SHUT_RDWR)
self._socketConnection.close()
self._socketConnection = None
sleep(self.__timeoutAfterCloseInSec)
assert self._socketConnection is None, "Socket connection not closed"
 
def _getBlockDataResponse(self):
"""
Receives a SCPI block data response of the form 'AXD' where A is a single ASCII byte
specifying the number of digits in X, X is one or more ASCII bytes specifying the number
of bytes in D, and D is one or more bytes containing the response binary data.
"""
numSizeBytes = int(self._socketConnection.recv(1))
 
assert numSizeBytes > 0, "The definite-length empty block response must be #10 not #0."
 
numDataBytesLeft = int(self._socketConnection.recv(numSizeBytes))
responses = []
readBuffer = bytearray(numDataBytesLeft)
view = memoryview(readBuffer)
timeoutSeconds = self._socketConnection.gettimeout()
lastReadTime = time()
 
while numDataBytesLeft > 0:
numBytesRead = self._socketConnection.recv_into(view, numDataBytesLeft)
if numBytesRead > 0:
lastReadTime = time()
 
dt = time() - lastReadTime
if dt > timeoutSeconds:
raise Exception('Timeout after %d ms: Only read %d/%d bytes'
% (dt, len(readBuffer),
len(readBuffer) + numDataBytesLeft))
 
view = view[numBytesRead:]
numDataBytesLeft = numDataBytesLeft - numBytesRead
 
if self._terminatedBlockResponse:
blockTerminator = self._socketConnection.recv(2)
assert blockTerminator in ('\r\n', '\n')
if self._verbose:
print("Read bytes of block data: ", len(readBuffer))
return readBuffer
 
def reset(self, delay_seconds=-1):
"""
Resets the established connection
@param delay_seconds: Wait time between closing the connection and attempting to
re-establish the connection. This is useful when rebooting an instrument.
"""
self._closeConnection()
 
if delay_seconds >= 0:
sleep(delay_seconds)
try:
self._establishConnection()
except socket.error as msg:
assert False, "Failed to establish DUT connection (Error code: " + str(msg[0]) + ", Error message: " + str(msg[1]) + ")"
else:
reset_timeout = 300 # 300 seconds == 5 minutes == max polling time
time.sleep(5) # Fixed delay before attempting to reconnect
while reset_timeout > 0:
try:
self._socketConnection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socketConnection.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socketConnection.settimeout(self._timeoutInSec)
self._socketConnection.connect((self._ipAddress, self._portNumber))
break
except Exception as msg :
self._socketConnection.close()
self._socketConnection = None
sleep(1)
reset_timeout -= 1
if reset_timeout <= 0:
assert False, "Failed to establish DUT connection (Error code: " + str(msg[0]) + ", Error message: " + str(msg[1]) + ")"
 
 
I/Q Block Capture via SCPI
SENS:FREQ:CENTER 100 MHz
SENS:FREQ:SPAN 20 MHz
SWEEP:MODE FFT
//Set RBW 30 kHz
BANDWIDTH 30 KHz
//Set Reference Level to -30 dBm
DISP:WIND:TRAC:Y:SCAL:RLEV -30
//Set to single sweep
INIT:CONT OFF
//abort any sweep in progress
:ABORT
 
//Set Capture bandwidth. Not same as RBW.
IQ:BANDWIDTH 20 MHz
 
//Set 16 bit resolution
IQ:BITS 16
 
//Set to I/Q block capture mode
IQ:MODE SINGLE
//enable time stamp
SENS:IQ:TIME 1
 
//Set capture length to 5 msec
IQ:LENGTH 5 ms
 
//Start IQ Capture. Triggers single capture. Data is saved to DDR2 SDRAM memory.
MEAS:IQ:CAPT
 
//Check if capture is completed normally
STATus:OPERation?
 
//The STATus:OPERation? query responds with a integer. Convert this integer to binary.
//Bit 9 is set to 1 when the MEAS:IQ:CAPT command is issued.
//Bit 9 is set to 0 when the capture is completed normally in block mode.