Source code for btgsolutions_dataservices.websocket.hfn_websocket_client


from typing import Optional, List
from ..exceptions import WSTypeError, DelayedError, FeedError
from ..rest import Authenticator
from ..config import hfn_socket_urls, MAX_WS_RECONNECT_RETRIES, VALID_STREAM_TYPES, VALID_COUNTRIES, REALTIME, BR
from .websocket_default_functions import _on_open, _on_message, _on_error, _on_close
import websocket 
import json
import ssl
import threading

class HFNWebSocketClient:
    """
    This class connects with BTG Solutions Data Services HFN WebSocket, receiving high frequency news in realtime or delayed feeds.

    * Main use case:

    >>> from btgsolutions_dataservices import HFNWebSocketClient
    >>> ws = HFNWebSocketClient(
    >>>     api_key='YOUR_API_KEY',
    >>>     stream_type='realtime',
    >>>     country='brazil',
    >>>     ssl=True
    >>> )
    >>> ws.run()
    >>> ws.get_latest_news()
    >>> ws.close()

    Parameters
    ----------------
    api_key: str
        User identification key.
        Field is required.

    stream_type: str
        Websocket connection feed.
        Options: 'realtime', 'delayed'.
        Field is not required. Default: 'realtime'.

    country: str
        Country name.
        Options: 'brazil' or 'chile'.
        Field is not required. Default: 'brazil'.

    ssl: bool
        Enable or disable ssl configuration.
        Field is not required. Default: True (enable).

    **kwargs
        Extra keyword arguments. They are stored unchanged on
        ``websocket_cfg``; no method of this class reads them.

    Raises
    ------
    FeedError
        If ``stream_type`` or ``country`` is not one of the valid options.
    WSTypeError
        If no WebSocket URL is configured for the given combination of
        ``country`` and ``stream_type``.
    """

    def __init__(
        self,
        api_key: str,
        stream_type: Optional[str] = REALTIME,
        country: Optional[str] = BR,
        ssl: Optional[bool] = True,
        **kwargs,
    ):
        self.api_key = api_key
        self.ssl = ssl

        self.__authenticator = Authenticator(self.api_key)
        self.__nro_reconnect_retries = 0
        # Set by close() so that the on-close handler does not reopen a
        # connection the caller explicitly asked to shut down.
        self.__closed_by_user = False

        if stream_type not in VALID_STREAM_TYPES:
            raise FeedError(f"Must provide a valid 'stream_type' parameter. Valid options are: {VALID_STREAM_TYPES}")

        if country not in VALID_COUNTRIES:
            raise FeedError(f"Must provide a valid 'country' parameter. Valid options are: {VALID_COUNTRIES}")

        try:
            self.url = hfn_socket_urls[country][stream_type]
        except (KeyError, TypeError) as err:
            # Only a failed lookup means "no such feed"; anything else
            # (e.g. KeyboardInterrupt) must not be masked.
            raise WSTypeError(f"There is no WebSocket type for your specifications (stream_type:{stream_type}, country:{country})\nPlease check your request parameters and try again") from err

        self.websocket_cfg = kwargs

    def run(
        self,
        on_open=None,
        on_message=None,
        on_error=None,
        on_close=None,
        reconnect=True,
    ):
        """
        Initializes a connection to websocket and starts to receive high frequency news.

        The connection runs on a daemon thread. This call blocks until the
        socket reports it is connected, or until the worker thread has
        exited without connecting (for instance after all reconnect
        attempts failed).

        Parameters
        ----------
        on_open: function
            - Called at opening connection to websocket.
            - Field is not required.
            - Default: prints that the connection was opened in case of success.

        on_message: function
            - Called every time it receives a message.
            - Arguments:
                1. Data received from the server.
            - Field is not required.
            - Default: prints the data.

        on_error: function
            - Called when a error occurs.
            - Arguments:
                1. Exception object.
            - Field is not required.
            - Default: prints the error.

        on_close: function
            - Called when connection is closed.
            - Arguments:
                1. close_status_code.
                2. close_msg.
            - Field is not required.
            - Default: prints a message that the connection was closed.

        reconnect: bool
            Try reconnect if connection is closed.
            A connection closed through :meth:`close` is not reopened.
            Field is not required.
            Default: True.
        """
        if on_open is None:
            on_open = _on_open
        if on_message is None:
            on_message = _on_message
        if on_error is None:
            on_error = _on_error
        if on_close is None:
            on_close = _on_close

        # An explicit run() means the caller wants a live connection again,
        # even if close() was called on a previous one.
        self.__closed_by_user = False
        self.__connect(on_open, on_message, on_error, on_close, reconnect)

    def __connect(self, on_open, on_message, on_error, on_close, reconnect):
        """
        Class method to be used internally. Opens one websocket connection on a
        daemon thread and waits for it to connect; also used for reconnects.
        """

        def intermediary_on_open(ws):
            on_open()
            # A successful connection resets the retry budget.
            self.__nro_reconnect_retries = 0

        def intermediary_on_message(ws, data):
            on_message(data)

        def intermediary_on_error(ws, error):
            on_error(error)

        def intermediary_on_close(ws, close_status_code, close_msg):
            on_close(close_status_code, close_msg)
            # The close callback is expected to fire for caller-initiated
            # closes too; those must not trigger a reconnect.
            if not reconnect or self.__closed_by_user:
                return
            # '>=' rather than '==' so an overshoot can never retry forever.
            if self.__nro_reconnect_retries >= MAX_WS_RECONNECT_RETRIES:
                print("### Fail retriyng reconnect")
                return
            self.__nro_reconnect_retries += 1
            print(f"### Reconnecting.... Attempts: {self.__nro_reconnect_retries}/{MAX_WS_RECONNECT_RETRIES}")
            self.__connect(on_open, on_message, on_error, on_close, reconnect)

        self.ws = websocket.WebSocketApp(
            url=self.url,
            on_open=intermediary_on_open,
            on_message=intermediary_on_message,
            on_error=intermediary_on_error,
            on_close=intermediary_on_close,
            header={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36",
                "Sec-WebSocket-Protocol": self.__authenticator.token,
            }
        )

        # With ssl disabled, certificate verification is turned off.
        ssl_conf = {} if self.ssl else {"sslopt": {"cert_reqs": ssl.CERT_NONE}}
        wst = threading.Thread(target=self.ws.run_forever, kwargs=ssl_conf)
        wst.daemon = True
        wst.start()

        # Wait for the connection without spinning the CPU. join() with a
        # timeout doubles as a short sleep, and the loop ends on its own if
        # the worker thread exits before ever connecting.
        while wst.is_alive():
            # self.ws is re-read each pass because a reconnect replaces it.
            sock = self.ws.sock
            if sock is not None and sock.connected:
                break
            wst.join(timeout=0.01)

    def __send(self, data):
        """
        Class method to be used internally. Sends data to websocket.

        Non-string payloads are serialized to JSON before being sent.
        """
        if not isinstance(data, str):
            data = json.dumps(data)
        print(f'Sending data: {data}')
        return self.ws.send(data)

    def close(self):
        """
        Closes connection with websocket.

        The connection is not reopened afterwards, even when ``run`` was
        called with ``reconnect=True``. Calling this before ``run`` does nothing.
        """
        # Flag first, so the close handler already sees it when it fires.
        self.__closed_by_user = True
        ws = getattr(self, "ws", None)
        if ws is not None:
            ws.close()

    def get_latest_news(self):
        """
        Get the latest news from our High Frequency News service.

        Sends the ``latest_news`` action to the server; nothing is returned here.
        """
        self.__send({'action': 'latest_news'})