...
In our example we run a machine learning algorithm that produces a flare prediction to be stored in our database. The algorithm consists of a training phase and a test (prediction) phase. During the training phase the algorithm learns and tunes its parameters, which can then be stored in the database as a configuration for later use. In the test phase we use this configuration to compute flare predictions, which are also stored in the database. The following code shows the two corresponding functions.
```py
import requests

def train_model(model, train_data, validation_data, max_epoches, batch_size, environment):
    # train model (e.g. until max_epoches are reached or the validation loss increases)
    model.train(train_data, validation_data, max_epoches, batch_size)
    # store model parameters within the database
    post_data = {
        "algorithm_run_id": environment['runtime']['run_id'],
        "config_data": model.get_parameters(),
        "description": {},
    }
    response = requests.post(
        'http://localhost:8004/algoconfig/%s' % environment['algorithm']['cfg_name'],
        json=post_data
    ).json()
    if response['has_error']:
        raise ValueError("An error occurred while storing the algorithm's configuration:\n%s" % response['error'])
    return response['data']

def test_model(model, test_data, environment):
    # test model (e.g. predict the test_data)
    model.run(test_data)
    (time_start, position_hg, prediction_data) = model.get_prediction()
    # store predictions within the database;
    # get_fc_id(row) is assumed to resolve the source id of a row (defined elsewhere)
    post_data = [
        {
            "algorithm_config": environment['algorithm']['cfg_name'],
            "algorithm_run_id": environment['runtime']['run_id'],
            "lat_hg": position_hg[0],
            "long_hg": position_hg[1],
            "prediction_data": prediction_data,
            "source_data": [get_fc_id(row) for row in test_data],
            "time_start": time_start
        }
    ]
    response = requests.post('http://localhost:8004/prediction/bulk', json=post_data).json()
    if response['has_error']:
        raise ValueError("An error occurred while storing the algorithm's prediction:\n%s" % response['error'])
    return response['data']
```
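For reference, the response envelope these functions rely on can be inferred from the fields the code reads; the concrete layout below is an assumption sketched from those fields, with illustrative values:

```py
# Hypothetical response envelope, inferred from the fields read above
# (has_error, error, data, result-count); not an official schema.
example_response = {
    "has_error": False,   # True if the request failed
    "error": None,        # error message when has_error is True
    "data": {},           # payload returned by the service
    "result-count": 1     # number of entries in 'data' (used by list routes)
}
```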
Integration
Given the two functions above, we can now define our algorithm's workflow.
```py
import json
import requests

# Setup environment
environment = {}
with open("params.json") as params_file:
    environment = json.loads(params_file.read())

# Download regions and properties
response = requests.get(
    'http://localhost:8002/region/%s/list?cadence=%s&time_start=between(%s,%s)' % (
        environment['algorithm']['dataset'],
        environment['algorithm']['cadence'],
        environment['algorithm']['time_start'],
        environment['algorithm']['time_end']
    )
).json()
if response['has_error']:
    raise ValueError('No data found!')
else:
    # if we already have an algorithm configuration stored within the database,
    # we do not need to extract any training or validation data
    (train_data, validation_data, test_data) = createDataPartitions(response['data'])

# Setup model
model = MyMLAlgorithm(environment['algorithm']['params'])

# Check whether we have to train our algorithm or if we can download an already existing configuration
response = requests.get(
    'http://localhost:8004/algoconfig/list?algorithm_config_name=%s&algorithm_config_version=%s'
    % (environment['algorithm']['cfg_name'], 'latest')
).json()
if not response['has_error'] and response['result-count'] > 0:
    # as we requested the latest configuration we expect only one result within 'data'
    algo_cfg = response['data'][0]
    model.set_parameters(algo_cfg)
else:
    train_model(
        model,
        train_data,
        validation_data,
        environment['algorithm']['max_epoches'],
        environment['algorithm']['batch_size'],
        environment
    )
```
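The workflow assumes a helper createDataPartitions that splits the downloaded rows into training, validation and test sets; its implementation is not part of this page. A minimal sketch, assuming a simple sequential 70/15/15 split (the fractions are illustrative, not prescribed by the services), could look like this:

```py
def createDataPartitions(rows, train_frac=0.7, validation_frac=0.15):
    # split the downloaded rows into train / validation / test partitions;
    # a simple sequential split, sufficient for this example
    n = len(rows)
    n_train = int(n * train_frac)
    n_validation = int(n * validation_frac)
    train_data = rows[:n_train]
    validation_data = rows[n_train:n_train + n_validation]
    test_data = rows[n_train + n_validation:]
    return (train_data, validation_data, test_data)
```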
The problem now is that ml_result is not structured in a way the prediction service understands, so we have to restructure it to match the following definition:
...
```py
    raise ValueError("An error occurred while storing the algorithm's prediction:\n%s" % response['error'])
return response['data']
```
The post_data structure is equivalent to the algorithm_config_data or prediction_data definitions as given by the routes /algoconfig/{name} and /prediction/bulk:
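As an illustration, the restructuring could map the fields of ml_result onto the prediction_data layout used by test_model above; note that the attribute names on the ml_result entries are assumptions here, while the target keys are taken from test_model:

```py
# Hypothetical restructuring of ml_result into the /prediction/bulk layout;
# the attributes of each result entry are assumed, the target keys are
# the ones used by test_model above.
post_data = [
    {
        "algorithm_config": environment['algorithm']['cfg_name'],
        "algorithm_run_id": environment['runtime']['run_id'],
        "lat_hg": result.position_hg[0],
        "long_hg": result.position_hg[1],
        "prediction_data": result.prediction_data,
        "source_data": result.source_ids,
        "time_start": result.time_start
    }
    for result in ml_result
]
```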
This would then look like this:
...
...
...
Ingest
Ingesting this post_data is now very simple. We first add the new dataset to the prediction service and then add a new configuration set.
```py
# add dataset
print('creating dataset...')
requests.post("http://localhost:8004/dataset/bulk", json=ml_datasets)

# add configuration set
print('storing data...')
requests.post("http://localhost:8004/configset/%s" % ml_datasets[0]['name'], json=post_data)
```
The addresses of the routes are the same ones we looked up before.
Retrieve
To check that everything worked, we should retrieve all the configuration sets stored under this dataset. This can be done with a GET request.
```py
# retrieving data
print('downloading all data...')
ml_config_sets = requests.get("http://localhost:8004/configset/%s/list" % ml_datasets[0]['name']).json()
print(ml_config_sets)
```
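If the list route returns the same envelope as the other routes (an assumption here), the result can be unpacked like this:

```py
# sketch: inspect the retrieved configuration sets, assuming the
# has_error / data envelope used by the other routes
if not ml_config_sets['has_error']:
    for config_set in ml_config_sets['data']:
        print(config_set)
```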
Source Code
Here you can download the full Python source code.
A more complete example using the above functions is given in the article 'Request prediction data from database (REST API)'.
Info: This page was adapted from Ingest property data in database (REST API).