Skip to content

Commit 028a15d

Browse files
committed
Merge pull request #809 from tseaver/797-pubsub-topic_publish_w_timestamp
Issue #797: support auto-adding timestamp to pubsub messages.
2 parents 4c63b2a + 627e0de commit 028a15d

2 files changed

Lines changed: 81 additions & 4 deletions

File tree

gcloud/pubsub/test_topic.py

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,22 @@ def test_ctor_wo_inferred_project_or_connection(self):
3838
self.assertEqual(topic.full_name,
3939
'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME))
4040
self.assertTrue(topic.connection is conn)
41+
self.assertFalse(topic.timestamp_messages)
4142

42-
def test_ctor_w_explicit_project_and_connection(self):
43+
def test_ctor_w_explicit_project_connection_and_timestamp(self):
4344
TOPIC_NAME = 'topic_name'
4445
PROJECT = 'PROJECT'
4546
conn = _Connection()
46-
topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn)
47+
topic = self._makeOne(TOPIC_NAME,
48+
project=PROJECT,
49+
connection=conn,
50+
timestamp_messages=True)
4751
self.assertEqual(topic.name, TOPIC_NAME)
4852
self.assertEqual(topic.project, PROJECT)
4953
self.assertEqual(topic.full_name,
5054
'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME))
5155
self.assertTrue(topic.connection is conn)
56+
self.assertTrue(topic.timestamp_messages)
5257

5358
def test_from_api_repr_wo_connection(self):
5459
from gcloud.pubsub._testing import _monkey_defaults
@@ -146,6 +151,67 @@ def test_publish_single_bytes_wo_attrs(self):
146151
self.assertEqual(req['path'], '/%s:publish' % PATH)
147152
self.assertEqual(req['data'], {'messages': [MESSAGE]})
148153

154+
def test_publish_single_bytes_wo_attrs_w_add_timestamp(self):
155+
import base64
156+
import datetime
157+
from gcloud.pubsub import topic as MUT
158+
from gcloud._testing import _Monkey
159+
NOW = datetime.datetime.utcnow()
160+
161+
def _utcnow():
162+
return NOW
163+
164+
TOPIC_NAME = 'topic_name'
165+
PROJECT = 'PROJECT'
166+
PAYLOAD = b'This is the message text'
167+
B64 = base64.b64encode(PAYLOAD).decode('ascii')
168+
MSGID = 'DEADBEEF'
169+
MESSAGE = {'data': B64,
170+
'attributes': {'timestamp': '%sZ' % NOW.isoformat()}}
171+
PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
172+
conn = _Connection({'messageIds': [MSGID]})
173+
topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn,
174+
timestamp_messages=True)
175+
with _Monkey(MUT, _NOW=_utcnow):
176+
msgid = topic.publish(PAYLOAD)
177+
self.assertEqual(msgid, MSGID)
178+
self.assertEqual(len(conn._requested), 1)
179+
req = conn._requested[0]
180+
self.assertEqual(req['method'], 'POST')
181+
self.assertEqual(req['path'], '/%s:publish' % PATH)
182+
self.assertEqual(req['data'], {'messages': [MESSAGE]})
183+
184+
def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self):
185+
import base64
186+
import datetime
187+
from gcloud.pubsub import topic as MUT
188+
from gcloud._testing import _Monkey
189+
NOW = datetime.datetime.utcnow()
190+
191+
def _utcnow(): # pragma: NO COVER
192+
return NOW
193+
194+
TOPIC_NAME = 'topic_name'
195+
PROJECT = 'PROJECT'
196+
PAYLOAD = b'This is the message text'
197+
B64 = base64.b64encode(PAYLOAD).decode('ascii')
198+
MSGID = 'DEADBEEF'
199+
OVERRIDE = '2015-04-10T16:46:22.868399Z'
200+
MESSAGE = {'data': B64,
201+
'attributes': {'timestamp': OVERRIDE}}
202+
PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
203+
conn = _Connection({'messageIds': [MSGID]})
204+
topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn,
205+
timestamp_messages=True)
206+
with _Monkey(MUT, _NOW=_utcnow):
207+
msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE)
208+
self.assertEqual(msgid, MSGID)
209+
self.assertEqual(len(conn._requested), 1)
210+
req = conn._requested[0]
211+
self.assertEqual(req['method'], 'POST')
212+
self.assertEqual(req['path'], '/%s:publish' % PATH)
213+
self.assertEqual(req['data'], {'messages': [MESSAGE]})
214+
149215
def test_publish_single_w_attrs(self):
150216
import base64
151217
TOPIC_NAME = 'topic_name'

gcloud/pubsub/topic.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
"""Define API Topics."""
1616

1717
import base64
18+
import datetime
1819

1920
from gcloud._helpers import get_default_project
2021
from gcloud.exceptions import NotFound
2122
from gcloud.pubsub._implicit_environ import get_default_connection
2223

24+
_NOW = datetime.datetime.utcnow
25+
2326

2427
class Topic(object):
2528
"""Topics are targets to which messages can be published.
@@ -39,16 +42,22 @@ class Topic(object):
3942
:type connection: :class:gcloud.pubsub.connection.Connection
4043
:param connection: the connection to use. If not passed,
4144
falls back to the default inferred from the
42-
environment.
45+
46+
:type timestamp_messages: boolean
47+
:param timestamp_messages: If true, the topic will add a ``timestamp`` key
48+
to the attributes of each published message:
49+
the value will be an RFC 3339 timestamp.
4350
"""
44-
def __init__(self, name, project=None, connection=None):
51+
def __init__(self, name, project=None, connection=None,
52+
timestamp_messages=False):
4553
if project is None:
4654
project = get_default_project()
4755
if connection is None:
4856
connection = get_default_connection()
4957
self.name = name
5058
self.project = project
5159
self.connection = connection
60+
self.timestamp_messages = timestamp_messages
5261

5362
@classmethod
5463
def from_api_repr(cls, resource, connection=None):
@@ -113,6 +122,8 @@ def publish(self, message, **attrs):
113122
:rtype: str
114123
:returns: message ID assigned by the server to the published message
115124
"""
125+
if self.timestamp_messages and 'timestamp' not in attrs:
126+
attrs['timestamp'] = '%sZ' % _NOW().isoformat()
116127
message_b = base64.b64encode(message).decode('ascii')
117128
message_data = {'data': message_b, 'attributes': attrs}
118129
data = {'messages': [message_data]}

0 commit comments

Comments
 (0)