...

In our example we run a machine learning algorithm that produces a flare prediction to store in our database. The algorithm consists of a training phase and a test (prediction) phase. During the training phase the algorithm learns and tunes its parameters, which can then be stored in the database as a configuration for later use. Afterwards, in the test phase, we use this configuration to compute flare predictions, which are also stored in the database. The following code shows the two corresponding functions.

Code Block
languagepy
linenumberstrue
def train_model(model, train_data, validation_data, max_epoches, batch_size, environment):
    # train model (e.g. until max_epoches are reached or validation loss increases)
    model.train(train_data, validation_data, max_epoches, batch_size)
    # store model parameters within database
    post_data = {
        "algorithm_run_id": environment['runtime']['run_id'],
        "config_data": model.get_parameters(),
        "description": {},
    }
    response = requests.post(
        'http://localhost:8004/algoconfig/%s' % environment['algorithm']['cfg_name'],
        json=post_data
    ).json()
    if response['has_error']:
        raise ValueError("An error occurred while storing the algorithm's configuration:\n%s" % response['error'])
    return response['data']

def test_model(model, test_data, environment):
    # test model (e.g. predict the test_data)
    model.run(test_data)
    (time_start, position_hg, prediction_data) = model.get_prediction()
    # store predictions within database
    post_data = [
        {
            "algorithm_config": environment['algorithm']['cfg_name'],
            "algorithm_run_id": environment['runtime']['run_id'],
            "lat_hg": position_hg[0],
            "long_hg": position_hg[1],
            "prediction_data": prediction_data,
            "source_data": [get_fc_id(row) for row in test_data],
            "time_start": time_start
        }
    ]
    response = requests.post('http://localhost:8004/prediction/bulk', json=post_data).json()
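
Both functions assume a model object that offers train, get_parameters, run and get_prediction (plus set_parameters, which the workflow uses). The page does not show that class, so the following is only a toy sketch of what such an interface could look like. The class name ThresholdModel, the row fields "value", "time_start", "lat_hg" and "long_hg", and the "flare_probability" key are all hypothetical and chosen purely for illustration.

```python
class ThresholdModel:
    """Toy stand-in for a real model; every name in here is hypothetical."""

    def __init__(self, params=None):
        self.params = dict(params or {})
        self.threshold = None
        self._prediction = None

    def train(self, train_data, validation_data, max_epoches, batch_size):
        # "Training" is just averaging here. A real model would iterate over
        # batches for up to max_epoches and monitor the validation loss.
        values = [row["value"] for row in train_data]
        self.threshold = sum(values) / len(values)

    def get_parameters(self):
        # Plain key-value pairs, so the result can be stored as config_data
        return {"threshold": self.threshold}

    def set_parameters(self, config):
        # Assumes config is a mapping shaped like get_parameters() output
        self.threshold = config["threshold"]

    def run(self, test_data):
        # Fraction of rows whose value exceeds the learned threshold
        flagged = [row for row in test_data if row["value"] > self.threshold]
        first = test_data[0]
        self._prediction = (
            first["time_start"],
            (first["lat_hg"], first["long_hg"]),
            {"flare_probability": len(flagged) / len(test_data)},
        )

    def get_prediction(self):
        # (time_start, position_hg, prediction_data), as unpacked in test_model
        return self._prediction
```

Returning the prediction as a three-element tuple mirrors how test_model unpacks it. Whether the stored configuration is passed to set_parameters as a whole or only as its config_data part depends on the service's response format, which is not shown here.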

Integration

Given the two functions above, we can now define our algorithm's workflow.

Code Block
languagepy
linenumberstrue
import json
import requests

# Setup environment
with open("params.json") as params_file:
    environment = json.load(params_file)

# Download regions and properties
response = requests.get(
    'http://localhost:8002/region/%s/list?cadence=%s&time_start=between(%s,%s)'
    % (
        environment['algorithm']['dataset'],
        environment['algorithm']['cadence'],
        environment['algorithm']['time_start'],
        environment['algorithm']['time_end']
    )
).json()
if response['has_error']:
    raise ValueError('No data found!')
else:
    # if we already have an algorithm configuration stored within the database,
    # we do not need to extract any training or validation data
    (train_data, validation_data, test_data) = createDataPartitions(response['data'])

# Setup model
model = MyMLAlgorithm(environment['algorithm']['params'])

# Check whether we have to train our algorithm or if we can download an already existing configuration
response = requests.get(
    'http://localhost:8004/algoconfig/list?algorithm_config_name=%s&algorithm_config_version=%s'
    % (environment['algorithm']['cfg_name'], 'latest')
).json()
if response['has_error'] == False and response['result-count'] > 0:
    # as we requested the latest configuration we expect only one result within 'data'
    algo_cfg = response['data'][0]
    model.set_parameters(algo_cfg)
else:
    train_model(
        model, train_data, validation_data,
        environment['algorithm']['max_epoches'], environment['algorithm']['batch_size'],
        environment
    )
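
The workflow reads all of its settings from params.json, but the file itself is not shown. As a rough sketch, the keys accessed above suggest a layout like the one below. Only the key names come from the code on this page. Every value is a placeholder, and the helper find_missing_keys is a hypothetical addition for catching an incomplete file before any request is sent.

```python
import json

# Hypothetical params.json content: the key names are the ones the workflow
# reads, the values are placeholders and not real settings.
EXAMPLE_PARAMS = {
    "algorithm": {
        "cfg_name": "<configuration name>",
        "dataset": "<dataset name>",
        "cadence": "<cadence>",
        "time_start": "<start of time range>",
        "time_end": "<end of time range>",
        "params": {},
        "max_epoches": 10,
        "batch_size": 32,
    },
    "runtime": {"run_id": "<run id>"},
}

# Keys the workflow accesses, grouped by section
REQUIRED_KEYS = {
    "algorithm": ["cfg_name", "dataset", "cadence", "time_start", "time_end",
                  "params", "max_epoches", "batch_size"],
    "runtime": ["run_id"],
}


def find_missing_keys(environment):
    """Return dotted paths of keys the workflow reads but that are absent."""
    missing = []
    for section, keys in REQUIRED_KEYS.items():
        values = environment.get(section, {})
        for key in keys:
            if key not in values:
                missing.append("%s.%s" % (section, key))
    return missing


# Round-trip through JSON to mimic reading params.json from disk
environment = json.loads(json.dumps(EXAMPLE_PARAMS))
print(find_missing_keys(environment))
```

Checking the file up front turns a late KeyError in the middle of the workflow into one clear message listing everything that is missing.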


The problem now is that ml_result is not structured in a way the prediction service understands. We therefore have to restructure it to match the following definition:

...

        raise ValueError("An error occurred while storing the algorithm's prediction:\n%s" % response['error'])
    return response['data']

The post_data structure corresponds to the algorithm_config_data and prediction_data definitions given by the routes /algoconfig/{name} and /prediction/bulk:

  • Every algorithm_config_data entry needs at least an algorithm_run_id and a
    config_data attribute.
  • Every configuration-specific value has to be added to the config_data attribute as
    a key-value pair.
  • Every prediction_data entry needs at least an algorithm_run_id, a prediction_data, a source_data and
    a time_start attribute.
  • Every prediction-specific value has to be added to the prediction_data attribute as
    a key-value pair.
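
The rules listed above can be checked locally before anything is sent to the service. The following is a minimal sketch of such a check. The function names are hypothetical, and it assumes that "key-value pair" means the attribute holds a Python dict; the service itself may apply further rules that are not covered here.

```python
# Required attributes as stated in the list above
REQUIRED_CONFIG_KEYS = {"algorithm_run_id", "config_data"}
REQUIRED_PREDICTION_KEYS = {
    "algorithm_run_id", "prediction_data", "source_data", "time_start",
}


def _validate_entry(entry, required_keys, payload_key):
    """Raise if attributes are missing or the payload is not a dict."""
    missing = sorted(required_keys - set(entry))
    if missing:
        raise ValueError("Missing attributes: %s" % ", ".join(missing))
    if not isinstance(entry[payload_key], dict):
        raise TypeError("%s must hold key-value pairs (a dict)" % payload_key)


def validate_config(entry):
    """Check one entry destined for /algoconfig/{name}."""
    _validate_entry(entry, REQUIRED_CONFIG_KEYS, "config_data")


def validate_predictions(entries):
    """Check a list of entries destined for /prediction/bulk."""
    for entry in entries:
        _validate_entry(entry, REQUIRED_PREDICTION_KEYS, "prediction_data")


# Usage with made-up values
validate_config({"algorithm_run_id": 1, "config_data": {"threshold": 0.5}})
validate_predictions([{
    "algorithm_run_id": 1,
    "prediction_data": {"flare_probability": 0.2},
    "source_data": [],
    "time_start": "<time stamp>",
}])
```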

This would then look like this:

...

languagepy

...

 

...

Ingest

Ingesting this post_data is straightforward. We first add the new dataset to the prediction service and then add a new configuration set.

Code Block
languagepy
# add dataset
print('creating dataset...')
requests.post("http://localhost:8004/dataset/bulk", json=ml_datasets)

# add configuration set
print('storing data...')
requests.post("http://localhost:8004/configset/%s" % ml_datasets[0]['name'], json=post_data)

The route addresses are the same ones we looked up before.
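
The two POST calls above discard their responses, so a failed ingest would go unnoticed. A small wrapper along the following lines could surface such failures. It is a sketch only: post_checked is a hypothetical helper, and it assumes these routes answer with the same has_error / error / data envelope that the other responses on this page use. The post argument exists only so the helper can be tried without a running service.

```python
def post_checked(url, payload, post=None, timeout=30):
    """POST payload as JSON and return the 'data' part of the response.

    Raises on HTTP errors and on responses flagged with has_error.
    """
    if post is None:
        import requests  # imported lazily so the helper can be tried offline
        post = requests.post
    response = post(url, json=payload, timeout=timeout)
    response.raise_for_status()  # HTTP-level failures (4xx/5xx)
    body = response.json()
    if body.get("has_error"):
        # Service-level failure reported inside the response body (assumed)
        raise RuntimeError("Request to %s failed: %s" % (url, body.get("error")))
    return body.get("data")
```

With this in place, the dataset step would read post_checked("http://localhost:8004/dataset/bulk", ml_datasets), and the configuration set step would follow the same pattern.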

Retrieve

To check that everything worked, we retrieve all configuration sets stored under this dataset. This can be done with a GET request.

Code Block
languagepy
# retrieving data
print('downloading all data...')
ml_config_sets = requests.get("http://localhost:8004/configset/%s/list" % ml_datasets[0]['name']).json()

print(ml_config_sets)
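
Printing the response shows what came back, but it does not fail if the ingest silently went wrong. As a hedged sketch, the check could be made explicit as shown below. The helper require_results is hypothetical, and it assumes the list route returns a has_error flag and a data list like the other responses on this page.

```python
def require_results(body):
    """Return the 'data' list of a parsed list response, or raise."""
    if body.get("has_error"):
        raise RuntimeError("Listing failed: %s" % body.get("error"))
    data = body.get("data") or []
    if not data:
        # We just ingested a configuration set, so an empty list is suspicious
        raise RuntimeError("No configuration sets were returned")
    return data
```

Calling require_results(ml_config_sets) right after the GET request would then either return the stored configuration sets or stop the script with a clear message.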

Source Code

Here you can download the full Python source code.

prediction_request_ingest.py

A more complete example using the above functions is given by the article 'Request prediction data from database (REST API)'.

 

Info

This page was adapted from Ingest property data in database (REST API).

 
