Content-Length: 587935 | pFad | http://github.com/googleapis/python-pubsub/commit/0f9df7a08bdf5fd22605a771872a29449bfeef01

09 Added sample for publishing/receiving messages with custom attributes… · googleapis/python-pubsub@0f9df7a · GitHub
Skip to content

Commit 0f9df7a

Browse files
chenyumicplamut
chenyumic
authored andcommitted
Added sample for publishing/receiving messages with custom attributes [(#1409)](GoogleCloudPlatform/python-docs-samples#1409)
1 parent b24f725 commit 0f9df7a

File tree

4 files changed

+84
-0
lines changed

4 files changed

+84
-0
lines changed

samples/snippets/publisher.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,23 @@ def publish_messages(project, topic_name):
6969
print('Published messages.')
7070

7171

72+
def publish_messages_with_custom_attributes(project, topic_name):
73+
"""Publishes multiple messages with custom attributes
74+
to a Pub/Sub topic."""
75+
publisher = pubsub_v1.PublisherClient()
76+
topic_path = publisher.topic_path(project, topic_name)
77+
78+
for n in range(1, 10):
79+
data = u'Message number {}'.format(n)
80+
# Data must be a bytestring
81+
data = data.encode('utf-8')
82+
# Add two attributes, origen and username, to the message
83+
publisher.publish(
84+
topic_path, data, origen='python-sample', username='gcp')
85+
86+
print('Published messages with custom attributes.')
87+
88+
7289
def publish_messages_with_futures(project, topic_name):
7390
"""Publishes multiple messages to a Pub/Sub topic and prints their
7491
message IDs."""
@@ -132,6 +149,11 @@ def publish_messages_with_batch_settings(project, topic_name):
132149
'publish', help=publish_messages.__doc__)
133150
publish_parser.add_argument('topic_name')
134151

152+
publish_with_custom_attributes_parser = subparsers.add_parser(
153+
'publish-with-custom-attributes',
154+
help=publish_messages_with_custom_attributes.__doc__)
155+
publish_with_custom_attributes_parser.add_argument('topic_name')
156+
135157
publish_with_futures_parser = subparsers.add_parser(
136158
'publish-with-futures',
137159
help=publish_messages_with_futures.__doc__)
@@ -152,6 +174,8 @@ def publish_messages_with_batch_settings(project, topic_name):
152174
delete_topic(args.project, args.topic_name)
153175
elif args.command == 'publish':
154176
publish_messages(args.project, args.topic_name)
177+
elif args.command == 'publish-with-custom-attributes':
178+
publish_messages_with_custom_attributes(args.project, args.topic_name)
155179
elif args.command == 'publish-with-futures':
156180
publish_messages_with_futures(args.project, args.topic_name)
157181
elif args.command == 'publish-with-batch-settings':

samples/snippets/publisher_test.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ def test_publish(topic, capsys):
8181
assert 'Published' in out
8282

8383

84+
def test_publish_with_custom_attributes(topic, capsys):
85+
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC)
86+
87+
out, _ = capsys.readouterr()
88+
assert 'Published' in out
89+
90+
8491
def test_publish_with_batch_settings(topic, capsys):
8592
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC)
8693

samples/snippets/subscriber.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,30 @@ def callback(message):
143143
time.sleep(60)
144144

145145

146+
def receive_messages_with_custom_attributes(project, subscription_name):
147+
"""Receives messages from a pull subscription."""
148+
subscriber = pubsub_v1.SubscriberClient()
149+
subscription_path = subscriber.subscription_path(
150+
project, subscription_name)
151+
152+
def callback(message):
153+
print('Received message: {}'.format(message.data))
154+
if message.attributes:
155+
print('Attributes:')
156+
for key in message.attributes:
157+
value = message.attributes.get(key)
158+
print('{}: {}'.format(key, value))
159+
message.ack()
160+
161+
subscriber.subscribe(subscription_path, callback=callback)
162+
163+
# The subscriber is non-blocking, so we must keep the main thread from
164+
# exiting to allow it to process messages in the background.
165+
print('Listening for messages on {}'.format(subscription_path))
166+
while True:
167+
time.sleep(60)
168+
169+
146170
def receive_messages_with_flow_control(project, subscription_name):
147171
"""Receives messages from a pull subscription with flow control."""
148172
subscriber = pubsub_v1.SubscriberClient()
@@ -227,6 +251,11 @@ def callback(message):
227251
'receive', help=receive_messages.__doc__)
228252
receive_parser.add_argument('subscription_name')
229253

254+
receive_with_custom_attributes_parser = subparsers.add_parser(
255+
'receive-custom-attributes',
256+
help=receive_messages_with_custom_attributes.__doc__)
257+
receive_with_custom_attributes_parser.add_argument('subscription_name')
258+
230259
receive_with_flow_control_parser = subparsers.add_parser(
231260
'receive-flow-control',
232261
help=receive_messages_with_flow_control.__doc__)
@@ -259,6 +288,9 @@ def callback(message):
259288
args.project, args.subscription_name, args.endpoint)
260289
elif args.command == 'receive':
261290
receive_messages(args.project, args.subscription_name)
291+
elif args.command == 'receive-custom-attributes':
292+
receive_messages_with_custom_attributes(
293+
args.project, args.subscription_name)
262294
elif args.command == 'receive-flow-control':
263295
receive_messages_with_flow_control(
264296
args.project, args.subscription_name)

samples/snippets/subscriber_test.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ def _publish_messages(publisher_client, topic):
129129
topic, data=data)
130130

131131

132+
def _publish_messages_with_custom_attributes(publisher_client, topic):
133+
data = u'Test message'.encode('utf-8')
134+
publisher_client.publish(topic, data=data, origen='python-sample')
135+
136+
132137
def _make_sleep_patch():
133138
real_sleep = time.sleep
134139

@@ -155,6 +160,22 @@ def test_receive(publisher_client, topic, subscription, capsys):
155160
assert 'Message 1' in out
156161

157162

163+
def test_receive_with_custom_attributes(
164+
publisher_client, topic, subscription, capsys):
165+
_publish_messages_with_custom_attributes(publisher_client, topic)
166+
167+
with _make_sleep_patch():
168+
with pytest.raises(RuntimeError, match='sigil'):
169+
subscriber.receive_messages_with_custom_attributes(
170+
PROJECT, SUBSCRIPTION)
171+
172+
out, _ = capsys.readouterr()
173+
assert 'Test message' in out
174+
assert 'Attributes' in out
175+
assert 'origen' in out
176+
assert 'python-sample' in out
177+
178+
158179
def test_receive_with_flow_control(
159180
publisher_client, topic, subscription, capsys):
160181
_publish_messages(publisher_client, topic)

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/googleapis/python-pubsub/commit/0f9df7a08bdf5fd22605a771872a29449bfeef01

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy