7
7
# internal python modules
8
8
import logging
9
9
import time
10
+ from typing import Union
10
11
11
12
# external python modules
12
13
import serial
@@ -90,7 +91,7 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None:
90
91
91
92
:param num_of_channels: integer between 1 and 512
92
93
:param serial_number: serial number of the RS-485 chip as string. If you want to know the current serial number
93
- of your device, call my_dmx.use_device .serial_number
94
+ of your device, call my_dmx.device .serial_number
94
95
:return None:
95
96
"""
96
97
@@ -102,7 +103,7 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None:
102
103
103
104
# Search for RS-485 devices, for this look into DEVICE_LIST
104
105
self .ser = None
105
- self .use_device = None
106
+ self .device = None
106
107
for device in serial .tools .list_ports .comports ():
107
108
for known_device in DEVICE_LIST :
108
109
if device .vid == known_device .vid and device .pid == known_device .pid and serial_number == "" :
@@ -112,7 +113,7 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None:
112
113
except (OSError , serial .SerialException ):
113
114
pass
114
115
else :
115
- self .use_device = device
116
+ self .device = device
116
117
break
117
118
118
119
elif device .vid == known_device .vid and device .pid == known_device .pid and \
@@ -124,22 +125,22 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None:
124
125
except (OSError , serial .SerialException ) as error :
125
126
raise error
126
127
else :
127
- self .use_device = device
128
+ self .device = device
128
129
logger .info ("Found device with serial number: " + serial_number )
129
130
break
130
- if self .use_device :
131
- logger .info ("Found RS-485 interface: " + self .use_device .description )
131
+ if self .device :
132
+ logger .info ("Found RS-485 interface: " + self .device .description )
132
133
break
133
134
134
- if self .use_device is None :
135
+ if self .device is None :
135
136
raise ConnectionError ("Could not find the RS-485 interface." )
136
137
137
- if self .use_device .vid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE .vid and \
138
- self .use_device .pid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE .pid :
138
+ if self .device .vid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE .vid and \
139
+ self .device .pid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE .pid :
139
140
self .start_byte = np .array ([0x7E , 0x06 , 0x01 , 0x02 , 0x00 ], np .uint8 )
140
141
self .end_byte = np .array ([0xE7 ], np .uint8 )
141
142
self .num_of_channels = num_of_channels
142
- self .ser = serial .Serial (self .use_device .name ,
143
+ self .ser = serial .Serial (self .device .name ,
143
144
baudrate = 250000 ,
144
145
parity = serial .PARITY_NONE ,
145
146
bytesize = serial .EIGHTBITS ,
@@ -149,7 +150,7 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None:
149
150
self .start_byte = np .array ([0x00 ], np .uint8 )
150
151
self .end_byte = np .array ([], np .uint8 )
151
152
self .num_of_channels = num_of_channels
152
- self .ser = serial .Serial (self .use_device .name ,
153
+ self .ser = serial .Serial (self .device .name ,
153
154
baudrate = 250000 ,
154
155
parity = serial .PARITY_NONE ,
155
156
bytesize = serial .EIGHTBITS ,
@@ -175,8 +176,8 @@ def num_of_channels(self, num_of_channels: int) -> None:
175
176
if num_of_channels > 512 :
176
177
raise ValueError ("Number of channels are maximal 512! Only channels 1 to 512 can be accessed. " +
177
178
"Channel 0 is reserved as start channel." )
178
- if self .use_device .vid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE .vid and \
179
- self .use_device .pid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE .pid :
179
+ if self .device .vid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE .vid and \
180
+ self .device .pid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE .pid :
180
181
self .start_byte [2 ] = (num_of_channels + 1 ) & 0xFF
181
182
self .start_byte [3 ] = ((num_of_channels + 1 ) >> 8 ) & 0xFF
182
183
self .__num_of_channels = num_of_channels
@@ -186,6 +187,19 @@ def num_of_channels(self, num_of_channels: int) -> None:
186
187
for channel_id in range (min ([len (old_data ), len (self .data )])):
187
188
self .data [channel_id ] = old_data [channel_id ]
188
189
190
+ def is_connected (self ) -> bool :
191
+ """
192
+ checks if the DMX class has a connection to the device
193
+
194
+ :return:
195
+ """
196
+ connected = False
197
+ devices = serial .tools .list_ports .comports ()
198
+ if self .device is not None :
199
+ if self .device in devices :
200
+ connected = True
201
+ return connected
202
+
189
203
def set_data (self , channel_id : int , data : int , auto_send : bool = True ) -> None :
190
204
"""
191
205
@@ -224,11 +238,12 @@ def __del__(self) -> None:
224
238
"""
225
239
if isinstance (self .ser , serial .Serial ):
226
240
if self .ser .is_open :
241
+ if self .is_connected ():
242
+ self .num_of_channels = 512
243
+ self .data = np .zeros ([self .num_of_channels ], np .uint8 )
244
+ self .send ()
245
+ self .send () # make sure it has been send
227
246
print ("close serial port" )
228
- self .num_of_channels = 512
229
- self .data = np .zeros ([self .num_of_channels ], np .uint8 )
230
- self .send ()
231
- self .send () # make sure it is send
232
247
self .ser .close ()
233
248
234
249
@@ -289,7 +304,7 @@ def sleep_us(sleep_in_us: int) -> None:
289
304
dmx .set_data (4 , i )
290
305
time .sleep (0.01 )
291
306
292
- my_device_sn = dmx .use_device .serial_number
307
+ my_device_sn = dmx .device .serial_number
293
308
del dmx
294
309
295
310
dmx2 = DMX (serial_number = my_device_sn )
0 commit comments