# Reading from a subresource In this section, we'll implement a stream for the survey responses stream. This stream structure is a little different because it depends on the surveys stream. Start by creating a new base class for substreams: ```python class SurveyMonkeySubstream(HttpStream, ABC): def __init__(self, name: str, path: str, primary_key: Union[str, List[str]], parent_stream: Stream, **kwargs: Any) -> None: self._name = name self._path = path self._primary_key = primary_key self._parent_stream = parent_stream super().__init__(**kwargs) url_base = "https://api.surveymonkey.com" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: links = response.json().get("links", {}) if "next" in links: return {"next_url": links["next"]} else: return {} def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: if next_page_token: return urlparse(next_page_token["next_url"]).query else: return {"per_page": _PAGE_SIZE} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: yield from response.json().get("data", []) @property def name(self) -> str: return self._name def path( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> str: try: return self._path.format(stream_slice=stream_slice) except Exception as e: raise e @property def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: return self._primary_key def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: for _slice in self._parent_stream.stream_slices(): for parent_record in self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice): yield parent_record ``` This class is similar to the base class, but it does not support incremental reads, and its stream slices are generated by reading records from a parent stream. This is how we'll ensure we always read all survey responses. Note that using this approach, the connector will checkpoint after reading responses for each survey. Don't forget to update the `streams` method to also instantiate the surveys responses stream: ```python def streams(self, config: Mapping[str, Any]) -> List[Stream]: auth = TokenAuthenticator(token=config["access_token"]) surveys = SurveyMonkeyBaseStream(name="surveys", path="/v3/surveys", primary_key="id", data_field="data", cursor_field="date_modified", authenticator=auth) survey_responses = SurveyMonkeySubstream(name="survey_responses", path="/v3/surveys/{stream_slice[id]}/responses/", primary_key="id", authenticator=auth, parent_stream=surveys) return [ surveys, survey_responses ] ``` Before moving on, we'll enable request caching on the surveys stream to avoid fetching the records both for the surveys stream and for the survey responses stream. You can do this by setting the `use_cache` property to true on the `SurveyMonkeyBaseStream` class. ``` @property def use_cache(self) -> bool: return True ``` Now add the stream to the configured catalog: ```json { "streams": [ { "stream": { "name": "surveys", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" }, { "stream": { "name": "survey_responses", "json_schema": {}, "supported_sync_modes": ["full_refresh"] }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" } ] } ``` and create a new schema file in `source_survey_monkey_demo/schemas/survey_responses.json`. You can use the connector builder to generate the schema, or paste the one provided below: ```json { "$schema": "http://json-schema.org/schema#", "properties": { "analyze_url": { "type": ["string", "null"] }, "collect_stats": { "properties": { "status": { "properties": { "open": { "type": ["number", "null"] } }, "type": ["object", "null"] }, "total_count": { "type": ["number", "null"] }, "type": { "properties": { "weblink": { "type": ["number", "null"] } }, "type": ["object", "null"] } }, "type": ["object", "null"] }, "date_created": { "type": ["string", "null"] }, "date_modified": { "type": ["string", "null"] }, "href": { "type": ["string", "null"] }, "id": { "type": ["string", "null"] }, "language": { "type": ["string", "null"] }, "nickname": { "type": ["string", "null"] }, "preview": { "type": ["string", "null"] }, "question_count": { "type": ["number", "null"] }, "response_count": { "type": ["number", "null"] }, "title": { "type": ["string", "null"] } }, "type": "object" } ``` You should now be able to read your survey responses: ``` poetry run source-survey-monkey-demo read --config secrets/config.json --catalog integration_tests/configured_catalog.json ``` In the [next section](8-concurrency.md) we'll update the connector so it reads stream slices concurrently.