Skip to content

stop_times.py

fetch(cod_stop, session, fetch_conf)

Fetch the stops waiting time for a given stop reusing a session.

Passes some additional data to the fetch_log function. The CSV column names are: 'actual_date, cod_stop, resp_time, resp_status, resp_length, timeout, connection_error, max_connections, timeout_time'

Parameters:

Name Type Description Default
cod_stop str

The stop code in CRTM's format (e.g. 8_17491).

required
session object

The aiohttp ClientSession.

required
fetch_conf dict

Dictionary with configuration parameters for fetching the content.

required

Returns:

Type Description
str

Response text.

Source code in crtm_poll/crtm_api/stop_times.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
async def fetch(cod_stop, session, fetch_conf):
    """Fetch the waiting times for a given stop reusing a session.

    Every attempt (successful or not) is reported to the fetch_log function.
    The CSV column names are:
    'actual_date, cod_stop, resp_time, resp_status, resp_length, timeout,
        connection_error, max_connections, timeout_time'

    Arguments:
        cod_stop (str): The stop code in CRTM's format (e.g. 8_17491).
        session (object): The aiohttp ClientSession.
        fetch_conf (dict): Dictionary with configuration parameters for
            fetching the content. The keys 'log', 'max_connections' and
            'timeout' are read here.

    Returns:
        str: Response text, or None when the request timed out or the
            connection could not be established.
    """
    url = 'https://www.crtm.es/widgets/api/GetStopsTimes.php'
    params = {
        'codStop': cod_stop,
        'type': 1,
        'orderBy': 2,
        'stopTimesByIti': 3,
    }

    resp_status = None
    resp_length = None
    resp_text = None
    timeout = False
    connection_error = False

    actual_time = datetime.datetime.now()
    # The try body only covers the HTTP exchange, so an exception raised
    # while logging can no longer be mistaken for a request failure.
    try:
        async with session.get(url, params=params) as response:
            # Measured as soon as the response object is available, i.e.
            # before the body is read.
            resp_time = (datetime.datetime.now()
                         - actual_time).total_seconds()
            resp_status = response.status
            resp_text = await response.text()
            resp_length = len(resp_text)
    except asyncio.TimeoutError:
        resp_time = (datetime.datetime.now() - actual_time).total_seconds()
        resp_text = None
        timeout = True
        logger.warning("Timeout")
    except client_exceptions.ClientConnectorError:
        resp_time = (datetime.datetime.now() - actual_time).total_seconds()
        resp_text = None
        connection_error = True
        logger.warning("Connection error")
    else:
        logger.info("Response time: " + str(resp_time)
                    + " code: " + str(resp_status)
                    + " length: " + str(resp_length))

    # Single reporting point for the three possible outcomes.
    fetch_log(fetch_conf['log'], actual_time, cod_stop,
              resp_time, resp_status,
              resp_length, timeout, connection_error,
              fetch_conf['max_connections'], fetch_conf['timeout'])
    return resp_text

fetch_log(fetch_log=None, *args)

Write the passed arguments as CSV to fetch_log if set.

Parameters:

Name Type Description Default
fetch_log str

Path to the fetch log file.

None
*args object

CSV line column values.

()
Source code in crtm_poll/crtm_api/stop_times.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def fetch_log(fetch_log=None, *args):
    """Write the passed arguments as CSV to fetch_log if set.

    The values are converted with str() and joined with commas. The column
    header is written first whenever the file does not exist yet or exists
    but is still empty; otherwise the new record is appended on a new line.
    The file is written while holding a lock on '<fetch_log>.lock'.

    Arguments:
        fetch_log (str): Path to the fetch log file. Nothing is written
            when it is None or empty.
        *args (object): CSV line column values.
    """
    if not fetch_log:
        return

    csv_columns = 'actual_date,cod_stop,resp_time,resp_status,' \
                  'resp_length,timeout,connection_error,' \
                  'max_connections,timeout_time'
    log_csv = ",".join(str(arg) for arg in args)
    logger.debug("CSV fetch log line: " + log_csv)
    with FileLock(fetch_log + '.lock', timeout=10):
        log_path = pathlib.Path(fetch_log)
        # An existing but empty file still needs the header; checking only
        # for existence would start it with a blank line and no header.
        has_content = log_path.exists() and log_path.stat().st_size > 0
        with open(fetch_log, 'a+') as f:
            if has_content:
                # Records are separated by a leading newline, so the file
                # never ends with a trailing one.
                f.write('\n')
            else:
                f.write(csv_columns + '\n')
            f.write(log_csv)

get_stop_times(cod_stop, fetch_conf)

Get the stop times for all the bus lines in a given stop.

Parameters:

Name Type Description Default
cod_stop str

The stop code in CRTM's format (e.g. 8_17491).

required
fetch_conf dict

Dictionary with configuration parameters for fetching the content.

required

Returns:

Type Description
list

API answer in JSON format, returned together with a float: the request time in seconds.

Source code in crtm_poll/crtm_api/stop_times.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def get_stop_times(cod_stop, fetch_conf):
    """Get the stop times for all the bus lines in a given stop.

    Delegates to get_stop_times_batch with a single stop code and keeps the
    first answer. When the batch comes back empty an error is logged and the
    process exits with status 1.

    Arguments:
        cod_stop (str): The stop code in CRTM's format (e.g. 8_17491).
        fetch_conf (dict): Dictionary with configuration parameters for
            fetching the content.

    Returns:
        tuple: The first API answer returned by get_stop_times_batch and the
            request time in seconds (float).
    """
    answers, elapsed = get_stop_times_batch([cod_stop], fetch_conf)

    try:
        first_answer = answers[0]
    except IndexError:
        logger.error("Empty answer")
        sys.exit(1)

    return first_answer, elapsed

get_stop_times_batch(cod_stops, fetch_conf)

Get the stop times for all the bus lines for the given stops.

Parameters:

Name Type Description Default
cod_stops list

List of stop codes in CRTM's format (e.g. 8_17491).

required
fetch_conf dict

Dictionary with configuration parameters for fetching the content.

required

Returns:

Type Description
list

API answers in JSON format, returned together with a float: the total time spent in seconds.

Source code in crtm_poll/crtm_api/stop_times.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def get_stop_times_batch(cod_stops, fetch_conf):
    """Get the stop times for all the bus lines for the given stops.

    Runs the run() coroutine to completion on the current event loop and
    drops the falsy entries (failed or empty fetches) from its result.

    Arguments:
        cod_stops (list): List of stop codes in CRTM's format (e.g. 8_17491).
        fetch_conf (dict): Dictionary with configuration parameters for
            fetching the content.

    Returns:
        tuple: The list of non-empty API answers and the total time spent
            in seconds (float).
    """
    dt_1 = datetime.datetime.now()
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # asyncio.get_event_loop() raises when the calling thread has no
        # current event loop (worker threads, and newer Python versions in
        # general), so create one and install it for later calls.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # run_until_complete wraps the coroutine itself and hands back its
    # result, so no explicit future is needed.
    responses = loop.run_until_complete(run(cod_stops, fetch_conf))
    dt_2 = datetime.datetime.now()

    json_array = [response for response in responses if response]
    total_time = (dt_2 - dt_1).total_seconds()

    return json_array, total_time

get_stop_times_batch_parsed(cod_stops, fetch_conf)

Get the stop times for all the bus lines for the given stops parsing the JSON answer to CSV.

The CSV column names are: 'actual_date,cod_stop,cod_line,cod_issue,eta,destination_stop'

Parameters:

Name Type Description Default
cod_stops list

List of stop codes in CRTM's format (e.g. 8_17491).

required

Returns:

Type Description
list

Parsed API answers in CSV format, returned together with a float: the total time spent in seconds.

Source code in crtm_poll/crtm_api/stop_times.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def get_stop_times_batch_parsed(cod_stops, fetch_conf):
    """Fetch the given stops and flatten the JSON answers into CSV rows.

    One row is produced per entry found under
    stopTimes -> times -> Time of each answer. The CSV column names are:
    'actual_date,cod_stop,cod_line,cod_issue,eta,destination_stop'

    Answers that cannot be decoded, or that lack the expected fields, are
    logged as warnings and skipped; rows already produced for such an
    answer are kept.

    Arguments:
        cod_stops (list): List of stop codes in CRTM's format (e.g. 8_17491).
        fetch_conf (dict): Dictionary with configuration parameters for
            fetching the content.

    Returns:
        tuple: The list of CSV rows (str) and the total time spent in
            seconds (float).
    """
    answers, total_time = get_stop_times_batch(cod_stops, fetch_conf)
    csv_array = []

    for answer in answers:
        try:
            _, brace, remainder = answer.partition('{')
        except AttributeError:
            logger.warning("Empty answer")
            continue

        # Anything preceding the first '{' is discarded before decoding;
        # an answer without any '{' is handed to the decoder untouched.
        payload = brace + remainder if brace else answer

        try:
            stop_json = json.loads(payload)
        except ValueError:
            logger.warning("json error")
            continue

        try:
            for arrival in stop_json['stopTimes']['times']['Time']:
                stop_times = stop_json['stopTimes']
                csv_array.append(','.join([
                    stop_times['actualDate'],
                    stop_times['stop']['codStop'],
                    arrival['line']['codLine'],
                    arrival['codIssue'],
                    arrival['time'],
                    arrival['destinationStop']['codStop'],
                ]))
        except (KeyError, TypeError):
            logger.warning("Answer without times")
        except Exception as e:
            logger.warning("Unknown error: " + str(e))

    return csv_array, total_time

run(cod_stops, fetch_conf)

Async function that generates an aiohttp ClientSession and fetches the given stops.

Parameters:

Name Type Description Default
cod_stops list

List of stop codes in CRTM's format (e.g. 8_17491).

required
fetch_conf dict

Dictionary with configuration parameters for fetching the content.

required

Returns:

Type Description
list

The responses.

Source code in crtm_poll/crtm_api/stop_times.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def run(cod_stops, fetch_conf):
    """Fetch every given stop concurrently through one aiohttp ClientSession.

    The session is built with a TCPConnector limited to
    fetch_conf['max_connections'] and a ClientTimeout whose total is
    fetch_conf['timeout'].

    Arguments:
        cod_stops (list): List of stop codes in CRTM's format (e.g. 8_17491).
        fetch_conf (dict): Dictionary with configuration parameters for
            fetching the content.

    Returns:
        list: The responses, one per stop code, in the same order.
    """
    connector = TCPConnector(limit=fetch_conf['max_connections'])
    timeout = ClientTimeout(total=fetch_conf['timeout'])

    # A single session is shared by all the requests so that connections
    # are kept alive between them.
    async with ClientSession(connector=connector, timeout=timeout) as session:
        pending = [
            asyncio.ensure_future(fetch(cod_stop, session, fetch_conf))
            for cod_stop in cod_stops
        ]
        # gather returns the response bodies once every task has finished.
        return await asyncio.gather(*pending)

Last update: