-
Notifications
You must be signed in to change notification settings - Fork 6.7k
Expand file tree
/
Copy pathbulk_insert.py
More file actions
191 lines (158 loc) · 7.74 KB
/
bulk_insert.py
File metadata and controls
191 lines (158 loc) · 7.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# flake8: noqa
# This file is automatically generated. Please do not modify it directly.
# Find the relevant recipe file in the samples/recipes or samples/ingredients
# directory and apply your changes there.
# [START compute_instances_bulk_insert]
import sys
from typing import Any, Iterable, Optional
import uuid
from google.api_core.extended_operation import ExtendedOperation
from google.cloud import compute_v1
def wait_for_extended_operation(
operation: ExtendedOperation, verbose_name: str = "operation", timeout: int = 300
) -> Any:
"""
This method will wait for the extended (long-running) operation to
complete. If the operation is successful, it will return its result.
If the operation ends with an error, an exception will be raised.
If there were any warnings during the execution of the operation
they will be printed to sys.stderr.
Args:
operation: a long-running operation you want to wait on.
verbose_name: (optional) a more verbose name of the operation,
used only during error and warning reporting.
timeout: how long (in seconds) to wait for operation to finish.
If None, wait indefinitely.
Returns:
Whatever the operation.result() returns.
Raises:
This method will raise the exception received from `operation.exception()`
or RuntimeError if there is no exception set, but there is an `error_code`
set for the `operation`.
In case of an operation taking longer than `timeout` seconds to complete,
a `concurrent.futures.TimeoutError` will be raised.
"""
result = operation.result(timeout=timeout)
if operation.error_code:
print(
f"Error during {verbose_name}: [Code: {operation.error_code}]: {operation.error_message}",
file=sys.stderr,
flush=True,
)
print(f"Operation ID: {operation.name}", file=sys.stderr, flush=True)
raise operation.exception() or RuntimeError(operation.error_message)
if operation.warnings:
print(f"Warnings during {verbose_name}:\n", file=sys.stderr, flush=True)
for warning in operation.warnings:
print(f" - {warning.code}: {warning.message}", file=sys.stderr, flush=True)
return result
def get_instance_template(
project_id: str, template_name: str
) -> compute_v1.InstanceTemplate:
"""
Retrieve an instance template, which you can use to create virtual machine
(VM) instances and managed instance groups (MIGs).
Args:
project_id: project ID or project number of the Cloud project you use.
template_name: name of the template to retrieve.
Returns:
InstanceTemplate object that represents the retrieved template.
"""
template_client = compute_v1.InstanceTemplatesClient()
return template_client.get(project=project_id, instance_template=template_name)
def bulk_insert_instance(
project_id: str,
zone: str,
template: compute_v1.InstanceTemplate,
count: int,
name_pattern: str,
min_count: Optional[int] = None,
labels: Optional[dict] = None,
) -> Iterable[compute_v1.Instance]:
"""
Create multiple VMs based on an Instance Template. The newly created instances will
be returned as a list and will share a label with key `bulk_batch` and a random
value.
If the bulk insert operation fails and the requested number of instances can't be created,
and more than min_count instances are created, then those instances can be found using
the `bulk_batch` label with value attached to the raised exception in bulk_batch_id
attribute. So, you can use the following filter: f"label.bulk_batch={err.bulk_batch_id}"
when listing instances in a zone to get the instances that were successfully created.
Args:
project_id: project ID or project number of the Cloud project you want to use.
zone: name of the zone to create the instance in. For example: "us-west3-b"
template: an Instance Template to be used for creation of the new VMs.
name_pattern: The string pattern used for the names of the VMs. The pattern
must contain one continuous sequence of placeholder hash characters (#)
with each character corresponding to one digit of the generated instance
name. Example: a name_pattern of inst-#### generates instance names such
as inst-0001 and inst-0002. If existing instances in the same project and
zone have names that match the name pattern then the generated instance
numbers start after the biggest existing number. For example, if there
exists an instance with name inst-0050, then instance names generated
using the pattern inst-#### begin with inst-0051. The name pattern
placeholder #...# can contain up to 18 characters.
count: The maximum number of instances to create.
min_count (optional): The minimum number of instances to create. If no min_count is
specified then count is used as the default value. If min_count instances
cannot be created, then no instances will be created and instances already
created will be deleted.
labels (optional): A dictionary with labels to be added to the new VMs.
"""
bulk_insert_resource = compute_v1.BulkInsertInstanceResource()
bulk_insert_resource.source_instance_template = template.self_link
bulk_insert_resource.count = count
bulk_insert_resource.min_count = min_count or count
bulk_insert_resource.name_pattern = name_pattern
if not labels:
labels = {}
labels["bulk_batch"] = uuid.uuid4().hex
instance_prop = compute_v1.InstanceProperties()
instance_prop.labels = labels
bulk_insert_resource.instance_properties = instance_prop
bulk_insert_request = compute_v1.BulkInsertInstanceRequest()
bulk_insert_request.bulk_insert_instance_resource_resource = bulk_insert_resource
bulk_insert_request.project = project_id
bulk_insert_request.zone = zone
client = compute_v1.InstancesClient()
operation = client.bulk_insert(bulk_insert_request)
try:
wait_for_extended_operation(operation, "bulk instance creation")
except Exception as err:
err.bulk_batch_id = labels["bulk_batch"]
raise err
list_req = compute_v1.ListInstancesRequest()
list_req.project = project_id
list_req.zone = zone
list_req.filter = " AND ".join(
f"labels.{key}:{value}" for key, value in labels.items()
)
return client.list(list_req)
def create_five_instances(
project_id: str, zone: str, template_name: str, name_pattern: str
):
"""
Create five instances of an instance template.
Args:
project_id: project ID or project number of the Cloud project you want to use.
zone: name of the zone to create the instance in. For example: "us-west3-b"
template_name: name of the template that will be used to create new VMs.
name_pattern: The string pattern used for the names of the VMs.
"""
template = get_instance_template(project_id, template_name)
instances = bulk_insert_instance(project_id, zone, template, 5, name_pattern)
return instances
# [END compute_instances_bulk_insert]