Source code for btgsolutions_dataservices.rest.book_scope

from datetime import datetime, timedelta, timezone
from io import BytesIO
from typing import List, Optional, Union

import pyarrow as pa
import pyarrow.parquet as pq
import requests

from ..config import url_api_v1
from ..exceptions import BadResponse
from .authenticator import Authenticator

# Dataset names accepted in the ``select`` argument of BookScope.get().
VALID_SELECT_OPTIONS = {"trades", "book_snapshot", "book_incremental"}
# Market segments accepted by BookScope.get(); the caller's value is lower-cased before the check.
VALID_MARKET_TYPES = {"derivatives", "options", "stocks"}
# Longest start_time..end_time span a single BookScope.get() call may request.
MAX_MARKET_SCOPE_WINDOW = timedelta(seconds=600)
# Page size sent as ``limit``; a page with at least this many rows triggers another cursor request.
ROW_LIMIT = 10_000


class BookScope:
    """
    This class provides market scope data (trades, book snapshots, book incremental)
    for a given symbol and time window.

    * Main use case:

    >>> from btgsolutions_dataservices import BookScope
    >>> book_scope = BookScope(
    ...     api_key='YOUR_API_KEY',
    ... )
    >>> book_scope.get(
    ...     symbol='DOLM26',
    ...     market_type='derivatives',
    ...     start_time='2026-05-28T14:12:00Z',
    ...     end_time='2026-05-28T14:15:00Z',
    ... )

    Parameters
    ----------------
    api_key: str
        User identification key.
        Field is required.
    """

    # Datasets fetched by get() when ``select`` is omitted, in this order.
    # Kept as an immutable tuple instead of a mutable list default argument.
    _DEFAULT_SELECT = ("trades", "book_snapshot", "book_incremental")

    def __init__(
        self,
        api_key: Optional[str],
    ):
        self.api_key = api_key
        # The token is obtained once here; every request made by this instance reuses these headers.
        self.token = Authenticator(self.api_key).token
        self.headers = {"authorization": f"authorization {self.token}"}
        self.base_url = url_api_v1

    def _fetch_all_pages(self, endpoint_url: str, params: dict) -> bytes:
        """Fetch every page of ``endpoint_url`` and return one combined parquet payload.

        After each full page (``ROW_LIMIT`` rows or more), the ``rpt_seq`` of that
        page's last row is sent as the ``rpt_seq`` cursor of the next request.
        A page with fewer than ``ROW_LIMIT`` rows ends the loop.

        Parameters
        ----------------
        endpoint_url: str
            Fully-qualified endpoint URL.
        params: dict
            Query parameters shared by every page request (not modified).

        Returns
        ----------------
        bytes
            A parquet file containing the rows of all pages, in request order.

        Raises
        ----------------
        BadResponse
            If the API answers with a non-200 status, or if a full page does not
            provide a non-null cursor that differs from the previous one (which
            would otherwise re-request the same page forever).
        """
        tables = []
        cursor = None
        while True:
            page_params = dict(params)
            if cursor is not None:
                page_params["rpt_seq"] = cursor
            response = requests.get(endpoint_url, params=page_params, headers=self.headers, timeout=50)
            if response.status_code != 200:
                raise BadResponse(f"Error: {self._error_detail(response)}")

            table = pq.read_table(BytesIO(response.content))
            tables.append(table)
            if table.num_rows < ROW_LIMIT:
                break

            # NOTE(review): assumes pages are ordered by rpt_seq and that the API returns rows
            # strictly after the cursor; if the cursor is inclusive, boundary rows would be
            # duplicated across pages — confirm against the API contract.
            next_cursor = table.column("rpt_seq")[-1].as_py()
            if next_cursor is None or next_cursor == cursor:
                raise BadResponse(f"Error: pagination cursor did not advance (rpt_seq={next_cursor!r}).")
            cursor = next_cursor

        # Same promotion as _combine_parquets: a page whose column came back null-typed
        # (e.g. all values missing) can still be concatenated with typed pages.
        combined = pa.concat_tables(tables, promote_options="default")
        return self._to_parquet_bytes(combined)

    @staticmethod
    def _error_detail(response: "requests.Response"):
        """Return a human-readable error detail for a failed API response.

        Uses the JSON body's ``detail`` key, then ``error``. Falls back to the raw
        response text when the body is not JSON or is not a JSON object.
        """
        try:
            body = response.json()
        except ValueError:
            # requests' JSON decode errors are ValueError subclasses.
            return response.text
        if not isinstance(body, dict):
            return response.text
        return body.get("detail", body.get("error", response.text))

    @staticmethod
    def _to_parquet_bytes(table: "pa.Table") -> bytes:
        """Serialize an Arrow table to an in-memory parquet file."""
        buffer = BytesIO()
        pq.write_table(table, buffer)
        return buffer.getvalue()

    @staticmethod
    def _combine_parquets(parquet_payloads: List[bytes]) -> bytes:
        """Combine multiple parquet payloads into a single parquet file.

        Timestamp columns are first cast to one shared unit, so that tables which
        differ only in timestamp precision can be concatenated. With
        ``promote_options="default"``, columns missing from some payloads are
        filled with nulls.
        """
        tables = [pq.read_table(BytesIO(payload)) for payload in parquet_payloads]
        unit = BookScope._timestamp_unit_for(tables)
        normalized = [BookScope._normalize_timestamps(table, unit) for table in tables]
        combined = pa.concat_tables(normalized, promote_options="default")
        return BookScope._to_parquet_bytes(combined)

    @staticmethod
    def _timestamp_unit_for(tables: List["pa.Table"]) -> str:
        """Return ``"ns"`` if any table has a nanosecond timestamp column, otherwise ``"us"``.

        Downcasting ``ns`` to ``us`` with pyarrow's default safe cast raises when
        values have sub-microsecond digits. Nanosecond data is therefore kept at
        ``ns``; everything else keeps the previous ``us`` target.
        """
        for table in tables:
            for field in table.schema:
                if pa.types.is_timestamp(field.type) and field.type.unit == "ns":
                    return "ns"
        return "us"

    @staticmethod
    def _normalize_timestamps(table: "pa.Table", unit: str = "us") -> "pa.Table":
        """Cast every timestamp column of ``table`` to ``unit``, keeping each column's time zone.

        Columns are addressed by position, so tables with duplicate column names
        are handled (lookup by name would be ambiguous).
        """
        normalized = table
        for index, field in enumerate(table.schema):
            if pa.types.is_timestamp(field.type):
                target_type = pa.timestamp(unit, tz=field.type.tz)
                normalized = normalized.set_column(
                    index,
                    field.name,
                    normalized.column(index).cast(target_type),
                )
        return normalized

    @staticmethod
    def _parse_iso8601(value: str, field_name: str) -> datetime:
        """Parse an ISO-8601 datetime string into a timezone-aware datetime.

        A trailing ``Z``/``z`` is read as UTC. Strings without an offset are
        treated as UTC, which is the documented input format of get(). Every
        returned value is therefore timezone-aware, and comparisons never mix
        naive and aware datetimes.

        Raises
        ----------------
        TypeError
            If ``value`` is not a string.
        ValueError
            If ``value`` is not a valid ISO-8601 datetime string.
        """
        if not isinstance(value, str):
            raise TypeError(f"{field_name} must be an ISO-8601 datetime string, got {type(value).__name__}.")
        # datetime.fromisoformat only accepts a literal "Z" suffix from Python 3.11 on.
        text = value[:-1] + "+00:00" if value.endswith(("Z", "z")) else value
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError as exc:
            raise ValueError(f"{field_name} must be a valid ISO-8601 datetime string.") from exc
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _resolve_scope(end_time: datetime) -> str:
        """Return ``'intraday'`` if ``end_time`` is at or after 00:00 UTC today, otherwise ``'historical'``.

        A naive ``end_time`` is treated as UTC, so it can be compared with the
        timezone-aware current time.
        """
        if end_time.tzinfo is None:
            end_time = end_time.replace(tzinfo=timezone.utc)
        # NOTE(review): "today" is the UTC calendar day; confirm this matches how the API
        # splits intraday and historical data (e.g. by B3 local trading date).
        today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        return "intraday" if end_time >= today_start else "historical"

    def get(
        self,
        symbol: str,
        market_type: str,
        start_time: str,
        end_time: str,
        select: Optional[Union[str, List[str]]] = None,
        aggregate_info: bool = False,
    ):
        """
        Returns trades, book snapshot and book incremental data for the given symbol and time window.

        Pagination is handled automatically. If the API returns the maximum number of rows
        (10 000), additional pages are fetched using the rpt_seq cursor until all events in
        the requested horizon are retrieved. The caller always receives a single, complete
        DataFrame per dataset.

        Parameters
        ----------------
        symbol: str
            The ticker symbol to query.
            Field is required. Example: 'PETR4', 'DOLM26'.
        market_type: str
            Market type (case-insensitive).
            Field is required. Allowed values: 'derivatives', 'options', 'stocks'.
        start_time: str
            Start of the analysis window in ISO-8601 UTC format. A value without a UTC
            offset is interpreted as UTC.
            Field is required. Example: '2026-05-28T14:12:00Z'.
        end_time: str
            End of the analysis window in ISO-8601 UTC format. A value without a UTC
            offset is interpreted as UTC.
            Field is required. Example: '2026-05-28T14:15:00Z'.
        select: str or list of str
            Which datasets to return. A single dataset name may be passed as a plain string.
            Duplicate names are fetched only once.
            Field is optional. Default: ['trades', 'book_snapshot', 'book_incremental'].
            Allowed values: 'trades', 'book_snapshot', 'book_incremental'.
        aggregate_info: bool
            When True, returns a single pandas.DataFrame containing the selected datasets.
            Field is optional. Default: False.

        Returns
        ----------------
        dict or pandas.DataFrame
            When aggregate_info is False (default): a dictionary keyed by the selected
            datasets ('trades', 'book_snapshot', 'book_incremental'), in selection order.
            Each key maps directly to a pandas.DataFrame with all rows for the requested
            time window.
            When aggregate_info is True: a single pandas.DataFrame with all selected
            datasets combined.

        Raises
        ----------------
        ValueError
            Invalid market_type or select option; empty select with aggregate_info=True;
            malformed times; start_time not earlier than end_time; window longer than 10 minutes.
        TypeError
            If start_time or end_time is not a string.
        BadResponse
            If the API returns an error or pagination cannot make progress.
        """
        normalized_market_type = market_type.lower()
        if normalized_market_type not in VALID_MARKET_TYPES:
            raise ValueError(
                f"Invalid market_type: {market_type}. Valid options are: {VALID_MARKET_TYPES}"
            )

        if select is None:
            select = self._DEFAULT_SELECT
        elif isinstance(select, str):
            # A bare string would otherwise be iterated character by character.
            select = [select]
        # Materialize once (so generators are not consumed by validation) and de-duplicate,
        # keeping the caller's order.
        datasets = list(dict.fromkeys(select))

        invalid = set(datasets) - VALID_SELECT_OPTIONS
        if invalid:
            raise ValueError(f"Invalid select options: {invalid}. Valid options are: {VALID_SELECT_OPTIONS}")
        if aggregate_info and not datasets:
            raise ValueError("select must contain at least one option when aggregate_info is True.")

        parsed_start_time = self._parse_iso8601(start_time, "start_time")
        parsed_end_time = self._parse_iso8601(end_time, "end_time")
        if parsed_start_time >= parsed_end_time:
            raise ValueError("start_time must be earlier than end_time.")
        if parsed_end_time - parsed_start_time > MAX_MARKET_SCOPE_WINDOW:
            raise ValueError("The maximum allowed range between start_time and end_time is 10 minutes.")

        # The caller's original strings are forwarded to the API unchanged.
        base_params = dict(symbol=symbol, start_time=start_time, end_time=end_time, limit=ROW_LIMIT)
        scope = self._resolve_scope(parsed_end_time)
        scope_url = f"{self.base_url}/marketdata/br/b3/book-scope/{scope}/{normalized_market_type}"
        endpoint_map = {
            "trades": f"{scope_url}/trades",
            "book_snapshot": f"{scope_url}/book-snapshot",
            "book_incremental": f"{scope_url}/book-incremental",
        }

        if aggregate_info:
            payloads = [self._fetch_all_pages(endpoint_map[key], base_params) for key in datasets]
            return pq.read_table(BytesIO(self._combine_parquets(payloads))).to_pandas()

        return {
            key: pq.read_table(BytesIO(self._fetch_all_pages(endpoint_map[key], base_params))).to_pandas()
            for key in datasets
        }