forked from capnproto/pycapnp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_socket_message_server.py
62 lines (43 loc) · 1.54 KB
/
async_socket_message_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env python3
import argparse
import asyncio
import capnp
import addressbook_capnp
async def writeAddressBook(stream, alice_id):
    """Build a single-entry address book for "Alice" and write it to ``stream``.

    Args:
        stream: Object handed to ``write_async`` as the destination of the
            serialized message.
        alice_id: Value stored in the ``id`` field of the one person entry.
    """
    book = addressbook_capnp.AddressBook.new_message()

    # The book contains exactly one person; fill that entry in place.
    person = book.init("people", 1)[0]
    person.id = alice_id
    person.name = "Alice"
    person.email = "alice@example.com"

    # A single phone record for that person.
    phone = person.init("phones", 1)[0]
    phone.number = "555-1212"
    phone.type = "mobile"

    person.employment.school = "MIT"

    await book.write_async(stream)
async def new_connection(stream):
    """Run the fixed message exchange expected from one connected client.

    The exchange is, in order:

    1. read an address book whose first person is "Bob" with id 0;
    2. reply with Alice's address book (id 0);
    3. read a second address book whose first person is "Bob" with id 1;
    4. read once more and expect ``None``.

    Every value read is printed before it is checked.

    Args:
        stream: Connection object passed to ``read_async`` and to
            ``writeAddressBook``.

    Raises:
        AssertionError: If any received message differs from the sequence
            described above.
    """
    book_type = addressbook_capnp.AddressBook

    # Step 1: the client's opening message.
    first = await book_type.read_async(stream)
    print(first)
    sender = first.people[0]
    assert sender.name == "Bob"
    assert sender.id == 0

    # Step 2: answer with our own address book.
    await writeAddressBook(stream, 0)

    # Step 3: the client's follow-up message.
    second = await book_type.read_async(stream)
    print(second)
    sender = second.people[0]
    assert sender.name == "Bob"
    assert sender.id == 1

    # Step 4: nothing further is expected; the read must produce None
    # (presumably because the client has closed its side - confirm against
    # the pycapnp documentation).
    trailing = await book_type.read_async(stream)
    print(trailing)
    assert trailing is None
def parse_args(argv=None):
    """Parse the server's command-line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads the process command line, which is
            the behaviour callers that pass no argument rely on.

    Returns:
        An ``argparse.Namespace`` whose ``address`` attribute holds the raw
        ``ADDRESS:PORT`` string given on the command line.
    """
    parser = argparse.ArgumentParser(
        # This text is a description of the program. Supplying it as
        # ``usage`` would replace argparse's generated usage line, hiding the
        # real invocation syntax from ``-h`` and from error messages.
        description="""Runs the server bound to the given address/port ADDRESS. """
    )
    parser.add_argument("address", help="ADDRESS:PORT")
    return parser.parse_args(argv)
async def main():
    """Start the server on the ADDRESS:PORT given on the command line.

    The address argument is split into host and port, a server is created
    with ``new_connection`` as the per-connection callback, and the server
    then runs until it is cancelled.

    Raises:
        ValueError: If the address argument contains no ``:`` separator.
    """
    address = parse_args().address

    # Split on the LAST colon so hosts that themselves contain colons
    # (IPv6 literals such as "::1") are accepted instead of failing to unpack.
    host, separator, port = address.rpartition(":")
    if not separator:
        raise ValueError(f"expected ADDRESS:PORT, got {address!r}")

    # Accept the bracketed IPv6 form, e.g. "[::1]:60000".
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]

    # The port is forwarded as the string it was given, as before.
    server = await capnp.AsyncIoStream.create_server(new_connection, host, port)
    async with server:
        await server.serve_forever()
if __name__ == "__main__":
    # Script entry point: main() is wrapped by capnp.run() and the result is
    # driven to completion by asyncio (capnp.run presumably sets up the
    # Cap'n Proto event loop - see the pycapnp documentation).
    wrapped_main = capnp.run(main())
    asyncio.run(wrapped_main)