12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230 |
- ###########################################################################
- #
- # OpenOPC for Python Library Module
- #
- # Copyright (c) 2007-2012 Barry Barnreiter (barry_b@users.sourceforge.net)
- # Copyright (c) 2014 Anton D. Kachalov (mouse@yandex.ru)
- # Copyright (c) 2017 José A. Maita (jose.a.maita@gmail.com)
- #
- ###########################################################################
- import os
- import sys
- import time
- import types
- import string
- import socket
- import re
- import Pyro4.core
- from multiprocessing import Queue
- __version__ = '1.2.0'
- current_client = None
# Win32 only modules not needed for 'open' protocol mode.
# win32com_found records whether the COM support modules could be imported.
win32com_found = False

if os.name == 'nt':
    try:
        import win32com.client
        import win32com.server.util
        import win32event
        import pythoncom
        import pywintypes
        import SystemHealth

        # Win32 variant types: map each pythoncom VT* constant value to its name
        pywintypes.datetime = pywintypes.TimeType
        vt = {
            pythoncom.__dict__[vtype]: vtype
            for vtype in pythoncom.__dict__.keys()
            if vtype[:2] == "VT"
        }

        # Allow gencache to create the cached wrapper objects
        win32com.client.gencache.is_readonly = False

        # Under py2exe the call in gencache to __init__() does not happen
        # so we use Rebuild() to force the creation of the gen_py folder
        win32com.client.gencache.Rebuild(verbose=0)

    # So we can work on Windows in "open" protocol mode without the need
    # for the win32com modules
    except ImportError:
        pass
    else:
        win32com_found = True
# OPC Constants

# Data source selectors passed to SyncRead / AsyncRefresh
SOURCE_CACHE = 1
SOURCE_DEVICE = 2

# Lookup tables indexed by the numeric codes reported by the OPC server
OPC_STATUS = (0, 'Running', 'Failed', 'NoConfig', 'Suspended', 'Test')
BROWSER_TYPE = (0, 'Hierarchical', 'Flat')
ACCESS_RIGHTS = (0, 'Read', 'Write', 'Read/Write')
OPC_QUALITY = ('Bad', 'Uncertain', 'Unknown', 'Good')

# Semicolon-separated candidate lists; each entry is tried in order
OPC_CLASS = ';'.join((
    'Matrikon.OPC.Automation',
    'Graybox.OPC.DAWrapper',
    'HSCOPC.Automation',
    'RSI.OPCAutomation',
    'OPC.Automation',
))
OPC_SERVER = ';'.join((
    'Hci.TPNServer',
    'HwHsc.OPCServer',
    'opc.deltav.1',
    'AIM.OPC.1',
    'Yokogawa.ExaopcDAEXQ.1',
    'OSI.DA.1',
    'OPC.PHDServerDA.1',
    'Aspen.Infoplus21_DA.1',
    'National Instruments.OPCLabVIEW',
    'RSLinx OPC Server',
    'KEPware.KEPServerEx.V4',
    'Matrikon.OPC.Simulation',
    'Prosys.OPC.Simulation',
    'CCOPC.XMLWrapper.1',
    'OPC.SimaticHMI.CoRtHmiRTm.1',
))

# Default client name reported to the OPC server
OPC_CLIENT = 'OpenOPC'
def quality_str(quality_bits):
    """Convert OPC quality bits to a descriptive string

    Bits 6-7 of quality_bits select the entry of OPC_QUALITY to return.
    """
    return OPC_QUALITY[(quality_bits >> 6) & 3]
def type_check(tags):
    """Perform a type check on a list of tags

    Normalizes 'tags' to a sequence and reports how it was supplied.

    Returns a (tags, single, valid) tuple:
      tags   -- the list/tuple as given, [] for None, or [tags] for a scalar
      single -- True when a single (non-sequence) value was passed in
      valid  -- True when every element is a str or bytes
    """
    if isinstance(tags, (list, tuple)):
        single = False
    elif tags is None:
        # Identity test: '== None' would dispatch to an arbitrary __eq__
        tags = []
        single = False
    else:
        tags = [tags]
        single = True

    valid = all(isinstance(t, (str, bytes)) for t in tags)
    return tags, single, valid
def wild2regex(string):
    """Convert a Unix wildcard glob into a regular expression

    '.' is escaped, '*' becomes '.*', '?' becomes '.' and '!' becomes '^'.
    """
    # Raw string: '\.' in a normal literal is an invalid escape sequence
    return string.replace('.', r'\.').replace('*', '.*').replace('?', '.').replace('!', '^')
def tags2trace(tags):
    """Convert a list tags into a formatted string suitable for the trace callback log

    The first element of 'tags' is skipped; the remaining elements are
    %s-formatted and joined with commas.
    """
    return ','.join('%s' % t for t in tags[1:])
def exceptional(func, alt_return=None, alt_exceptions=(Exception,), final=None, catch=None):
    """Turns exceptions into an alternative return value

    Returns a wrapper around 'func' that:
      * returns alt_return when func raises one of alt_exceptions,
      * for any other exception, returns catch(exc_info, retry) when 'catch'
        is given (retry is a zero-argument callable re-invoking func with the
        same arguments), otherwise re-raises,
      * always calls final() afterwards when 'final' is given.
    """
    def _exceptional(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except alt_exceptions:
            return alt_return
        except:
            if not catch:
                raise
            return catch(sys.exc_info(), lambda: func(*args, **kwargs))
        finally:
            if final:
                final()
    return _exceptional
def get_sessions(host='localhost', port=7766):
    """Return sessions in OpenOPC Gateway Service as GUID:host hash

    Opens a Pyro4 proxy to the 'opc' object at host:port and returns the
    result of its get_clients() call.
    """
    import Pyro4.core

    uri = "PYRO:opc@{0}:{1}".format(host, port)
    gateway = Pyro4.Proxy(uri)
    return gateway.get_clients()
def open_client(host='localhost', port=7766):
    """Connect to the specified OpenOPC Gateway Service

    Opens a Pyro4 proxy to the 'opc' object at host:port and returns the
    result of its create_client() call.
    """
    import Pyro4.core

    uri = "PYRO:opc@{0}:{1}".format(host, port)
    gateway = Pyro4.Proxy(uri)
    return gateway.create_client()
class TimeoutError(Exception):
    """Raised by iread() when no data-change callback arrives within the timeout."""

    def __init__(self, txt):
        super().__init__(txt)
class OPCError(Exception):
    """Error raised by this module, typically wrapping a COM error message."""

    def __init__(self, txt):
        super().__init__(txt)
class GroupEvents:
    """COM event sink attached to an OPC group via WithEvents()."""

    def __init__(self):
        # Bind to whichever client was stored in the module-level
        # 'current_client' just before this sink was instantiated.
        self.client = current_client

    def OnDataChange(self, TransactionID, NumItems, ClientHandles, ItemValues, Qualities, TimeStamps):
        """Queue the callback payload on the owning client's callback_queue.

        NumItems is not forwarded.
        """
        payload = (TransactionID, ClientHandles, ItemValues, Qualities, TimeStamps)
        self.client.callback_queue.put(payload)
- @Pyro4.expose # needed for 4.55
- class client():
def __init__(self, opc_class=None, client_name=None):
    """Instantiate OPC automation class

    opc_class   -- semicolon-separated list of automation class names to try
                   in order; defaults to the OPC_CLASS environment variable,
                   then the module-level OPC_CLASS constant.
    client_name -- client name applied to the server object in connect().

    Raises OPCError when the last class in the list cannot be dispatched.
    """
    # Receives the tuples queued by GroupEvents.OnDataChange
    self.callback_queue = Queue()

    pythoncom.CoInitialize()

    if opc_class is None:
        opc_class = os.environ.get('OPC_CLASS', OPC_CLASS)

    opc_class_list = opc_class.split(';')
    last_index = len(opc_class_list) - 1

    for i, c in enumerate(opc_class_list):
        try:
            self._opc = win32com.client.gencache.EnsureDispatch(c, 0)
            self.opc_class = c
            break
        except pythoncom.com_error as err:
            # Only the failure of the final candidate is reported
            if i == last_index:
                error_msg = 'Dispatch: %s' % self._get_error_str(err)
                raise OPCError(error_msg)

    self._event = win32event.CreateEvent(None, 0, 0, None)

    self.opc_server = None
    self.opc_host = None
    self.client_name = client_name

    # Per-group bookkeeping, keyed by group / sub-group name
    self._groups = {}
    self._group_tags = {}
    self._group_valid_tags = {}
    self._group_server_handles = {}
    self._group_handles_tag = {}
    self._group_hooks = {}

    # Gateway ("open" protocol) state
    self._open_serv = None
    self._open_self = None
    self._open_host = None
    self._open_port = None
    self._open_guid = None

    self._prev_serv_time = None
    self._tx_id = 0
    self.trace = None
    self.cpu = None
def set_trace(self, trace):
    """Install 'trace' as the trace callback.

    The callback is only stored when self._open_serv is None; otherwise the
    call is ignored.
    """
    if self._open_serv is None:
        self.trace = trace
def connect(self, opc_server=None, opc_host='localhost'):
    """Connect to the specified OPC server

    opc_server -- semicolon-separated list of server names tried in order.
                  When None: on the first connect the OPC_SERVER environment
                  variable (or module constant) is used; afterwards the
                  previously stored server and host are reused.
    opc_host   -- host passed to the automation object's Connect().

    Raises OPCError when no server in the list could be connected.
    """
    pythoncom.CoInitialize()

    if opc_server is None:
        if self.opc_server is None:
            # Initial connect using environment vars
            opc_server = os.environ.get('OPC_SERVER', OPC_SERVER)
        else:
            # Reconnect using previous server name
            opc_server = self.opc_server
            opc_host = self.opc_host

    opc_server_list = opc_server.split(';')
    connected = False

    for s in opc_server_list:
        try:
            if self.trace: self.trace('Connect(%s,%s)' % (s, opc_host))
            self._opc.Connect(s, opc_host)
        except pythoncom.com_error as err:
            # With several candidates a failure just moves on to the next one
            if len(opc_server_list) == 1:
                error_msg = 'Connect: %s' % self._get_error_str(err)
                raise OPCError(error_msg)
        else:
            # Set client name since some OPC servers use it for security
            try:
                if self.client_name is None:
                    self._opc.ClientName = os.environ.get('OPC_CLIENT', OPC_CLIENT)
                else:
                    self._opc.ClientName = self.client_name
            except Exception:
                # Best effort: failing to set the name must not abort the
                # connection (KeyboardInterrupt/SystemExit still propagate)
                pass
            connected = True
            break

    if not connected:
        raise OPCError('Connect: Cannot connect to any of the servers in the OPC_SERVER list')

    # With some OPC servers, the next OPC call immediately after Connect()
    # will occasionally fail. Sleeping for 1/100 second seems to fix this.
    time.sleep(0.01)

    # The full server list string (not just the entry that connected) is kept
    self.opc_server = opc_server
    if opc_host == 'localhost':
        opc_host = socket.gethostname()
    self.opc_host = opc_host

    # On reconnect we need to remove the old group names from OpenOPC's internal
    # cache since they are now invalid
    self._groups = {}
    self._group_tags = {}
    self._group_valid_tags = {}
    self._group_server_handles = {}
    self._group_handles_tag = {}
    self._group_hooks = {}
def GUID(self):
    """Return the value stored in self._open_guid."""
    guid = self._open_guid
    return guid
def close(self, del_object=True):
    """Disconnect from the currently connected OPC server

    Removes all known groups, then disconnects the automation object.
    When a gateway service is attached (self._open_serv) and del_object is
    true, this client is also released from it.

    Raises OPCError if removing the groups fails with a COM error; an
    OPCError raised by remove() itself is ignored.
    """
    try:
        pythoncom.CoInitialize()
        # Snapshot the names: remove() deletes from self._groups as it goes,
        # and iterating a live dict view while deleting raises RuntimeError.
        self.remove(list(self.groups()))

    except pythoncom.com_error as err:
        error_msg = 'Disconnect: %s' % self._get_error_str(err)
        raise OPCError(error_msg)

    except OPCError:
        pass

    finally:
        if self.trace: self.trace('Disconnect()')
        self._opc.Disconnect()

        # Remove this object from the open gateway service
        if self._open_serv and del_object:
            self._open_serv.release_client(self._open_self)
def iread(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False):
    """Iterable version of read()

    tags          -- a tag name or a list/tuple of tag names
    group         -- group name; None uses an anonymous group that is removed
                     after its results have been yielded
    size          -- when set, tags are split into sub-groups of this many tags
    pause         -- milliseconds to sleep between sub-groups
    source        -- 'hybrid', 'cache', or anything else for device reads
    update        -- value assigned to DefaultGroupUpdateRate (forced to -1
                     for sync reads)
    timeout       -- milliseconds to wait for the async data callback
    sync          -- use SyncRead instead of AsyncRefresh + callback
    include_error -- append an error message to each result (forces sync)
    rebuild       -- re-synchronise an existing group's items with 'tags'

    Yields (value, quality, timestamp) for a single tag, or
    (tag, value, quality, timestamp) per tag for a list; with include_error
    an error-message element is appended.

    Raises TypeError for invalid 'tags', TimeoutError when the callback does
    not arrive in time, and OPCError for COM failures.

    NOTE(review): an anonymous group is only removed after all of its results
    were consumed; abandoning the generator early leaves it on the server.
    """

    def add_items(tags):
        """Validate and add 'tags' to the current group; return (valid_tags, server_handles)."""
        # The automation calls take arrays with a dummy element at index 0
        names = list(tags)
        names.insert(0, 0)
        errors = []

        if self.trace: self.trace('Validate(%s)' % tags2trace(names))

        try:
            errors = opc_items.Validate(len(names)-1, names)
        except Exception:
            pass

        valid_tags = []
        client_handles = []

        # Continue client handle numbering after the highest one in use
        if sub_group not in self._group_handles_tag:
            self._group_handles_tag[sub_group] = {}
            n = 0
        elif len(self._group_handles_tag[sub_group]) > 0:
            n = max(self._group_handles_tag[sub_group]) + 1
        else:
            n = 0

        for i, tag in enumerate(tags):
            if errors[i] == 0:
                valid_tags.append(tag)
                client_handles.append(n)
                self._group_handles_tag[sub_group][n] = tag
                n += 1
            elif include_error:
                error_msgs[tag] = self._opc.GetErrorString(errors[i])

            if self.trace and errors[i] != 0: self.trace('%s failed validation' % tag)

        client_handles.insert(0, 0)
        valid_tags.insert(0, 0)
        server_handles = []
        errors = []

        if self.trace: self.trace('AddItems(%s)' % tags2trace(valid_tags))

        try:
            server_handles, errors = opc_items.AddItems(len(client_handles)-1, valid_tags, client_handles)
        except Exception:
            pass

        valid_tags_tmp = []
        server_handles_tmp = []
        valid_tags.pop(0)

        if sub_group not in self._group_server_handles:
            self._group_server_handles[sub_group] = {}

        for i, tag in enumerate(valid_tags):
            if errors[i] == 0:
                valid_tags_tmp.append(tag)
                server_handles_tmp.append(server_handles[i])
                self._group_server_handles[sub_group][tag] = server_handles[i]
            elif include_error:
                error_msgs[tag] = self._opc.GetErrorString(errors[i])

        return valid_tags_tmp, server_handles_tmp

    def remove_items(tags):
        """Remove 'tags' from the current group using their stored server handles."""
        if self.trace: self.trace('RemoveItems(%s)' % tags2trace(['']+tags))
        server_handles = [self._group_server_handles[sub_group][tag] for tag in tags]
        server_handles.insert(0, 0)

        try:
            opc_items.Remove(len(server_handles)-1, server_handles)
        except pythoncom.com_error as err:
            error_msg = 'RemoveItems: %s' % self._get_error_str(err)
            raise OPCError(error_msg)

    try:
        self._update_tx_time()
        pythoncom.CoInitialize()

        if include_error:
            sync = True

        if sync:
            update = -1

        tags, single, valid = type_check(tags)
        if not valid:
            raise TypeError("iread(): 'tags' parameter must be a string or a list of strings")

        # Group exists
        if group in self._groups and not rebuild:
            num_groups = self._groups[group]
            data_source = SOURCE_CACHE

        # Group non-existent
        else:
            if size:
                # Break-up tags into groups of 'size' tags
                tag_groups = [tags[i:i+size] for i in range(0, len(tags), size)]
            else:
                tag_groups = [tags]

            num_groups = len(tag_groups)
            data_source = SOURCE_DEVICE

        for gid in range(num_groups):
            if gid > 0 and pause > 0: time.sleep(pause/1000.0)

            error_msgs = {}
            opc_groups = self._opc.OPCGroups
            opc_groups.DefaultGroupUpdateRate = update

            # Anonymous group
            if group is None:
                try:
                    if self.trace: self.trace('AddGroup()')
                    opc_group = opc_groups.Add()
                except pythoncom.com_error as err:
                    error_msg = 'AddGroup: %s' % self._get_error_str(err)
                    raise OPCError(error_msg)
                sub_group = group
                new_group = True
            else:
                sub_group = '%s.%d' % (group, gid)

                # Existing named group
                try:
                    if self.trace: self.trace('GetOPCGroup(%s)' % sub_group)
                    opc_group = opc_groups.GetOPCGroup(sub_group)
                    new_group = False

                # New named group
                except Exception:
                    try:
                        if self.trace: self.trace('AddGroup(%s)' % sub_group)
                        opc_group = opc_groups.Add(sub_group)
                    except pythoncom.com_error as err:
                        error_msg = 'AddGroup: %s' % self._get_error_str(err)
                        raise OPCError(error_msg)
                    self._groups[str(group)] = len(tag_groups)
                    new_group = True

            opc_items = opc_group.OPCItems

            if new_group:
                opc_group.IsSubscribed = 1
                opc_group.IsActive = 1
                if not sync:
                    if self.trace: self.trace('WithEvents(%s)' % opc_group.Name)
                    # GroupEvents.__init__ picks its client up from this global
                    global current_client
                    current_client = self
                    self._group_hooks[opc_group.Name] = win32com.client.WithEvents(opc_group, GroupEvents)

                tags = tag_groups[gid]

                valid_tags, server_handles = add_items(tags)

                self._group_tags[sub_group] = tags
                self._group_valid_tags[sub_group] = valid_tags

            # Rebuild existing group
            elif rebuild:
                tags = tag_groups[gid]

                valid_tags = self._group_valid_tags[sub_group]
                add_tags = [t for t in tags if t not in valid_tags]
                del_tags = [t for t in valid_tags if t not in tags]

                if len(add_tags) > 0:
                    valid_tags, server_handles = add_items(add_tags)
                    valid_tags = self._group_valid_tags[sub_group] + valid_tags

                if len(del_tags) > 0:
                    remove_items(del_tags)
                    valid_tags = [t for t in valid_tags if t not in del_tags]

                self._group_tags[sub_group] = tags
                self._group_valid_tags[sub_group] = valid_tags

                if source == 'hybrid': data_source = SOURCE_DEVICE

            # Existing group
            else:
                tags = self._group_tags[sub_group]
                valid_tags = self._group_valid_tags[sub_group]
                if sync:
                    server_handles = [item.ServerHandle for item in opc_items]

            tag_value = {}
            tag_quality = {}
            tag_time = {}
            tag_error = {}

            # Sync Read
            if sync:
                values = []
                errors = []
                qualities = []
                timestamps = []

                if len(valid_tags) > 0:
                    server_handles.insert(0, 0)

                    if source != 'hybrid':
                        data_source = SOURCE_CACHE if source == 'cache' else SOURCE_DEVICE

                    if self.trace: self.trace('SyncRead(%s)' % data_source)

                    try:
                        values, errors, qualities, timestamps = opc_group.SyncRead(data_source, len(server_handles)-1, server_handles)
                    except pythoncom.com_error as err:
                        error_msg = 'SyncRead: %s' % self._get_error_str(err)
                        raise OPCError(error_msg)

                    for i, tag in enumerate(valid_tags):
                        tag_value[tag] = values[i]
                        tag_quality[tag] = qualities[i]
                        tag_time[tag] = timestamps[i]
                        tag_error[tag] = errors[i]

            # Async Read
            else:
                if len(valid_tags) > 0:
                    # Transaction ids cycle through 1..0xFFFF
                    if self._tx_id >= 0xFFFF:
                        self._tx_id = 0
                    self._tx_id += 1

                    if source != 'hybrid':
                        data_source = SOURCE_CACHE if source == 'cache' else SOURCE_DEVICE

                    if self.trace: self.trace('AsyncRefresh(%s)' % data_source)

                    try:
                        opc_group.AsyncRefresh(data_source, self._tx_id)
                    except pythoncom.com_error as err:
                        error_msg = 'AsyncRefresh: %s' % self._get_error_str(err)
                        raise OPCError(error_msg)

                    tx_id = 0
                    start = time.time() * 1000

                    # Pump COM messages until the callback for this
                    # transaction has been queued by GroupEvents.OnDataChange
                    while tx_id != self._tx_id:
                        now = time.time() * 1000
                        if now - start > timeout:
                            raise TimeoutError('Callback: Timeout waiting for data')

                        if self.callback_queue.empty():
                            pythoncom.PumpWaitingMessages()
                        else:
                            tx_id, handles, values, qualities, timestamps = self.callback_queue.get()

                    # Only the payload of the matching transaction is decoded
                    for i, h in enumerate(handles):
                        tag = self._group_handles_tag[sub_group][h]
                        tag_value[tag] = values[i]
                        tag_quality[tag] = qualities[i]
                        tag_time[tag] = timestamps[i]

            for tag in tags:
                if tag in tag_value:
                    if (not sync and len(valid_tags) > 0) or (sync and tag_error[tag] == 0):
                        value = tag_value[tag]
                        if type(value) == pywintypes.TimeType:
                            value = str(value)
                        quality = quality_str(tag_quality[tag])
                        timestamp = str(tag_time[tag])
                    else:
                        value = None
                        quality = 'Error'
                        timestamp = None
                    if include_error:
                        error_msgs[tag] = self._opc.GetErrorString(tag_error[tag]).strip('\r\n')
                else:
                    value = None
                    quality = 'Error'
                    timestamp = None
                    if include_error and tag not in error_msgs:
                        error_msgs[tag] = ''

                if single:
                    if include_error:
                        yield (value, quality, timestamp, error_msgs[tag])
                    else:
                        yield (value, quality, timestamp)
                else:
                    if include_error:
                        yield (tag, value, quality, timestamp, error_msgs[tag])
                    else:
                        yield (tag, value, quality, timestamp)

            # Anonymous groups are temporary: drop the event hook and the group
            if group is None:
                try:
                    if not sync and opc_group.Name in self._group_hooks:
                        if self.trace: self.trace('CloseEvents(%s)' % opc_group.Name)
                        self._group_hooks[opc_group.Name].close()

                    if self.trace: self.trace('RemoveGroup(%s)' % opc_group.Name)
                    opc_groups.Remove(opc_group.Name)

                except pythoncom.com_error as err:
                    error_msg = 'RemoveGroup: %s' % self._get_error_str(err)
                    raise OPCError(error_msg)

    except pythoncom.com_error as err:
        error_msg = 'read: %s' % self._get_error_str(err)
        raise OPCError(error_msg)
def read(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False):
    """Return list of (value, quality, time) tuples for the specified tag(s)

    Tags starting with '@' are system health tags and are served by
    _read_health(); all others go through iread(). The two kinds cannot be
    mixed in one call. A single (non-list) tag returns one tuple rather
    than a list.
    """
    tags_list, single, valid = type_check(tags)
    if not valid:
        raise TypeError("read(): 'tags' parameter must be a string or a list of strings")

    num_health_tags = sum(1 for t in tags_list if t[:1] == '@')
    num_opc_tags = len(tags_list) - num_health_tags

    if num_health_tags > 0 and num_opc_tags > 0:
        raise TypeError("read(): system health and OPC tags cannot be included in the same group")

    if num_health_tags > 0:
        results = self._read_health(tags)
    else:
        results = self.iread(tags, group, size, pause, source, update, timeout, sync, include_error, rebuild)

    collected = list(results)
    return collected[0] if single else collected
def _read_health(self, tags):
    """Return values of special system health monitoring tags

    Each tag is resolved through the SystemHealth module. Unknown tags get
    a value of None and quality 'Error'; all others get quality 'Good'.

    Returns a list of (value, quality, time) tuples for a single tag, or
    (tag, value, quality, time) tuples for a list of tags.
    """
    # Tags answered by a zero-argument SystemHealth function of this name
    simple_tags = {
        '@MemFree': 'mem_free',
        '@MemUsed': 'mem_used',
        '@MemTotal': 'mem_total',
        '@MemPercent': 'mem_percent',
        '@DiskFree': 'disk_free',
        '@SineWave': 'sine_wave',
        '@SawWave': 'saw_wave',
    }
    # Tags of the form @Name(image) answered by SystemHealth.<func>(image).
    # Raw strings: '\(' in a normal literal is an invalid escape sequence.
    task_tags = (
        (r'@TaskMem\((.*?)\)', 'task_mem'),
        (r'@TaskCpu\((.*?)\)', 'task_cpu'),
        (r'@TaskExists\((.*?)\)', 'task_exists'),
    )

    self._update_tx_time()
    tags, single, valid = type_check(tags)

    time_str = time.strftime('%x %H:%M:%S')
    results = []

    for t in tags:
        if t in simple_tags:
            value = getattr(SystemHealth, simple_tags[t])()

        elif t == '@CpuUsage':
            if self.cpu is None:
                # Create the CPU sampler on first use and pause briefly
                # before taking the first reading
                self.cpu = SystemHealth.CPU()
                time.sleep(0.1)
            value = self.cpu.get_usage()

        else:
            value = None
            for pattern, func_name in task_tags:
                m = re.match(pattern, t)
                if m:
                    image_name = m.group(1)
                    value = getattr(SystemHealth, func_name)(image_name)

        quality = 'Error' if value is None else 'Good'

        if single:
            results.append((value, quality, time_str))
        else:
            results.append((t, value, quality, time_str))

    return results
def iwrite(self, tag_value_pairs, size=None, pause=0, include_error=False):
    """Iterable version of write()

    tag_value_pairs -- a (tag, value) tuple or a list of (tag, value) tuples
    size            -- when set, pairs are written in batches of this size
    pause           -- milliseconds to sleep between batches
    include_error   -- append the server's error message to each result

    Each batch is written through a temporary anonymous group that is
    removed once the batch's results have been yielded.

    Yields 'Success'/'Error' for a single pair, or (tag, status) per pair
    for a list; with include_error an error-message element is appended.

    Raises TypeError for malformed input and OPCError for COM failures.
    """
    type_error_msg = "write(): 'tag_value_pairs' parameter must be a (tag, value) tuple or a list of (tag,value) tuples"

    def _valid_pair(p):
        return type(p) in (list, tuple) and len(p) >= 2 and type(p[0]) in (str, bytes)

    try:
        self._update_tx_time()
        pythoncom.CoInitialize()

        # This guard also rejects None, so no separate None case is needed
        if type(tag_value_pairs) not in (list, tuple):
            raise TypeError(type_error_msg)

        if type(tag_value_pairs[0]) in (str, bytes):
            # A bare (tag, value) pair was passed
            tag_value_pairs = [tag_value_pairs]
            single = True
        else:
            single = False

        invalid_pairs = [p for p in tag_value_pairs if not _valid_pair(p)]
        if len(invalid_pairs) > 0:
            raise TypeError(type_error_msg)

        tags = [pair[0] for pair in tag_value_pairs]
        values = [pair[1] for pair in tag_value_pairs]

        # Break-up tags & values into groups of 'size' tags
        if size:
            tag_groups = [tags[i:i+size] for i in range(0, len(tags), size)]
            value_groups = [values[i:i+size] for i in range(0, len(values), size)]
        else:
            tag_groups = [tags]
            value_groups = [values]

        num_groups = len(tag_groups)

        for gid in range(num_groups):
            if gid > 0 and pause > 0: time.sleep(pause/1000.0)

            opc_groups = self._opc.OPCGroups
            opc_group = opc_groups.Add()
            opc_items = opc_group.OPCItems

            tags = tag_groups[gid]
            values = value_groups[gid]

            # The automation calls take arrays with a dummy element at index 0
            names = [0] + list(tags)
            errors = []

            try:
                errors = opc_items.Validate(len(names)-1, names)
            except Exception:
                pass

            n = 1
            valid_tags = []
            valid_values = []
            client_handles = []
            error_msgs = {}

            for i, tag in enumerate(tags):
                if errors[i] == 0:
                    valid_tags.append(tag)
                    valid_values.append(values[i])
                    client_handles.append(n)
                    error_msgs[tag] = ''
                    n += 1
                elif include_error:
                    error_msgs[tag] = self._opc.GetErrorString(errors[i])

            client_handles.insert(0, 0)
            valid_tags.insert(0, 0)
            server_handles = []
            errors = []

            try:
                server_handles, errors = opc_items.AddItems(len(client_handles)-1, valid_tags, client_handles)
            except Exception:
                pass

            valid_tags_tmp = []
            valid_values_tmp = []
            server_handles_tmp = []
            valid_tags.pop(0)

            for i, tag in enumerate(valid_tags):
                if errors[i] == 0:
                    valid_tags_tmp.append(tag)
                    valid_values_tmp.append(valid_values[i])
                    server_handles_tmp.append(server_handles[i])
                    error_msgs[tag] = ''
                elif include_error:
                    error_msgs[tag] = self._opc.GetErrorString(errors[i])

            valid_tags = valid_tags_tmp
            valid_values = valid_values_tmp
            server_handles = server_handles_tmp

            server_handles.insert(0, 0)
            valid_values.insert(0, 0)
            errors = []

            # Index 0 is the dummy element, so > 1 means at least one item
            if len(valid_values) > 1:
                try:
                    errors = opc_group.SyncWrite(len(server_handles)-1, server_handles, valid_values)
                except Exception:
                    pass

            n = 0
            for tag in tags:
                if tag in valid_tags:
                    if errors[n] == 0:
                        status = 'Success'
                    else:
                        status = 'Error'
                        if include_error: error_msgs[tag] = self._opc.GetErrorString(errors[n])
                    n += 1
                else:
                    status = 'Error'

                # OPC servers often include newline and carriage return characters
                # in their error message strings, so remove any found.
                if include_error: error_msgs[tag] = error_msgs[tag].strip('\r\n')

                if single:
                    if include_error:
                        yield (status, error_msgs[tag])
                    else:
                        yield status
                else:
                    if include_error:
                        yield (tag, status, error_msgs[tag])
                    else:
                        yield (tag, status)

            opc_groups.Remove(opc_group.Name)

    except pythoncom.com_error as err:
        error_msg = 'write: %s' % self._get_error_str(err)
        raise OPCError(error_msg)
def write(self, tag_value_pairs, size=None, pause=0, include_error=False):
    """Write list of (tag, value) pair(s) to the server

    Returns a list of results when given a list/tuple whose first element
    is itself a list/tuple; otherwise returns the single first result.
    """
    is_pair_list = (
        type(tag_value_pairs) in (list, tuple)
        and type(tag_value_pairs[0]) in (list, tuple)
    )

    results = list(self.iwrite(tag_value_pairs, size, pause, include_error))

    if is_pair_list:
        return results
    return results[0]
def groups(self):
    """Return a list of active tag groups

    A new list is returned (not a live view of the internal dict) so callers
    may remove groups while iterating over the result.
    """
    return list(self._groups)
def remove(self, groups):
    """Remove the specified tag group(s)

    groups -- a group name or an iterable of group names. Names that are
              not in the internal group table are skipped.

    For every sub-group of each named group the event hook (if any) is
    closed, the group is removed from the server and its cached tag/handle
    tables are deleted.

    Raises OPCError when the server reports a COM error.
    """
    try:
        pythoncom.CoInitialize()
        opc_groups = self._opc.OPCGroups

        if isinstance(groups, (str, bytes)):
            groups = [groups]
        else:
            # Snapshot: entries are deleted from self._groups below, and the
            # caller may have passed a live view of that very dict.
            groups = list(groups)

        for group in groups:
            if group in self._groups:
                for i in range(self._groups[group]):
                    sub_group = '%s.%d' % (group, i)

                    if sub_group in self._group_hooks:
                        if self.trace: self.trace('CloseEvents(%s)' % sub_group)
                        self._group_hooks[sub_group].close()

                    try:
                        if self.trace: self.trace('RemoveGroup(%s)' % sub_group)
                        opc_groups.Remove(sub_group)
                    except pythoncom.com_error as err:
                        error_msg = 'RemoveGroup: %s' % self._get_error_str(err)
                        raise OPCError(error_msg)

                    del self._group_tags[sub_group]
                    del self._group_valid_tags[sub_group]
                    del self._group_handles_tag[sub_group]
                    del self._group_server_handles[sub_group]
                del self._groups[group]

    except pythoncom.com_error as err:
        error_msg = 'remove: %s' % self._get_error_str(err)
        raise OPCError(error_msg)
-
def iproperties(self, tags, id=None):
    """Iterable version of properties()

    tags -- a tag name or a list/tuple of tag names
    id   -- None for all available properties, a single property id, or a
            list/tuple of ids. A 0 in the sequence requests the item name
            as an extra (0, tag) entry; the caller's sequence is not
            modified.

    Yields, per tag:
      id is None        -- (id, description, value) tuples
      id is a sequence  -- (id, value) tuples
      id is a scalar    -- the bare value
    When 'tags' is a list each yielded entry is prefixed with the tag name.

    Raises TypeError for invalid 'tags' and OPCError for COM failures.
    """
    try:
        self._update_tx_time()
        pythoncom.CoInitialize()

        tags, single_tag, valid = type_check(tags)
        if not valid:
            raise TypeError("properties(): 'tags' parameter must be a string or a list of strings")

        # Work on a copy so the caller's list is never mutated
        include_name = False
        if isinstance(id, (list, tuple)):
            id = list(id)
            if 0 in id:
                id.remove(0)
                include_name = True

        if id is not None:
            if isinstance(id, list):
                property_id = list(id)
                single_property = False
            else:
                property_id = [id]
                single_property = True

            descriptions = ['Property id %d' % i for i in property_id]
        else:
            single_property = False

        for tag in tags:

            if id is None:
                count, property_id, descriptions, datatypes = self._opc.QueryAvailableProperties(tag)

                # Remove bogus negative property id (not sure why this sometimes happens)
                tag_properties = list(zip(property_id, descriptions))
                property_id = [p for p, d in tag_properties if p > 0]
                descriptions = [d for p, d in tag_properties if p > 0]

            # The automation call takes an array with a dummy element at index 0
            property_id.insert(0, 0)
            values, errors = self._opc.GetItemProperties(tag, len(property_id)-1, property_id)
            property_id.pop(0)

            values = [str(v) if type(v) == pywintypes.TimeType else v for v in values]

            # Replace variant id with type strings
            try:
                i = property_id.index(1)
                values[i] = vt[values[i]]
            except Exception:
                pass

            # Replace quality bits with quality strings
            try:
                i = property_id.index(3)
                values[i] = quality_str(values[i])
            except Exception:
                pass

            # Replace access rights bits with strings
            try:
                i = property_id.index(5)
                values[i] = ACCESS_RIGHTS[values[i]]
            except Exception:
                pass

            if id is not None:
                if single_property:
                    if single_tag:
                        tag_properties = values
                    else:
                        tag_properties = [values]
                else:
                    tag_properties = list(zip(property_id, values))
            else:
                tag_properties = list(zip(property_id, descriptions, values))
                tag_properties.insert(0, (0, 'Item ID (virtual property)', tag))

            if include_name: tag_properties.insert(0, (0, tag))
            if not single_tag: tag_properties = [tuple([tag] + list(p)) for p in tag_properties]

            for p in tag_properties: yield p

    except pythoncom.com_error as err:
        error_msg = 'properties: %s' % self._get_error_str(err)
        raise OPCError(error_msg)
def properties(self, tags, id=None):
    """Return list of property tuples (id, name, value) for the specified tag(s)

    When 'tags' is not a list/tuple and 'id' is a single property id (not
    None and not a list/tuple), only the first result is returned instead
    of a list.
    """
    scalar_tag = type(tags) not in (list, tuple)
    scalar_id = type(id) not in (type(None), list, tuple)

    props = list(self.iproperties(tags, id))

    if scalar_tag and scalar_id:
        return props[0]
    return props
def ilist(self, paths='*', recursive=False, flat=False, include_type=False):
    """Iterable version of list()

    paths        -- a path/glob or a list of them; an empty list means '*'
    recursive    -- descend into matching branches instead of yielding them
    flat         -- match against the flattened list of all leaf items
    include_type -- yield (node, 'Leaf'|'Branch') tuples instead of names

    Yields nothing when the server cannot create a browser.

    Raises TypeError for invalid 'paths' and OPCError for COM failures.
    """
    try:
        self._update_tx_time()
        pythoncom.CoInitialize()

        try:
            browser = self._opc.CreateBrowser()
        # For OPC servers that don't support browsing
        except Exception:
            return

        paths, single, valid = type_check(paths)
        if not valid:
            raise TypeError("list(): 'paths' parameter must be a string or a list of strings")

        if len(paths) == 0: paths = ['*']
        nodes = {}

        for path in paths:

            if flat:
                browser.MoveToRoot()
                browser.Filter = ''
                browser.ShowLeafs(True)

                pattern = re.compile('^%s$' % wild2regex(path), re.IGNORECASE)
                # Materialize the matches before yielding so the browser is
                # not being iterated lazily while the consumer runs
                matches = [x for x in browser if pattern.search(x)]
                # Flat browsing only shows leaf items
                if include_type: matches = [(x, 'Leaf') for x in matches]

                for node in matches: yield node
                continue

            queue = [path]

            while len(queue) > 0:
                tag = queue.pop(0)

                browser.MoveToRoot()
                browser.Filter = ''
                pattern = None

                path_str = '/'
                path_list = tag.replace('.', '/').split('/')
                path_list = [p for p in path_list if len(p) > 0]
                found_filter = False
                path_postfix = '/'

                for i, p in enumerate(path_list):
                    if found_filter:
                        # Everything after the wildcard part is re-appended
                        # when descending recursively
                        path_postfix += p + '/'
                    elif '*' in p:
                        pattern = re.compile('^%s$' % wild2regex(p), re.IGNORECASE)
                        found_filter = True
                    elif len(p) != 0:
                        pattern = re.compile('^.*$')
                        browser.ShowBranches()

                        # Branch node, so move down
                        if len(browser) > 0:
                            try:
                                browser.MoveDown(p)
                                path_str += p + '/'
                            except Exception:
                                if i < len(path_list)-1: return
                                pattern = re.compile('^%s$' % wild2regex(p), re.IGNORECASE)

                        # Leaf node, so append all remaining path parts together
                        # to form a single search expression
                        else:
                            # str.join: string.join() does not exist in Python 3
                            p = '.'.join(path_list[i:])
                            pattern = re.compile('^%s$' % wild2regex(p), re.IGNORECASE)
                            break

                browser.ShowBranches()

                if len(browser) == 0:
                    browser.ShowLeafs(False)
                    lowest_level = True
                    node_type = 'Leaf'
                else:
                    lowest_level = False
                    node_type = 'Branch'

                # Materialize before GetItemID() is called on the same browser
                matches = [x for x in browser if pattern.search(x)]

                if not lowest_level and recursive:
                    queue += [path_str + x + path_postfix for x in matches]
                else:
                    if lowest_level: matches = [exceptional(browser.GetItemID, x)(x) for x in matches]
                    if include_type: matches = [(x, node_type) for x in matches]
                    for node in matches:
                        # Each node is yielded once across all paths
                        if node not in nodes: yield node
                        nodes[node] = True

    except pythoncom.com_error as err:
        error_msg = 'list: %s' % self._get_error_str(err)
        raise OPCError(error_msg)
def list(self, paths='*', recursive=False, flat=False, include_type=False):
    """Return list of item nodes at specified path(s) (tree browser).

    Eager counterpart of ilist(): all four arguments are passed straight
    through and everything the generator yields is collected into a list.
    """
    # A comprehension is used on purpose so that the body never has to
    # call the name `list`, which this method itself carries.
    return [node for node in self.ilist(paths, recursive, flat, include_type)]
def servers(self, opc_host='localhost'):
    """Return list of available OPC servers.

    opc_host -- host passed to GetOPCServers() (default 'localhost')

    Entries that come back as None are dropped from the result.

    Raises OPCError when the lookup fails with a COM error.
    """
    try:
        pythoncom.CoInitialize()
        found = self._opc.GetOPCServers(opc_host)
        # Identity test is the idiomatic (and override-proof) None check.
        return [name for name in found if name is not None]
    except pythoncom.com_error as err:
        error_msg = 'servers: %s' % self._get_error_str(err)
        raise OPCError(error_msg)
def info(self):
    """Return list of (name, value) pairs about the OPC server.

    The pairs are produced in a fixed order: protocol, gateway details
    (only when a gateway service is in use), class, client name, host,
    server name, state, version, browser organisation, start time,
    current time and vendor.

    Raises OPCError when reading any of the values fails with a COM
    error.
    """
    try:
        self._update_tx_time()
        pythoncom.CoInitialize()

        mode = 'OpenOPC' if self._open_serv else 'DCOM'
        entries = [('Protocol', mode)]

        if mode == 'OpenOPC':
            entries.extend([
                ('Gateway Host', '%s:%s' % (self._open_host, self._open_port)),
                ('Gateway Version', '%s' % __version__),
            ])

        entries.extend([
            ('Class', self.opc_class),
            ('Client Name', self._opc.ClientName),
            ('OPC Host', self.opc_host),
            ('OPC Server', self._opc.ServerName),
            ('State', OPC_STATUS[self._opc.ServerState]),
            ('Version', '%d.%d (Build %d)' % (self._opc.MajorVersion, self._opc.MinorVersion, self._opc.BuildNumber)),
        ])

        # Best effort: any failure while probing the browser is reported
        # as 'Not Supported' rather than aborting the whole call.
        try:
            browser_type = BROWSER_TYPE[self._opc.CreateBrowser().Organization]
        except:
            browser_type = 'Not Supported'

        entries.extend([
            ('Browser', browser_type),
            ('Start Time', str(self._opc.StartTime)),
            ('Current Time', str(self._opc.CurrentTime)),
            ('Vendor', self._opc.VendorInfo),
        ])
        return entries

    except pythoncom.com_error as err:
        raise OPCError('info: %s' % self._get_error_str(err))
def ping(self):
    """Check if we are still talking to the OPC server.

    Reads the server's current time and compares it with the reading
    remembered from the previous call.  Returns True (and remembers the
    new reading) when the value has changed; returns False when it is
    unchanged or when reading it raises a COM error.
    """
    try:
        # Scale the server time by 1e6 and truncate to an integer so that
        # two successive readings can be compared exactly.
        stamp = int(float(self._opc.CurrentTime) * 1000000.0)
        changed = stamp != self._prev_serv_time
        if changed:
            self._prev_serv_time = stamp
        return changed
    except pythoncom.com_error:
        return False
-
def _get_error_str(self, err):
    """Return the error string for an OPC or COM error code.

    err -- exception whose `args` is the 4-tuple (hr, msg, exc, arg)

    When `exc` is None the plain message is returned.  Otherwise the
    status code stored at exc[5] is looked up both through the OPC
    server (GetErrorString) and through COM (GetScodeString), and the
    two descriptions are merged as follows:

      * neither lookup worked  -> the status code itself, as a string
      * both give the same text, or only the OPC lookup worked
                               -> the OPC text
      * only the COM lookup worked -> the COM text
      * the texts differ       -> 'OPC text (COM text)'
    """
    hr, msg, exc, arg = err.args

    if exc is None:
        return str(msg)

    # `unicode` only exists on Python 2.  Referencing it unguarded used to
    # raise NameError on Python 3, which the blanket handlers below then
    # swallowed, so every message degraded to the bare status code.
    try:
        text_type = unicode
    except NameError:
        text_type = str

    scode = exc[5]

    # Each lookup is best effort: a failure just leaves that part unknown.
    try:
        opc_err_str = text_type(self._opc.GetErrorString(scode)).strip('\r\n')
    except Exception:
        opc_err_str = None

    try:
        com_err_str = text_type(pythoncom.GetScodeString(scode)).strip('\r\n')
    except Exception:
        com_err_str = None

    # OPC error codes and COM error codes are overlapping concepts,
    # so we combine them together into a single error message.
    if opc_err_str is None and com_err_str is None:
        return str(scode)
    if opc_err_str == com_err_str or com_err_str is None:
        return opc_err_str
    if opc_err_str is None:
        return com_err_str
    return '%s (%s)' % (opc_err_str, com_err_str)
def _update_tx_time(self):
    """Update the session's last transaction time in the Gateway Service.

    Does nothing when no gateway service is attached (self._open_serv is
    falsy); otherwise stores the current time.time() value in the
    service's _tx_times mapping under this session's GUID.
    """
    gateway = self._open_serv
    if not gateway:
        return
    gateway._tx_times[self._open_guid] = time.time()
def __getitem__(self, key):
    """Read single item (tag as dictionary key).

    Calls read(key), which must produce exactly three items, and returns
    only the first of them (the value).
    """
    # Unpack all three fields so a malformed result still fails loudly;
    # only the value is of interest here.
    value, _quality, _timestamp = self.read(key)
    return value
-
def __setitem__(self, key, value):
    """Write single item (tag as dictionary key).

    Forwards the pair to write() as one (key, value) tuple; whatever
    write() returns is discarded.
    """
    pair = (key, value)
    self.write(pair)
|