This was a fairly simple experiment. I wanted to migrate some data I had in a MySQL database into a brand new InfluxDB installation. It seemed simple enough: take a MySQL table and its columns and dump them into InfluxDB, where the table name becomes the measurement name and the columns become fields or tags. There would need to be some control over which column becomes a field and which becomes a tag. Additionally, for large tables, there would need to be a provision for batching the transfer, which means keeping track of how much data has already been processed.
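
To make that mapping concrete, here is a minimal sketch of how a single MySQL row could turn into an InfluxDB point. It is only an illustration: the function name `row_to_point`, the table name `cpu_load`, and the sample row are made up for this example.

```python
from datetime import datetime, timezone


def row_to_point(measurement, row, tag_columns, time_column):
    """Turn one MySQL row (a dict) into an InfluxDB point dict."""
    tags, fields = {}, {}
    for name, value in row.items():
        if name == time_column:
            continue
        if name in tag_columns:
            tags[name] = str(value)  # InfluxDB tag values are always strings
        else:
            fields[name] = value
    # Unix seconds -> ISO 8601 in UTC, so the timestamp is unambiguous.
    time = datetime.fromtimestamp(row[time_column], tz=timezone.utc).isoformat()
    return {"measurement": measurement, "tags": tags, "time": time, "fields": fields}


# Hypothetical row from a hypothetical table called "cpu_load".
row = {"timestamp": 1500000000, "host": "web-1", "load": 0.75}
point = row_to_point("cpu_load", row, tag_columns={"host"}, time_column="timestamp")
print(point)
```

The table name ends up as the measurement, `host` becomes a tag, and `load` becomes a field.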

So I came up with the script below. Here’s how it works:

  1. We define the connection credentials for MySQL and InfluxDB. We also define the source MySQL database and the target InfluxDB database.
  2. We specify a list of tables we want to migrate. Each entry contains the source table name, the target measurement name where the data for that table goes, and a list of columns to move over. For each column, we need its name, a flag indicating whether it should be moved as a field or a tag, and its data type, so we can define a default value in case the column comes up as NULL.
  3. We also name the timestamp column for that table, which the script expects to hold a Unix timestamp.
  4. We also need the name of the auto increment column for that table, so we can keep track of how much data we’ve processed.
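
The per-type defaults for NULL columns can be sketched roughly as follows. This is a simplified illustration rather than the exact logic of the script; the names `DEFAULTS` and `fill_nulls` are invented for this example, and it checks explicitly for `None` instead of any falsy value.

```python
# Default used when a column comes back as NULL (None in Python).
DEFAULTS = {"string": "", "int": 0, "float": 0.0}


def fill_nulls(row, columns):
    """Return a copy of row with missing or None values replaced by the per-type default."""
    filled = dict(row)
    for column in columns:
        name = column["column_name"]
        if filled.get(name) is None:
            filled[name] = DEFAULTS[column["type"]]
    return filled


# Column specifications in the same shape as the script's configuration.
columns = [
    {"column_name": "column_1", "is_tag": True, "type": "string"},
    {"column_name": "column_2", "is_tag": False, "type": "float"},
]

print(fill_nulls({"column_1": "a", "column_2": None}, columns))
```

Declaring the type up front means a NULL in a float column is written as `0.0` rather than as an empty string, so the field keeps one consistent type.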

Once the above configuration has been defined, it’s pretty simple:

  1. We assemble a SQL query based on the configuration above.
  2. We fetch a limited number of rows from MySQL, process them according to the tag/field specifications, and put them in InfluxDB. We keep track of the last auto increment ID we processed, so the next run only fetches rows after it.
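
In isolation, the batching idea from these two steps might look like the sketch below. It is a standalone illustration under a few assumptions: the helper names `build_query`, `read_state`, and `write_state` are hypothetical, the state file lives in a temporary directory, and the query orders by the ID column so that each batch continues where the previous one stopped.

```python
import os
import tempfile


def build_query(table, columns, id_column, last_id, batch_size=10000):
    """Assemble the SELECT for the next batch of rows after last_id."""
    column_list = ", ".join([id_column] + columns)
    return (
        f"SELECT {column_list} FROM {table} "
        f"WHERE {id_column} > {int(last_id)} "
        f"ORDER BY {id_column} LIMIT {int(batch_size)}"
    )


def read_state(path):
    """Return the last processed ID, or 0 if no state has been saved yet."""
    if not os.path.exists(path):
        return 0
    with open(path) as handle:
        return int(handle.read().strip() or 0)


def write_state(path, last_id):
    """Persist the last processed ID for the next run."""
    with open(path, "w") as handle:
        handle.write(str(last_id))


state_path = os.path.join(tempfile.mkdtemp(), "state_table_1")

# First run: no state yet, so the batch starts after ID 0.
print(build_query("table_1", ["timestamp", "column_1"], "id", read_state(state_path)))

# After a batch ending at ID 10000 has been written, the next run resumes from there.
write_state(state_path, 10000)
print(build_query("table_1", ["timestamp", "column_1"], "id", read_state(state_path)))
```

Casting the stored ID with `int()` before it goes into the SQL string also guards against a corrupted state file producing a malformed query.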

Have a look:

import datetime

import MySQLdb
import MySQLdb.cursors  # explicit import for MySQLdb.cursors.DictCursor
from influxdb import InfluxDBClient


def file_read(path: str):
    """ Read a file and return content. """
    
    try:
        # The context manager closes the file even if read() raises.
        with open(path, 'r') as handler:
            return handler.read()
    
    except Exception as e:
        print('Exception: %s' % str(e))
        return None


def file_write(path: str, mode: str, content: str):
    """ Write content to a file. """
    
    # The context manager closes the file even if write() raises.
    with open(path, mode) as handler:
        handler.write(content)


def get_data_from_mysql(host: str, username: str, password: str, db: str, sql: str):
    """ Run SQL query and get data from MySQL table. """
    
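    # Use a separate name so the connection does not shadow the `db` argument.
    connection = MySQLdb.connect(host, username, password, db)
    
    # DictCursor returns each row as a dict keyed by column name.
    cursor = connection.cursor(MySQLdb.cursors.DictCursor)
    
    try:
        cursor.execute(sql)
        data = cursor.fetchall()
    except Exception as e:
        # Not every exception carries two args, so print the whole error.
        print("MySQL error: %s" % str(e))
        data = None
    finally:
        connection.close()
    
    return data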


main_config = {
    'state_file_path'  : '/tmp/migrate_mysql_to_influxdb_state_file_',
    'mysql_host'       : 'localhost',
    'mysql_username'   : 'username',
    'mysql_password'   : 'password',
    'mysql_database'   : 'db',
    'influxdb_host'    : 'localhost',
    'influxdb_port'    : 8086,
    'influxdb_database': 'db',
    'tables'           : [
        {
            'table_name'           : 'table_1',
            'measurement_name'     : 'table_1',
            'columns'              : [
                {
                    'column_name': 'column_1',
                    'is_tag'     : True,
                    'type'       : 'string'
                },
                {
                    'column_name': 'column_2',
                    'is_tag'     : False,
                    'type'       : 'float'
                }
            ],
            'unix_timestamp_column': 'timestamp',
            'auto_increment_column': 'id'
        },
        {
            'table_name'           : 'table_2',
            'measurement_name'     : 'table_2',
            'columns'              : [
                {
                    'column_name': 'column_1',
                    'is_tag'     : True,
                    'type'       : 'string'
                },
                {
                    'column_name': 'column_2',
                    'is_tag'     : False,
                    'type'       : 'string'
                }
            ],
            'unix_timestamp_column': 'timestamp',
            'auto_increment_column': 'id'
        }
    ]
}

if __name__ == '__main__':
    
    mysql_tables = main_config['tables']
    state_file_prefix = main_config['state_file_path']
    
    for table in mysql_tables:
        
        table_name = table['table_name']
        measurement = table['measurement_name']
        state_file = state_file_prefix + table_name
        
        # Read the state file once; fall back to '0' if it is missing or empty.
        last_state_value = file_read(state_file) or '0'
        
        columns_list = [table['unix_timestamp_column'], table['auto_increment_column']]
        default_values = {}
        tags_list = []
        for item in table['columns']:
            
            columns_list.append(item['column_name'])
            
            if item['type'] == 'string':
                
                default_values[item['column_name']] = ''
            if item['type'] == 'int':
                
                default_values[item['column_name']] = 0
            if item['type'] == 'float':
                
                default_values[item['column_name']] = 0.0
            
            if item['is_tag']:
                
                tags_list.append(item['column_name'])
        
        columns_list = ','.join(columns_list)
        
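        # ORDER BY keeps batches in ID order, so the last row holds the highest ID processed.
        sql = 'SELECT ' + columns_list + ' FROM ' + table_name + ' WHERE ' + table[
            'auto_increment_column'] + ' > ' + last_state_value + ' ORDER BY ' + table[
            'auto_increment_column'] + ' LIMIT 10000'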
        
        data = get_data_from_mysql(host=main_config['mysql_host'], username=main_config['mysql_username'],
                                   password=main_config['mysql_password'], db=main_config['mysql_database'], sql=sql)
        
        influxdb_data = []
        
        # data is None when the query failed and empty when there are no new rows.
        if data:
            
            influxdb_client = InfluxDBClient(main_config['influxdb_host'], main_config['influxdb_port'], '', '',
                                             main_config['influxdb_database'])
            
            for item in data:
                
                timestamp = 0
                fields = {}
                tags = {}
                max_auto_increment_value = 0
                for key, value in item.items():
                    
                    if key == table['auto_increment_column']:
                        
                        max_auto_increment_value = value
                    
                    elif key == table['unix_timestamp_column']:
                        
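                        # Convert in UTC so the result does not depend on the server's local timezone.
                        timestamp = datetime.datetime.fromtimestamp(value, datetime.timezone.utc).isoformat()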
                    else:
                        
                        if key in tags_list:
                            
                            tags[key] = value
                        else:
                            
                            # Only NULL (None) falls back to the per-type default.
                            fields[key] = value if value is not None else default_values[key]
                
                data_point = {
                    "measurement": measurement,
                    "tags"       : tags,
                    "time"       : timestamp,
                    "fields"     : fields
                }
                
                influxdb_data.append(data_point)
                
            
            influxdb_client.write_points(influxdb_data)
            
            # Save progress only after the whole batch has been written to InfluxDB.
            file_write(state_file, 'w', str(max_auto_increment_value))
            
            print('Written ' + str(len(data)) + ' points for table ' + table_name + '.')
        
        else:
            
            print('No data retrieved from MySQL for table ' + table_name + '.')

The source code can also be found here.