from typing import Optional, List
from ..exceptions import WSTypeError
from ..rest import Authenticator
from ..config import broker_analytics_socket_urls, MAX_WS_RECONNECT_RETRIES, REALTIME, BR
from .websocket_default_functions import _on_open, _on_message, _on_error, _on_close
import websocket
import json
import ssl
import threading
[docs]
class BrokerAnalyticsWebSocketClient:
"""
This class connects with BTG Solutions Data Services Broker Analytics WebSocket,
receiving broker analytics events (top tickers by broker and top brokers by ticker).
* Main use case:
>>> from btgsolutions_dataservices import BrokerAnalyticsWebSocketClient
>>> ws = BrokerAnalyticsWebSocketClient(
>>> api_key='YOUR_API_KEY',
>>> ssl=True
>>> )
>>> ws.run()
>>> ws.available_tickers()
>>> ws.available_brokers()
>>> ws.subscribe_top_tickers(n=10, brokers=['85'])
>>> ws.subscribe_top_brokers(n=5)
>>> ws.subscribed_to()
>>> ws.get_last_event(analytics_type='top_tickers', n=3, brokers=['85', '3'])
>>> ws.get_last_event(analytics_type='top_brokers', n=5)
>>> ws.unsubscribe_top_tickers(brokers=['85'])
>>> ws.unsubscribe_top_brokers()
>>> ws.close()
Parameters
----------------
api_key: str
User identification key.
Field is required.
ssl: bool
Enable or disable ssl configuration.
Field is not required. Default: True (enable).
"""
def __init__(
self,
api_key: str,
ssl: Optional[bool] = True,
**kwargs,
):
self.api_key = api_key
self.ssl = ssl
self.__authenticator = Authenticator(self.api_key)
self.__nro_reconnect_retries = 0
try:
self.url = broker_analytics_socket_urls[BR][REALTIME]
except Exception:
raise WSTypeError(
"There is no WebSocket type for Broker Analytics (brazil/realtime).\nPlease check your request parameters and try again"
)
self.websocket_cfg = kwargs
[docs]
def run(
self,
on_open=None,
on_message=None,
on_error=None,
on_close=None,
reconnect=True
):
"""
Initializes a connection to websocket and starts to receive Broker Analytics events.
Parameters
----------
on_open: function
- Called at opening connection to websocket.
- Field is not required.
- Default: prints that the connection was opened in case of success.
on_message: function
- Called every time it receives a message.
- Arguments:
1. Data received from the server.
- Field is not required.
- Default: prints the data.
on_error: function
- Called when a error occurs.
- Arguments:
1. Exception object.
- Field is not required.
- Default: prints the error.
on_close: function
- Called when connection is closed.
- Arguments:
1. close_status_code.
2. close_msg.
- Field is not required.
- Default: prints a message that the connection was closed.
reconnect: bool
Try reconnect if connection is closed.
Field is not required.
Default: True.
"""
if on_open is None:
on_open = _on_open
if on_message is None:
on_message = _on_message
if on_error is None:
on_error = _on_error
if on_close is None:
on_close = _on_close
def intermediary_on_open(ws):
on_open()
self.__nro_reconnect_retries = 0
def intermediary_on_message(ws, data):
on_message(data)
def intermediary_on_error(ws, error):
on_error(error)
def intermediary_on_close(ws, close_status_code, close_msg):
on_close(close_status_code, close_msg)
if reconnect:
if self.__nro_reconnect_retries == MAX_WS_RECONNECT_RETRIES:
print("### Fail retriyng reconnect")
return
self.__nro_reconnect_retries += 1
print(f"### Reconnecting.... Attempts: {self.__nro_reconnect_retries}/{MAX_WS_RECONNECT_RETRIES}")
self.run(on_open, on_message, on_error, on_close, reconnect)
self.ws = websocket.WebSocketApp(
url=self.url,
on_open=intermediary_on_open,
on_message=intermediary_on_message,
on_error=intermediary_on_error,
on_close=intermediary_on_close,
header={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36",
"Sec-WebSocket-Protocol": self.__authenticator.token,
}
)
ssl_conf = {} if self.ssl else {"sslopt": {"cert_reqs": ssl.CERT_NONE}}
wst = threading.Thread(target=self.ws.run_forever, kwargs=ssl_conf)
wst.daemon = True
wst.start()
while True:
if self.ws.sock is not None and self.ws.sock.connected:
break
pass
def __send(self, data):
if not isinstance(data, str):
data = json.dumps(data)
print(f"Sending data: {data}")
return self.ws.send(data)
[docs]
def close(self):
"""
Closes connection with websocket.
"""
self.ws.close()
[docs]
def subscribe_top_tickers(self, n: int = 10, brokers: Optional[List[str]] = None):
"""
Subscribes to top tickers by broker.
Parameters
----------
n: int
Number of results.
Field is not required. Default: 10.
brokers: list
Broker ids filter.
Field is not required.
"""
params = {'type': 'top_tickers', 'n': n}
if brokers is not None:
params['brokers'] = brokers
self.__send({'action': 'subscribe', 'params': params})
[docs]
def subscribe_top_brokers(self, n: int = 5, tickers: Optional[List[str]] = None):
"""
Subscribes to top brokers by ticker.
Parameters
----------
n: int
Number of results.
Field is not required. Default: 5.
tickers: list
Ticker symbols filter.
Field is not required.
"""
params = {'type': 'top_brokers', 'n': n}
if tickers is not None:
params['tickers'] = tickers
self.__send({'action': 'subscribe', 'params': params})
[docs]
def unsubscribe_top_tickers(self, brokers: Optional[List[str]] = None):
"""
Unsubscribes from top tickers by broker.
Parameters
----------
brokers: list
Broker ids filter.
Field is not required.
"""
params = {'type': 'top_tickers'}
if brokers is not None:
params['brokers'] = brokers
self.__send({'action': 'unsubscribe', 'params': params})
[docs]
def unsubscribe_top_brokers(self, tickers: Optional[List[str]] = None):
"""
Unsubscribes from top brokers by ticker.
Parameters
----------
tickers: list
Ticker symbols filter.
Field is not required.
"""
params = {'type': 'top_brokers'}
if tickers is not None:
params['tickers'] = tickers
self.__send({'action': 'unsubscribe', 'params': params})
[docs]
def get_last_event(
self,
analytics_type: Optional[str] = None,
n: Optional[int] = None,
brokers: Optional[List[str]] = None,
tickers: Optional[List[str]] = None,
):
"""
Returns latest broker analytics event.
Parameters
----------
analytics_type: str
Analytics type.
Options: 'top_tickers' or 'top_brokers'.
Field is not required.
n: int
Number of results.
Field is not required.
brokers: list
Broker ids filter.
Field is not required.
tickers: list
Ticker symbols filter.
Field is not required.
"""
payload = {'action': 'get_last_event'}
if analytics_type is not None:
params = {'type': analytics_type}
if n is not None:
params['n'] = n
if brokers is not None:
params['brokers'] = brokers
if tickers is not None:
params['tickers'] = tickers
payload['params'] = params
self.__send(payload)
[docs]
def subscribed_to(self):
"""
Returns current subscriptions.
"""
self.__send({'action': 'subscribed_to'})
[docs]
def available_tickers(self):
"""
Returns available tickers.
"""
self.__send({'action': 'available_tickers'})
[docs]
def available_brokers(self):
"""
Returns available brokers.
"""
self.__send({'action': 'available_brokers'})